
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46
class LogicalUnit(object):
47
  """Logical Unit base class.
48

49
  Subclasses must follow these rules:
50
    - implement CheckPrereq which also fills in the opcode instance
51
      with all the fields (even if as None)
52
    - implement Exec
53
    - implement BuildHooksEnv
54
    - redefine HPATH and HTYPE
55
    - optionally redefine their run requirements (REQ_CLUSTER,
56
      REQ_MASTER); note that all commands require root permissions
57

58
  """
59
  HPATH = None
60
  HTYPE = None
61
  _OP_REQP = []
62
  REQ_CLUSTER = True
63
  REQ_MASTER = True
64

    
65
  def __init__(self, processor, op, cfg, sstore):
66
    """Constructor for LogicalUnit.
67

68
    This needs to be overridden in derived classes in order to check op
69
    validity.
70

71
    """
72
    self.proc = processor
73
    self.op = op
74
    self.cfg = cfg
75
    self.sstore = sstore
76
    for attr_name in self._OP_REQP:
77
      attr_val = getattr(op, attr_name, None)
78
      if attr_val is None:
79
        raise errors.OpPrereqError("Required parameter '%s' missing" %
80
                                   attr_name)
81
    if self.REQ_CLUSTER:
82
      if not cfg.IsCluster():
83
        raise errors.OpPrereqError("Cluster not initialized yet,"
84
                                   " use 'gnt-cluster init' first.")
85
      if self.REQ_MASTER:
86
        master = sstore.GetMasterNode()
87
        if master != utils.HostInfo().name:
88
          raise errors.OpPrereqError("Commands must be run on the master"
89
                                     " node %s" % master)
90

    
91
  def CheckPrereq(self):
92
    """Check prerequisites for this LU.
93

94
    This method should check that the prerequisites for the execution
95
    of this LU are fulfilled. It can do internode communication, but
96
    it should be idempotent - no cluster or system changes are
97
    allowed.
98

99
    The method should raise errors.OpPrereqError in case something is
100
    not fulfilled. Its return value is ignored.
101

102
    This method should also update all the parameters of the opcode to
103
    their canonical form; e.g. a short node name must be fully
104
    expanded after this method has successfully completed (so that
105
    hooks, logging, etc. work correctly).
106

107
    """
108
    raise NotImplementedError
109

    
110
  def Exec(self, feedback_fn):
111
    """Execute the LU.
112

113
    This method should implement the actual work. It should raise
114
    errors.OpExecError for failures that are somewhat dealt with in
115
    code, or expected.
116

117
    """
118
    raise NotImplementedError
119

    
120
  def BuildHooksEnv(self):
121
    """Build hooks environment for this LU.
122

123
    This method should return a three-element tuple consisting of: a dict
124
    containing the environment that will be used for running the
125
    specific hook for this LU, a list of node names on which the hook
126
    should run before the execution, and a list of node names on which
127
    the hook should run after the execution.
128

129
    The keys of the dict must not have 'GANETI_' prefixed as this will
130
    be handled in the hooks runner. Also note additional keys will be
131
    added by the hooks runner. If the LU doesn't define any
132
    environment, an empty dict (and not None) should be returned.
133

134
    As for the node lists, the master should not be included in
135
    them, as it will be added by the hooks runner in case this LU
136
    requires a cluster to run on (otherwise we don't have a node
137
    list). If there are no nodes, an empty list (and not None) should
138
    be returned.
139

140
    Note that if the HPATH for a LU class is None, this function will
141
    not be called.
142

143
    """
144
    raise NotImplementedError
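# Illustrative sketch, not part of the original module: a minimal LU
# subclass wired up according to the contract documented above.  The class
# name and the "node_name" opcode field are invented for the example.
class _LUExampleSketch(LogicalUnit):
  """Example-only LU showing the CheckPrereq/Exec/BuildHooksEnv contract."""
  HPATH = "example-sketch"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def CheckPrereq(self):
    # canonicalize parameters; raise OpPrereqError if something is wrong
    node = self.cfg.ExpandNodeName(self.op.node_name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % self.op.node_name)
    self.op.node_name = node

  def BuildHooksEnv(self):
    # no GANETI_ prefix on keys; empty lists rather than None for the nodes
    env = {"OP_TARGET": self.op.node_name}
    return env, [], [self.op.node_name]

  def Exec(self, feedback_fn):
    feedback_fn("nothing to do for %s" % self.op.node_name)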
145

    
146

    
147
class NoHooksLU(LogicalUnit):
148
  """Simple LU which runs no hooks.
149

150
  This LU is intended as a parent for other LogicalUnits which will
151
  run no hooks, in order to reduce duplicate code.
152

153
  """
154
  HPATH = None
155
  HTYPE = None
156

    
157
  def BuildHooksEnv(self):
158
    """Build hooks env.
159

160
    This is a no-op, since we don't run hooks.
161

162
    """
163
    return {}, [], []
164

    
165

    
166
def _AddHostToEtcHosts(hostname):
167
  """Wrapper around utils.SetEtcHostsEntry.
168

169
  """
170
  hi = utils.HostInfo(name=hostname)
171
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172

    
173

    
174
def _RemoveHostFromEtcHosts(hostname):
175
  """Wrapper around utils.RemoveEtcHostsEntry.
176

177
  """
178
  hi = utils.HostInfo(name=hostname)
179
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181

    
182

    
183
def _GetWantedNodes(lu, nodes):
184
  """Returns list of checked and expanded node names.
185

186
  Args:
187
    nodes: List of nodes (strings) or None for all
188

189
  """
190
  if not isinstance(nodes, list):
191
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
192

    
193
  if nodes:
194
    wanted = []
195

    
196
    for name in nodes:
197
      node = lu.cfg.ExpandNodeName(name)
198
      if node is None:
199
        raise errors.OpPrereqError("No such node name '%s'" % name)
200
      wanted.append(node)
201

    
202
  else:
203
    wanted = lu.cfg.GetNodeList()
204
  return utils.NiceSort(wanted)
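# Example (hypothetical names): with nodes=["node1"] the short name is
# expanded through cfg.ExpandNodeName to its full form, while an empty list
# selects every configured node; passing something that is not a list (e.g.
# None) raises OpPrereqError.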
205

    
206

    
207
def _GetWantedInstances(lu, instances):
208
  """Returns list of checked and expanded instance names.
209

210
  Args:
211
    instances: List of instances (strings) or None for all
212

213
  """
214
  if not isinstance(instances, list):
215
    raise errors.OpPrereqError("Invalid argument type 'instances'")
216

    
217
  if instances:
218
    wanted = []
219

    
220
    for name in instances:
221
      instance = lu.cfg.ExpandInstanceName(name)
222
      if instance is None:
223
        raise errors.OpPrereqError("No such instance name '%s'" % name)
224
      wanted.append(instance)
225

    
226
  else:
227
    wanted = lu.cfg.GetInstanceList()
228
  return utils.NiceSort(wanted)
229

    
230

    
231
def _CheckOutputFields(static, dynamic, selected):
232
  """Checks whether all selected fields are valid.
233

234
  Args:
235
    static: Static fields
236
    dynamic: Dynamic fields
237

238
  """
239
  static_fields = frozenset(static)
240
  dynamic_fields = frozenset(dynamic)
241

    
242
  all_fields = static_fields | dynamic_fields
243

    
244
  if not all_fields.issuperset(selected):
245
    raise errors.OpPrereqError("Unknown output fields selected: %s"
246
                               % ",".join(frozenset(selected).
247
                                          difference(all_fields)))
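# Illustrative sketch, not part of the original module: the way a query LU
# is expected to validate its output_fields.  The field names are invented.
def _ExampleValidateFields(selected):
  """Raises OpPrereqError if 'selected' contains unknown field names."""
  _CheckOutputFields(static=["name", "pip"],
                     dynamic=["mfree", "dtotal"],
                     selected=selected)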
248

    
249

    
250
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251
                          memory, vcpus, nics):
252
  """Builds instance related env variables for hooks from single variables.
253

254
  Args:
255
    secondary_nodes: List of secondary nodes as strings
256
  """
257
  env = {
258
    "OP_TARGET": name,
259
    "INSTANCE_NAME": name,
260
    "INSTANCE_PRIMARY": primary_node,
261
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262
    "INSTANCE_OS_TYPE": os_type,
263
    "INSTANCE_STATUS": status,
264
    "INSTANCE_MEMORY": memory,
265
    "INSTANCE_VCPUS": vcpus,
266
  }
267

    
268
  if nics:
269
    nic_count = len(nics)
270
    for idx, (ip, bridge, mac) in enumerate(nics):
271
      if ip is None:
272
        ip = ""
273
      env["INSTANCE_NIC%d_IP" % idx] = ip
274
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
276
  else:
277
    nic_count = 0
278

    
279
  env["INSTANCE_NIC_COUNT"] = nic_count
280

    
281
  return env
282

    
283

    
284
def _BuildInstanceHookEnvByObject(instance, override=None):
285
  """Builds instance related env variables for hooks from an object.
286

287
  Args:
288
    instance: objects.Instance object of instance
289
    override: dict of values to override
290
  """
291
  args = {
292
    'name': instance.name,
293
    'primary_node': instance.primary_node,
294
    'secondary_nodes': instance.secondary_nodes,
295
    'os_type': instance.os,
296
    'status': instance.status,
297
    'memory': instance.memory,
298
    'vcpus': instance.vcpus,
299
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
300
  }
301
  if override:
302
    args.update(override)
303
  return _BuildInstanceHookEnv(**args)
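# For reference, the resulting dict carries the keys built above: OP_TARGET,
# INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES, INSTANCE_OS_TYPE,
# INSTANCE_STATUS, INSTANCE_MEMORY, INSTANCE_VCPUS, INSTANCE_NIC_COUNT and
# INSTANCE_NIC<n>_{IP,BRIDGE,HWADDR}; the hooks runner adds the GANETI_
# prefix afterwards (see LogicalUnit.BuildHooksEnv).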
304

    
305

    
306
def _UpdateKnownHosts(fullnode, ip, pubkey):
307
  """Ensure a node has a correct known_hosts entry.
308

309
  Args:
310
    fullnode - Fully qualified domain name of host. (str)
311
    ip       - IPv4 address of host (str)
312
    pubkey   - the public key of the cluster
313

314
  """
315
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
316
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317
  else:
318
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
319

    
320
  inthere = False
321

    
322
  save_lines = []
323
  add_lines = []
324
  removed = False
325

    
326
  for rawline in f:
327
    logger.Debug('read %s' % (repr(rawline),))
328

    
329
    parts = rawline.rstrip('\r\n').split()
330

    
331
    # Ignore unwanted lines
332
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
333
      fields = parts[0].split(',')
334
      key = parts[2]
335

    
336
      haveall = True
337
      havesome = False
338
      for spec in [ ip, fullnode ]:
339
        if spec not in fields:
340
          haveall = False
341
        if spec in fields:
342
          havesome = True
343

    
344
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
345
      if haveall and key == pubkey:
346
        inthere = True
347
        save_lines.append(rawline)
348
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
349
        continue
350

    
351
      if havesome and (not haveall or key != pubkey):
352
        removed = True
353
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
354
        continue
355

    
356
    save_lines.append(rawline)
357

    
358
  if not inthere:
359
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
360
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
361

    
362
  if removed:
363
    save_lines = save_lines + add_lines
364

    
365
    # Write a new file and replace old.
366
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367
                                   constants.DATA_DIR)
368
    newfile = os.fdopen(fd, 'w')
369
    try:
370
      newfile.write(''.join(save_lines))
371
    finally:
372
      newfile.close()
373
    logger.Debug("Wrote new known_hosts.")
374
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
375

    
376
  elif add_lines:
377
    # Simply appending a new line will do the trick.
378
    f.seek(0, 2)
379
    for add in add_lines:
380
      f.write(add)
381

    
382
  f.close()
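# The entries written above use the standard known_hosts format, for example
# (hypothetical values): "node1.example.com,192.0.2.1 ssh-rsa AAAAB3...".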
383

    
384

    
385
def _HasValidVG(vglist, vgname):
386
  """Checks if the volume group list is valid.
387

388
  A non-None return value means there's an error, and the return value
389
  is the error message.
390

391
  """
392
  vgsize = vglist.get(vgname, None)
393
  if vgsize is None:
394
    return "volume group '%s' missing" % vgname
395
  elif vgsize < 20480:
396
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
397
            (vgname, vgsize))
398
  return None
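# Illustrative sketch, not part of the original module: the calling pattern
# used by the LUs below (see e.g. LUInitCluster.CheckPrereq).
def _ExampleCheckLocalVG(vg_name):
  """Raises OpPrereqError if the local VG is missing or too small."""
  vgstatus = _HasValidVG(utils.ListVolumeGroups(), vg_name)
  if vgstatus:
    raise errors.OpPrereqError("Error: %s" % vgstatus)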
399

    
400

    
401
def _InitSSHSetup(node):
402
  """Setup the SSH configuration for the cluster.
403

404

405
  This generates a dsa keypair for root and adds the public key to
406
  root's authorized_keys file.
407

408
  Args:
409
    node: the name of this host as a fqdn
410

411
  """
412
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413

    
414
  for name in priv_key, pub_key:
415
    if os.path.exists(name):
416
      utils.CreateBackup(name)
417
    utils.RemoveFile(name)
418

    
419
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
420
                         "-f", priv_key,
421
                         "-q", "-N", ""])
422
  if result.failed:
423
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
424
                             result.output)
425

    
426
  f = open(pub_key, 'r')
427
  try:
428
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
429
  finally:
430
    f.close()
431

    
432

    
433
def _InitGanetiServerSetup(ss):
434
  """Setup the necessary configuration for the initial node daemon.
435

436
  This creates the nodepass file containing the shared password for
437
  the cluster and also generates the SSL certificate.
438

439
  """
440
  # Create pseudo random password
441
  randpass = sha.new(os.urandom(64)).hexdigest()
442
  # and write it into sstore
443
  ss.SetKey(ss.SS_NODED_PASS, randpass)
444

    
445
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
446
                         "-days", str(365*5), "-nodes", "-x509",
447
                         "-keyout", constants.SSL_CERT_FILE,
448
                         "-out", constants.SSL_CERT_FILE, "-batch"])
449
  if result.failed:
450
    raise errors.OpExecError("could not generate server ssl cert, command"
451
                             " %s had exitcode %s and error message %s" %
452
                             (result.cmd, result.exit_code, result.output))
453

    
454
  os.chmod(constants.SSL_CERT_FILE, 0400)
455

    
456
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
457

    
458
  if result.failed:
459
    raise errors.OpExecError("Could not start the node daemon, command %s"
460
                             " had exitcode %s and error %s" %
461
                             (result.cmd, result.exit_code, result.output))
462

    
463

    
464
def _CheckInstanceBridgesExist(instance):
465
  """Check that the brigdes needed by an instance exist.
466

467
  """
468
  # check bridges existence
469
  brlist = [nic.bridge for nic in instance.nics]
470
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
471
    raise errors.OpPrereqError("one or more target bridges %s does not"
472
                               " exist on destination node '%s'" %
473
                               (brlist, instance.primary_node))
474

    
475

    
476
class LUInitCluster(LogicalUnit):
477
  """Initialise the cluster.
478

479
  """
480
  HPATH = "cluster-init"
481
  HTYPE = constants.HTYPE_CLUSTER
482
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
483
              "def_bridge", "master_netdev"]
484
  REQ_CLUSTER = False
485

    
486
  def BuildHooksEnv(self):
487
    """Build hooks env.
488

489
    Notes: Since we don't require a cluster, we must manually add
490
    ourselves in the post-run node list.
491

492
    """
493
    env = {"OP_TARGET": self.op.cluster_name}
494
    return env, [], [self.hostname.name]
495

    
496
  def CheckPrereq(self):
497
    """Verify that the passed name is a valid one.
498

499
    """
500
    if config.ConfigWriter.IsCluster():
501
      raise errors.OpPrereqError("Cluster is already initialised")
502

    
503
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
504
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
505
        raise errors.OpPrereqError("Please prepare the cluster VNC"
506
                                   "password file %s" %
507
                                   constants.VNC_PASSWORD_FILE)
508

    
509
    self.hostname = hostname = utils.HostInfo()
510

    
511
    if hostname.ip.startswith("127."):
512
      raise errors.OpPrereqError("This host's IP resolves to the private"
513
                                 " range (%s). Please fix DNS or %s." %
514
                                 (hostname.ip, constants.ETC_HOSTS))
515

    
516
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
517

    
518
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
519
                         constants.DEFAULT_NODED_PORT):
520
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
521
                                 " to %s,\nbut this ip address does not"
522
                                 " belong to this host."
523
                                 " Aborting." % hostname.ip)
524

    
525
    secondary_ip = getattr(self.op, "secondary_ip", None)
526
    if secondary_ip and not utils.IsValidIP(secondary_ip):
527
      raise errors.OpPrereqError("Invalid secondary ip given")
528
    if (secondary_ip and
529
        secondary_ip != hostname.ip and
530
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
531
                           constants.DEFAULT_NODED_PORT))):
532
      raise errors.OpPrereqError("You gave %s as secondary IP,"
533
                                 " but it does not belong to this host." %
534
                                 secondary_ip)
535
    self.secondary_ip = secondary_ip
536

    
537
    # checks presence of the volume group given
538
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
539

    
540
    if vgstatus:
541
      raise errors.OpPrereqError("Error: %s" % vgstatus)
542

    
543
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
544
                    self.op.mac_prefix):
545
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
546
                                 self.op.mac_prefix)
547

    
548
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
549
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
550
                                 self.op.hypervisor_type)
551

    
552
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553
    if result.failed:
554
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
555
                                 (self.op.master_netdev,
556
                                  result.output.strip()))
557

    
558
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
559
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
560
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
561
                                 " executable." % constants.NODE_INITD_SCRIPT)
562

    
563
  def Exec(self, feedback_fn):
564
    """Initialize the cluster.
565

566
    """
567
    clustername = self.clustername
568
    hostname = self.hostname
569

    
570
    # set up the simple store
571
    self.sstore = ss = ssconf.SimpleStore()
572
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
573
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
574
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
575
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
576
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577

    
578
    # set up the inter-node password and certificate
579
    _InitGanetiServerSetup(ss)
580

    
581
    # start the master ip
582
    rpc.call_node_start_master(hostname.name)
583

    
584
    # set up ssh config and /etc/hosts
585
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
586
    try:
587
      sshline = f.read()
588
    finally:
589
      f.close()
590
    sshkey = sshline.split(" ")[1]
591

    
592
    _AddHostToEtcHosts(hostname.name)
593

    
594
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595

    
596
    _InitSSHSetup(hostname.name)
597

    
598
    # init of cluster config file
599
    self.cfg = cfgw = config.ConfigWriter()
600
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
601
                    sshkey, self.op.mac_prefix,
602
                    self.op.vg_name, self.op.def_bridge)
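    # Example parameter set (hypothetical values) satisfying _OP_REQP above:
    # cluster_name="cluster.example.com", hypervisor_type taken from
    # constants.HYPER_TYPES, vg_name="xenvg", mac_prefix="aa:00:00",
    # def_bridge="xen-br0", master_netdev="eth0"; secondary_ip is optional.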
603

    
604

    
605
class LUDestroyCluster(NoHooksLU):
606
  """Logical unit for destroying the cluster.
607

608
  """
609
  _OP_REQP = []
610

    
611
  def CheckPrereq(self):
612
    """Check prerequisites.
613

614
    This checks whether the cluster is empty.
615

616
    Any errors are signalled by raising errors.OpPrereqError.
617

618
    """
619
    master = self.sstore.GetMasterNode()
620

    
621
    nodelist = self.cfg.GetNodeList()
622
    if len(nodelist) != 1 or nodelist[0] != master:
623
      raise errors.OpPrereqError("There are still %d node(s) in"
624
                                 " this cluster." % (len(nodelist) - 1))
625
    instancelist = self.cfg.GetInstanceList()
626
    if instancelist:
627
      raise errors.OpPrereqError("There are still %d instance(s) in"
628
                                 " this cluster." % len(instancelist))
629

    
630
  def Exec(self, feedback_fn):
631
    """Destroys the cluster.
632

633
    """
634
    master = self.sstore.GetMasterNode()
635
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
636
    utils.CreateBackup(priv_key)
637
    utils.CreateBackup(pub_key)
638
    rpc.call_node_leave_cluster(master)
639

    
640

    
641
class LUVerifyCluster(NoHooksLU):
642
  """Verifies the cluster status.
643

644
  """
645
  _OP_REQP = []
646

    
647
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
648
                  remote_version, feedback_fn):
649
    """Run multiple tests against a node.
650

651
    Test list:
652
      - compares ganeti version
653
      - checks vg existence and size > 20G
654
      - checks config file checksum
655
      - checks ssh to other nodes
656

657
    Args:
658
      node: name of the node to check
659
      file_list: required list of files
660
      local_cksum: dictionary of local files and their checksums
661

662
    """
663
    # compares ganeti version
664
    local_version = constants.PROTOCOL_VERSION
665
    if not remote_version:
666
      feedback_fn(" - ERROR: connection to %s failed" % (node))
667
      return True
668

    
669
    if local_version != remote_version:
670
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
671
                      (local_version, node, remote_version))
672
      return True
673

    
674
    # checks vg existence and size > 20G
675

    
676
    bad = False
677
    if not vglist:
678
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
679
                      (node,))
680
      bad = True
681
    else:
682
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683
      if vgstatus:
684
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
685
        bad = True
686

    
687
    # checks config file checksum
688
    # checks ssh to any
689

    
690
    if 'filelist' not in node_result:
691
      bad = True
692
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
693
    else:
694
      remote_cksum = node_result['filelist']
695
      for file_name in file_list:
696
        if file_name not in remote_cksum:
697
          bad = True
698
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
699
        elif remote_cksum[file_name] != local_cksum[file_name]:
700
          bad = True
701
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
702

    
703
    if 'nodelist' not in node_result:
704
      bad = True
705
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
706
    else:
707
      if node_result['nodelist']:
708
        bad = True
709
        for node in node_result['nodelist']:
710
          feedback_fn("  - ERROR: communication with node '%s': %s" %
711
                          (node, node_result['nodelist'][node]))
712
    hyp_result = node_result.get('hypervisor', None)
713
    if hyp_result is not None:
714
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
715
    return bad
716

    
717
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
718
    """Verify an instance.
719

720
    This function checks to see if the required block devices are
721
    available on the instance's node.
722

723
    """
724
    bad = False
725

    
726
    instancelist = self.cfg.GetInstanceList()
727
    if not instance in instancelist:
728
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
729
                      (instance, instancelist))
730
      bad = True
731

    
732
    instanceconfig = self.cfg.GetInstanceInfo(instance)
733
    node_current = instanceconfig.primary_node
734

    
735
    node_vol_should = {}
736
    instanceconfig.MapLVsByNode(node_vol_should)
737

    
738
    for node in node_vol_should:
739
      for volume in node_vol_should[node]:
740
        if node not in node_vol_is or volume not in node_vol_is[node]:
741
          feedback_fn("  - ERROR: volume %s missing on node %s" %
742
                          (volume, node))
743
          bad = True
744

    
745
    if instanceconfig.status != 'down':
746
      if not instance in node_instance[node_current]:
747
        feedback_fn("  - ERROR: instance %s not running on node %s" %
748
                        (instance, node_current))
749
        bad = True
750

    
751
    for node in node_instance:
752
      if node != node_current:
753
        if instance in node_instance[node]:
754
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
755
                          (instance, node))
756
          bad = True
757

    
758
    return bad
759

    
760
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
761
    """Verify if there are any unknown volumes in the cluster.
762

763
    The .os, .swap and backup volumes are ignored. All other volumes are
764
    reported as unknown.
765

766
    """
767
    bad = False
768

    
769
    for node in node_vol_is:
770
      for volume in node_vol_is[node]:
771
        if node not in node_vol_should or volume not in node_vol_should[node]:
772
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
773
                      (volume, node))
774
          bad = True
775
    return bad
776

    
777
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
778
    """Verify the list of running instances.
779

780
    This checks what instances are running but unknown to the cluster.
781

782
    """
783
    bad = False
784
    for node in node_instance:
785
      for runninginstance in node_instance[node]:
786
        if runninginstance not in instancelist:
787
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
788
                          (runninginstance, node))
789
          bad = True
790
    return bad
791

    
792
  def CheckPrereq(self):
793
    """Check prerequisites.
794

795
    This has no prerequisites.
796

797
    """
798
    pass
799

    
800
  def Exec(self, feedback_fn):
801
    """Verify integrity of cluster, performing various test on nodes.
802

803
    """
804
    bad = False
805
    feedback_fn("* Verifying global settings")
806
    for msg in self.cfg.VerifyConfig():
807
      feedback_fn("  - ERROR: %s" % msg)
808

    
809
    vg_name = self.cfg.GetVGName()
810
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
811
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
812
    node_volume = {}
813
    node_instance = {}
814

    
815
    # FIXME: verify OS list
816
    # do local checksums
817
    file_names = list(self.sstore.GetFileList())
818
    file_names.append(constants.SSL_CERT_FILE)
819
    file_names.append(constants.CLUSTER_CONF_FILE)
820
    local_checksums = utils.FingerprintFiles(file_names)
821

    
822
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
823
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
824
    all_instanceinfo = rpc.call_instance_list(nodelist)
825
    all_vglist = rpc.call_vg_list(nodelist)
826
    node_verify_param = {
827
      'filelist': file_names,
828
      'nodelist': nodelist,
829
      'hypervisor': None,
830
      }
831
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
832
    all_rversion = rpc.call_version(nodelist)
833

    
834
    for node in nodelist:
835
      feedback_fn("* Verifying node %s" % node)
836
      result = self._VerifyNode(node, file_names, local_checksums,
837
                                all_vglist[node], all_nvinfo[node],
838
                                all_rversion[node], feedback_fn)
839
      bad = bad or result
840

    
841
      # node_volume
842
      volumeinfo = all_volumeinfo[node]
843

    
844
      if isinstance(volumeinfo, basestring):
845
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
846
                    (node, volumeinfo[-400:].encode('string_escape')))
847
        bad = True
848
        node_volume[node] = {}
849
      elif not isinstance(volumeinfo, dict):
850
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
851
        bad = True
852
        continue
853
      else:
854
        node_volume[node] = volumeinfo
855

    
856
      # node_instance
857
      nodeinstance = all_instanceinfo[node]
858
      if not isinstance(nodeinstance, list):
859
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
860
        bad = True
861
        continue
862

    
863
      node_instance[node] = nodeinstance
864

    
865
    node_vol_should = {}
866

    
867
    for instance in instancelist:
868
      feedback_fn("* Verifying instance %s" % instance)
869
      result =  self._VerifyInstance(instance, node_volume, node_instance,
870
                                     feedback_fn)
871
      bad = bad or result
872

    
873
      inst_config = self.cfg.GetInstanceInfo(instance)
874

    
875
      inst_config.MapLVsByNode(node_vol_should)
876

    
877
    feedback_fn("* Verifying orphan volumes")
878
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
879
                                       feedback_fn)
880
    bad = bad or result
881

    
882
    feedback_fn("* Verifying remaining instances")
883
    result = self._VerifyOrphanInstances(instancelist, node_instance,
884
                                         feedback_fn)
885
    bad = bad or result
886

    
887
    return int(bad)
888

    
889

    
890
class LUVerifyDisks(NoHooksLU):
891
  """Verifies the cluster disks status.
892

893
  """
894
  _OP_REQP = []
895

    
896
  def CheckPrereq(self):
897
    """Check prerequisites.
898

899
    This has no prerequisites.
900

901
    """
902
    pass
903

    
904
  def Exec(self, feedback_fn):
905
    """Verify integrity of cluster disks.
906

907
    """
908
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
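    # The four members of the result are, in order: nodes that could not be
    # queried at all, per-node LVM error messages, names of instances that
    # have at least one offline logical volume, and a map from instance name
    # to the (node, volume) pairs that are missing entirely.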
909

    
910
    vg_name = self.cfg.GetVGName()
911
    nodes = utils.NiceSort(self.cfg.GetNodeList())
912
    instances = [self.cfg.GetInstanceInfo(name)
913
                 for name in self.cfg.GetInstanceList()]
914

    
915
    nv_dict = {}
916
    for inst in instances:
917
      inst_lvs = {}
918
      if (inst.status != "up" or
919
          inst.disk_template not in constants.DTS_NET_MIRROR):
920
        continue
921
      inst.MapLVsByNode(inst_lvs)
922
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
923
      for node, vol_list in inst_lvs.iteritems():
924
        for vol in vol_list:
925
          nv_dict[(node, vol)] = inst
926

    
927
    if not nv_dict:
928
      return result
929

    
930
    node_lvs = rpc.call_volume_list(nodes, vg_name)
931

    
932
    to_act = set()
933
    for node in nodes:
934
      # node_volume
935
      lvs = node_lvs[node]
936

    
937
      if isinstance(lvs, basestring):
938
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
939
        res_nlvm[node] = lvs
940
      elif not isinstance(lvs, dict):
941
        logger.Info("connection to node %s failed or invalid data returned" %
942
                    (node,))
943
        res_nodes.append(node)
944
        continue
945

    
946
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
947
        inst = nv_dict.pop((node, lv_name), None)
948
        if (not lv_online and inst is not None
949
            and inst.name not in res_instances):
950
          res_instances.append(inst.name)
951

    
952
    # any leftover items in nv_dict are missing LVs, let's arrange the
953
    # data better
954
    for key, inst in nv_dict.iteritems():
955
      if inst.name not in res_missing:
956
        res_missing[inst.name] = []
957
      res_missing[inst.name].append(key)
958

    
959
    return result
960

    
961

    
962
class LURenameCluster(LogicalUnit):
963
  """Rename the cluster.
964

965
  """
966
  HPATH = "cluster-rename"
967
  HTYPE = constants.HTYPE_CLUSTER
968
  _OP_REQP = ["name"]
969

    
970
  def BuildHooksEnv(self):
971
    """Build hooks env.
972

973
    """
974
    env = {
975
      "OP_TARGET": self.sstore.GetClusterName(),
976
      "NEW_NAME": self.op.name,
977
      }
978
    mn = self.sstore.GetMasterNode()
979
    return env, [mn], [mn]
980

    
981
  def CheckPrereq(self):
982
    """Verify that the passed name is a valid one.
983

984
    """
985
    hostname = utils.HostInfo(self.op.name)
986

    
987
    new_name = hostname.name
988
    self.ip = new_ip = hostname.ip
989
    old_name = self.sstore.GetClusterName()
990
    old_ip = self.sstore.GetMasterIP()
991
    if new_name == old_name and new_ip == old_ip:
992
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
993
                                 " cluster has changed")
994
    if new_ip != old_ip:
995
      result = utils.RunCmd(["fping", "-q", new_ip])
996
      if not result.failed:
997
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
998
                                   " reachable on the network. Aborting." %
999
                                   new_ip)
1000

    
1001
    self.op.name = new_name
1002

    
1003
  def Exec(self, feedback_fn):
1004
    """Rename the cluster.
1005

1006
    """
1007
    clustername = self.op.name
1008
    ip = self.ip
1009
    ss = self.sstore
1010

    
1011
    # shutdown the master IP
1012
    master = ss.GetMasterNode()
1013
    if not rpc.call_node_stop_master(master):
1014
      raise errors.OpExecError("Could not disable the master role")
1015

    
1016
    try:
1017
      # modify the sstore
1018
      ss.SetKey(ss.SS_MASTER_IP, ip)
1019
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1020

    
1021
      # Distribute updated ss config to all nodes
1022
      myself = self.cfg.GetNodeInfo(master)
1023
      dist_nodes = self.cfg.GetNodeList()
1024
      if myself.name in dist_nodes:
1025
        dist_nodes.remove(myself.name)
1026

    
1027
      logger.Debug("Copying updated ssconf data to all nodes")
1028
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1029
        fname = ss.KeyToFilename(keyname)
1030
        result = rpc.call_upload_file(dist_nodes, fname)
1031
        for to_node in dist_nodes:
1032
          if not result[to_node]:
1033
            logger.Error("copy of file %s to node %s failed" %
1034
                         (fname, to_node))
1035
    finally:
1036
      if not rpc.call_node_start_master(master):
1037
        logger.Error("Could not re-enable the master role on the master,"
1038
                     " please restart manually.")
1039

    
1040

    
1041
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1042
  """Sleep and poll for an instance's disk to sync.
1043

1044
  """
1045
  if not instance.disks:
1046
    return True
1047

    
1048
  if not oneshot:
1049
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1050

    
1051
  node = instance.primary_node
1052

    
1053
  for dev in instance.disks:
1054
    cfgw.SetDiskID(dev, node)
1055

    
1056
  retries = 0
1057
  while True:
1058
    max_time = 0
1059
    done = True
1060
    cumul_degraded = False
1061
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1062
    if not rstats:
1063
      proc.LogWarning("Can't get any data from node %s" % node)
1064
      retries += 1
1065
      if retries >= 10:
1066
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1067
                                 " aborting." % node)
1068
      time.sleep(6)
1069
      continue
1070
    retries = 0
1071
    for i in range(len(rstats)):
1072
      mstat = rstats[i]
1073
      if mstat is None:
1074
        proc.LogWarning("Can't compute data for node %s/%s" %
1075
                        (node, instance.disks[i].iv_name))
1076
        continue
1077
      # we ignore the ldisk parameter
1078
      perc_done, est_time, is_degraded, _ = mstat
1079
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1080
      if perc_done is not None:
1081
        done = False
1082
        if est_time is not None:
1083
          rem_time = "%d estimated seconds remaining" % est_time
1084
          max_time = est_time
1085
        else:
1086
          rem_time = "no time estimate"
1087
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1088
                     (instance.disks[i].iv_name, perc_done, rem_time))
1089
    if done or oneshot:
1090
      break
1091

    
1092
    if unlock:
1093
      utils.Unlock('cmd')
1094
    try:
1095
      time.sleep(min(60, max_time))
1096
    finally:
1097
      if unlock:
1098
        utils.Lock('cmd')
1099

    
1100
  if done:
1101
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1102
  return not cumul_degraded
1103

    
1104

    
1105
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1106
  """Check that mirrors are not degraded.
1107

1108
  The ldisk parameter, if True, will change the test from the
1109
  is_degraded attribute (which represents overall non-ok status for
1110
  the device(s)) to the ldisk (representing the local storage status).
1111

1112
  """
1113
  cfgw.SetDiskID(dev, node)
1114
  if ldisk:
1115
    idx = 6
1116
  else:
1117
    idx = 5
1118

    
1119
  result = True
1120
  if on_primary or dev.AssembleOnSecondary():
1121
    rstats = rpc.call_blockdev_find(node, dev)
1122
    if not rstats:
1123
      logger.ToStderr("Can't get any data from node %s" % node)
1124
      result = False
1125
    else:
1126
      result = result and (not rstats[idx])
1127
  if dev.children:
1128
    for child in dev.children:
1129
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1130

    
1131
  return result
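# Note on the magic indexes above: in the status tuple returned by
# rpc.call_blockdev_find, position 5 is the overall is_degraded flag and
# position 6 the local-storage (ldisk) status, hence ldisk=True switching
# the lookup from index 5 to 6.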
1132

    
1133

    
1134
class LUDiagnoseOS(NoHooksLU):
1135
  """Logical unit for OS diagnose/query.
1136

1137
  """
1138
  _OP_REQP = []
1139

    
1140
  def CheckPrereq(self):
1141
    """Check prerequisites.
1142

1143
    This always succeeds, since this is a pure query LU.
1144

1145
    """
1146
    return
1147

    
1148
  def Exec(self, feedback_fn):
1149
    """Compute the list of OSes.
1150

1151
    """
1152
    node_list = self.cfg.GetNodeList()
1153
    node_data = rpc.call_os_diagnose(node_list)
1154
    if node_data == False:
1155
      raise errors.OpExecError("Can't gather the list of OSes")
1156
    return node_data
1157

    
1158

    
1159
class LURemoveNode(LogicalUnit):
1160
  """Logical unit for removing a node.
1161

1162
  """
1163
  HPATH = "node-remove"
1164
  HTYPE = constants.HTYPE_NODE
1165
  _OP_REQP = ["node_name"]
1166

    
1167
  def BuildHooksEnv(self):
1168
    """Build hooks env.
1169

1170
    This doesn't run on the target node in the pre phase as a failed
1171
    node would not allow itself to run.
1172

1173
    """
1174
    env = {
1175
      "OP_TARGET": self.op.node_name,
1176
      "NODE_NAME": self.op.node_name,
1177
      }
1178
    all_nodes = self.cfg.GetNodeList()
1179
    all_nodes.remove(self.op.node_name)
1180
    return env, all_nodes, all_nodes
1181

    
1182
  def CheckPrereq(self):
1183
    """Check prerequisites.
1184

1185
    This checks:
1186
     - the node exists in the configuration
1187
     - it does not have primary or secondary instances
1188
     - it's not the master
1189

1190
    Any errors are signalled by raising errors.OpPrereqError.
1191

1192
    """
1193
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1194
    if node is None:
1195
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1196

    
1197
    instance_list = self.cfg.GetInstanceList()
1198

    
1199
    masternode = self.sstore.GetMasterNode()
1200
    if node.name == masternode:
1201
      raise errors.OpPrereqError("Node is the master node,"
1202
                                 " you need to failover first.")
1203

    
1204
    for instance_name in instance_list:
1205
      instance = self.cfg.GetInstanceInfo(instance_name)
1206
      if node.name == instance.primary_node:
1207
        raise errors.OpPrereqError("Instance %s still running on the node,"
1208
                                   " please remove first." % instance_name)
1209
      if node.name in instance.secondary_nodes:
1210
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1211
                                   " please remove first." % instance_name)
1212
    self.op.node_name = node.name
1213
    self.node = node
1214

    
1215
  def Exec(self, feedback_fn):
1216
    """Removes the node from the cluster.
1217

1218
    """
1219
    node = self.node
1220
    logger.Info("stopping the node daemon and removing configs from node %s" %
1221
                node.name)
1222

    
1223
    rpc.call_node_leave_cluster(node.name)
1224

    
1225
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1226

    
1227
    logger.Info("Removing node %s from config" % node.name)
1228

    
1229
    self.cfg.RemoveNode(node.name)
1230

    
1231
    _RemoveHostFromEtcHosts(node.name)
1232

    
1233

    
1234
class LUQueryNodes(NoHooksLU):
1235
  """Logical unit for querying nodes.
1236

1237
  """
1238
  _OP_REQP = ["output_fields", "names"]
1239

    
1240
  def CheckPrereq(self):
1241
    """Check prerequisites.
1242

1243
    This checks that the fields required are valid output fields.
1244

1245
    """
1246
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1247
                                     "mtotal", "mnode", "mfree",
1248
                                     "bootid"])
1249

    
1250
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1251
                               "pinst_list", "sinst_list",
1252
                               "pip", "sip"],
1253
                       dynamic=self.dynamic_fields,
1254
                       selected=self.op.output_fields)
1255

    
1256
    self.wanted = _GetWantedNodes(self, self.op.names)
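    # Example (hypothetical selection): output_fields=["name", "pinst_cnt",
    # "mfree"] mixes static and dynamic fields, which makes Exec below issue
    # the rpc.call_node_info query; a purely static selection skips it.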
1257

    
1258
  def Exec(self, feedback_fn):
1259
    """Computes the list of nodes and their attributes.
1260

1261
    """
1262
    nodenames = self.wanted
1263
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1264

    
1265
    # begin data gathering
1266

    
1267
    if self.dynamic_fields.intersection(self.op.output_fields):
1268
      live_data = {}
1269
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1270
      for name in nodenames:
1271
        nodeinfo = node_data.get(name, None)
1272
        if nodeinfo:
1273
          live_data[name] = {
1274
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1275
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1276
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1277
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1278
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1279
            "bootid": nodeinfo['bootid'],
1280
            }
1281
        else:
1282
          live_data[name] = {}
1283
    else:
1284
      live_data = dict.fromkeys(nodenames, {})
1285

    
1286
    node_to_primary = dict([(name, set()) for name in nodenames])
1287
    node_to_secondary = dict([(name, set()) for name in nodenames])
1288

    
1289
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1290
                             "sinst_cnt", "sinst_list"))
1291
    if inst_fields & frozenset(self.op.output_fields):
1292
      instancelist = self.cfg.GetInstanceList()
1293

    
1294
      for instance_name in instancelist:
1295
        inst = self.cfg.GetInstanceInfo(instance_name)
1296
        if inst.primary_node in node_to_primary:
1297
          node_to_primary[inst.primary_node].add(inst.name)
1298
        for secnode in inst.secondary_nodes:
1299
          if secnode in node_to_secondary:
1300
            node_to_secondary[secnode].add(inst.name)
1301

    
1302
    # end data gathering
1303

    
1304
    output = []
1305
    for node in nodelist:
1306
      node_output = []
1307
      for field in self.op.output_fields:
1308
        if field == "name":
1309
          val = node.name
1310
        elif field == "pinst_list":
1311
          val = list(node_to_primary[node.name])
1312
        elif field == "sinst_list":
1313
          val = list(node_to_secondary[node.name])
1314
        elif field == "pinst_cnt":
1315
          val = len(node_to_primary[node.name])
1316
        elif field == "sinst_cnt":
1317
          val = len(node_to_secondary[node.name])
1318
        elif field == "pip":
1319
          val = node.primary_ip
1320
        elif field == "sip":
1321
          val = node.secondary_ip
1322
        elif field in self.dynamic_fields:
1323
          val = live_data[node.name].get(field, None)
1324
        else:
1325
          raise errors.ParameterError(field)
1326
        node_output.append(val)
1327
      output.append(node_output)
1328

    
1329
    return output
1330

    
1331

    
1332
class LUQueryNodeVolumes(NoHooksLU):
1333
  """Logical unit for getting volumes on node(s).
1334

1335
  """
1336
  _OP_REQP = ["nodes", "output_fields"]
1337

    
1338
  def CheckPrereq(self):
1339
    """Check prerequisites.
1340

1341
    This checks that the fields required are valid output fields.
1342

1343
    """
1344
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1345

    
1346
    _CheckOutputFields(static=["node"],
1347
                       dynamic=["phys", "vg", "name", "size", "instance"],
1348
                       selected=self.op.output_fields)
1349

    
1350

    
1351
  def Exec(self, feedback_fn):
1352
    """Computes the list of nodes and their attributes.
1353

1354
    """
1355
    nodenames = self.nodes
1356
    volumes = rpc.call_node_volumes(nodenames)
1357

    
1358
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1359
             in self.cfg.GetInstanceList()]
1360

    
1361
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1362

    
1363
    output = []
1364
    for node in nodenames:
1365
      if node not in volumes or not volumes[node]:
1366
        continue
1367

    
1368
      node_vols = volumes[node][:]
1369
      node_vols.sort(key=lambda vol: vol['dev'])
1370

    
1371
      for vol in node_vols:
1372
        node_output = []
1373
        for field in self.op.output_fields:
1374
          if field == "node":
1375
            val = node
1376
          elif field == "phys":
1377
            val = vol['dev']
1378
          elif field == "vg":
1379
            val = vol['vg']
1380
          elif field == "name":
1381
            val = vol['name']
1382
          elif field == "size":
1383
            val = int(float(vol['size']))
1384
          elif field == "instance":
1385
            for inst in ilist:
1386
              if node not in lv_by_node[inst]:
1387
                continue
1388
              if vol['name'] in lv_by_node[inst][node]:
1389
                val = inst.name
1390
                break
1391
            else:
1392
              val = '-'
1393
          else:
1394
            raise errors.ParameterError(field)
1395
          node_output.append(str(val))
1396

    
1397
        output.append(node_output)
1398

    
1399
    return output
1400

    
1401

    
1402
class LUAddNode(LogicalUnit):
1403
  """Logical unit for adding node to the cluster.
1404

1405
  """
1406
  HPATH = "node-add"
1407
  HTYPE = constants.HTYPE_NODE
1408
  _OP_REQP = ["node_name"]
1409

    
1410
  def BuildHooksEnv(self):
1411
    """Build hooks env.
1412

1413
    This will run on all nodes before, and on all nodes + the new node after.
1414

1415
    """
1416
    env = {
1417
      "OP_TARGET": self.op.node_name,
1418
      "NODE_NAME": self.op.node_name,
1419
      "NODE_PIP": self.op.primary_ip,
1420
      "NODE_SIP": self.op.secondary_ip,
1421
      }
1422
    nodes_0 = self.cfg.GetNodeList()
1423
    nodes_1 = nodes_0 + [self.op.node_name, ]
1424
    return env, nodes_0, nodes_1
1425

    
1426
  def CheckPrereq(self):
1427
    """Check prerequisites.
1428

1429
    This checks:
1430
     - the new node is not already in the config
1431
     - it is resolvable
1432
     - its parameters (single/dual homed) matches the cluster
1433

1434
    Any errors are signalled by raising errors.OpPrereqError.
1435

1436
    """
1437
    node_name = self.op.node_name
1438
    cfg = self.cfg
1439

    
1440
    dns_data = utils.HostInfo(node_name)
1441

    
1442
    node = dns_data.name
1443
    primary_ip = self.op.primary_ip = dns_data.ip
1444
    secondary_ip = getattr(self.op, "secondary_ip", None)
1445
    if secondary_ip is None:
1446
      secondary_ip = primary_ip
1447
    if not utils.IsValidIP(secondary_ip):
1448
      raise errors.OpPrereqError("Invalid secondary IP given")
1449
    self.op.secondary_ip = secondary_ip
1450
    node_list = cfg.GetNodeList()
1451
    if node in node_list:
1452
      raise errors.OpPrereqError("Node %s is already in the configuration"
1453
                                 % node)
1454

    
1455
    for existing_node_name in node_list:
1456
      existing_node = cfg.GetNodeInfo(existing_node_name)
1457
      if (existing_node.primary_ip == primary_ip or
1458
          existing_node.secondary_ip == primary_ip or
1459
          existing_node.primary_ip == secondary_ip or
1460
          existing_node.secondary_ip == secondary_ip):
1461
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1462
                                   " existing node %s" % existing_node.name)
1463

    
1464
    # check that the type of the node (single versus dual homed) is the
1465
    # same as for the master
1466
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1467
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1468
    newbie_singlehomed = secondary_ip == primary_ip
1469
    if master_singlehomed != newbie_singlehomed:
1470
      if master_singlehomed:
1471
        raise errors.OpPrereqError("The master has no private ip but the"
1472
                                   " new node has one")
1473
      else:
1474
        raise errors.OpPrereqError("The master has a private ip but the"
1475
                                   " new node doesn't have one")
1476

    
1477
    # checks reachablity
1478
    if not utils.TcpPing(utils.HostInfo().name,
1479
                         primary_ip,
1480
                         constants.DEFAULT_NODED_PORT):
1481
      raise errors.OpPrereqError("Node not reachable by ping")
1482

    
1483
    if not newbie_singlehomed:
1484
      # check reachability from my secondary ip to newbie's secondary ip
1485
      if not utils.TcpPing(myself.secondary_ip,
1486
                           secondary_ip,
1487
                           constants.DEFAULT_NODED_PORT):
1488
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1489
                                   " based ping to noded port")
1490

    
1491
    self.new_node = objects.Node(name=node,
1492
                                 primary_ip=primary_ip,
1493
                                 secondary_ip=secondary_ip)
1494

    
1495
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1496
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1497
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1498
                                   constants.VNC_PASSWORD_FILE)
1499

    
1500
  def Exec(self, feedback_fn):
1501
    """Adds the new node to the cluster.
1502

1503
    """
1504
    new_node = self.new_node
1505
    node = new_node.name
1506

    
1507
    # set up inter-node password and certificate and restarts the node daemon
1508
    gntpass = self.sstore.GetNodeDaemonPassword()
1509
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1510
      raise errors.OpExecError("ganeti password corruption detected")
1511
    f = open(constants.SSL_CERT_FILE)
1512
    try:
1513
      gntpem = f.read(8192)
1514
    finally:
1515
      f.close()
1516
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1517
    # so we use this to detect an invalid certificate; as long as the
1518
    # cert doesn't contain this, the here-document will be correctly
1519
    # parsed by the shell sequence below
1520
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1521
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1522
    if not gntpem.endswith("\n"):
1523
      raise errors.OpExecError("PEM must end with newline")
1524
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1525

    
1526
    # and then connect with ssh to set password and start ganeti-noded
1527
    # note that all the below variables are sanitized at this point,
1528
    # either by being constants or by the checks above
1529
    ss = self.sstore
1530
    mycommand = ("umask 077 && "
1531
                 "echo '%s' > '%s' && "
1532
                 "cat > '%s' << '!EOF.' && \n"
1533
                 "%s!EOF.\n%s restart" %
1534
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1535
                  constants.SSL_CERT_FILE, gntpem,
1536
                  constants.NODE_INITD_SCRIPT))
1537

    
1538
    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1539
    if result.failed:
1540
      raise errors.OpExecError("Remote command on node %s, error: %s,"
1541
                               " output: %s" %
1542
                               (node, result.fail_reason, result.output))
1543

    
1544
    # check connectivity
1545
    time.sleep(4)
1546

    
1547
    result = rpc.call_version([node])[node]
1548
    if result:
1549
      if constants.PROTOCOL_VERSION == result:
1550
        logger.Info("communication to node %s fine, sw version %s match" %
1551
                    (node, result))
1552
      else:
1553
        raise errors.OpExecError("Version mismatch master version %s,"
1554
                                 " node version %s" %
1555
                                 (constants.PROTOCOL_VERSION, result))
1556
    else:
1557
      raise errors.OpExecError("Cannot get version from the new node")
1558

    
1559
    # setup ssh on node
1560
    logger.Info("copy ssh key to node %s" % node)
1561
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1562
    keyarray = []
1563
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1564
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1565
                priv_key, pub_key]
1566

    
1567
    for i in keyfiles:
1568
      f = open(i, 'r')
1569
      try:
1570
        keyarray.append(f.read())
1571
      finally:
1572
        f.close()
1573

    
1574
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1575
                               keyarray[3], keyarray[4], keyarray[5])
1576

    
1577
    if not result:
1578
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1579

    
1580
    # Add node to our /etc/hosts, and add key to known_hosts
1581
    _AddHostToEtcHosts(new_node.name)
1582

    
1583
    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1584
                      self.cfg.GetHostKey())
1585

    
1586
    if new_node.secondary_ip != new_node.primary_ip:
1587
      if not rpc.call_node_tcp_ping(new_node.name,
1588
                                    constants.LOCALHOST_IP_ADDRESS,
1589
                                    new_node.secondary_ip,
1590
                                    constants.DEFAULT_NODED_PORT,
1591
                                    10, False):
1592
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1593
                                 " you gave (%s). Please fix and re-run this"
1594
                                 " command." % new_node.secondary_ip)
1595

    
1596
    success, msg = ssh.VerifyNodeHostname(node)
1597
    if not success:
1598
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
1599
                               " than the one the resolver gives: %s."
1600
                               " Please fix and re-run this command." %
1601
                               (node, msg))
1602

    
1603
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1604
    # including the node just added
1605
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1606
    dist_nodes = self.cfg.GetNodeList() + [node]
1607
    if myself.name in dist_nodes:
1608
      dist_nodes.remove(myself.name)
1609

    
1610
    logger.Debug("Copying hosts and known_hosts to all nodes")
1611
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1612
      result = rpc.call_upload_file(dist_nodes, fname)
1613
      for to_node in dist_nodes:
1614
        if not result[to_node]:
1615
          logger.Error("copy of file %s to node %s failed" %
1616
                       (fname, to_node))
1617

    
1618
    to_copy = ss.GetFileList()
1619
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1620
      to_copy.append(constants.VNC_PASSWORD_FILE)
1621
    for fname in to_copy:
1622
      if not ssh.CopyFileToNode(node, fname):
1623
        logger.Error("could not copy file %s to node %s" % (fname, node))
1624

    
1625
    logger.Info("adding node %s to cluster.conf" % node)
1626
    self.cfg.AddNode(new_node)
1627

    
1628

    
1629
class LUMasterFailover(LogicalUnit):
1630
  """Failover the master node to the current node.
1631

1632
  This is a special LU in that it must run on a non-master node.
1633

1634
  """
1635
  HPATH = "master-failover"
1636
  HTYPE = constants.HTYPE_CLUSTER
1637
  REQ_MASTER = False
1638
  _OP_REQP = []
1639

    
1640
  def BuildHooksEnv(self):
1641
    """Build hooks env.
1642

1643
    This will run on the new master only in the pre phase, and on all
1644
    the nodes in the post phase.
1645

1646
    """
1647
    env = {
1648
      "OP_TARGET": self.new_master,
1649
      "NEW_MASTER": self.new_master,
1650
      "OLD_MASTER": self.old_master,
1651
      }
1652
    return env, [self.new_master], self.cfg.GetNodeList()
1653

    
1654
  def CheckPrereq(self):
1655
    """Check prerequisites.
1656

1657
    This checks that we are not already the master.
1658

1659
    """
1660
    self.new_master = utils.HostInfo().name
1661
    self.old_master = self.sstore.GetMasterNode()
1662

    
1663
    if self.old_master == self.new_master:
1664
      raise errors.OpPrereqError("This commands must be run on the node"
1665
                                 " where you want the new master to be."
1666
                                 " %s is already the master" %
1667
                                 self.old_master)
1668

    
1669
  def Exec(self, feedback_fn):
1670
    """Failover the master node.
1671

1672
    This command, when run on a non-master node, will cause the current
1673
    master to cease being master, and the non-master to become the new
1674
    master.
1675

1676
    """
1677
    #TODO: do not rely on gethostname returning the FQDN
1678
    logger.Info("setting master to %s, old master: %s" %
1679
                (self.new_master, self.old_master))
1680

    
1681
    if not rpc.call_node_stop_master(self.old_master):
1682
      logger.Error("could disable the master role on the old master"
1683
                   " %s, please disable manually" % self.old_master)
1684

    
1685
    ss = self.sstore
1686
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1687
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
1688
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
1689
      logger.Error("could not distribute the new simple store master file"
1690
                   " to the other nodes, please check.")
1691

    
1692
    if not rpc.call_node_start_master(self.new_master):
1693
      logger.Error("could not start the master role on the new master"
1694
                   " %s, please check" % self.new_master)
1695
      feedback_fn("Error in activating the master IP on the new master,"
1696
                  " please fix manually.")
1697

    
1698

    
1699

    
1700
class LUQueryClusterInfo(NoHooksLU):
1701
  """Query cluster configuration.
1702

1703
  """
1704
  _OP_REQP = []
1705
  REQ_MASTER = False
1706

    
1707
  def CheckPrereq(self):
1708
    """No prerequsites needed for this LU.
1709

1710
    """
1711
    pass
1712

    
1713
  def Exec(self, feedback_fn):
1714
    """Return cluster config.
1715

1716
    """
1717
    result = {
1718
      "name": self.sstore.GetClusterName(),
1719
      "software_version": constants.RELEASE_VERSION,
1720
      "protocol_version": constants.PROTOCOL_VERSION,
1721
      "config_version": constants.CONFIG_VERSION,
1722
      "os_api_version": constants.OS_API_VERSION,
1723
      "export_version": constants.EXPORT_VERSION,
1724
      "master": self.sstore.GetMasterNode(),
1725
      "architecture": (platform.architecture()[0], platform.machine()),
1726
      }
1727

    
1728
    return result
1729

    
1730

    
1731
class LUClusterCopyFile(NoHooksLU):
1732
  """Copy file to cluster.
1733

1734
  """
1735
  _OP_REQP = ["nodes", "filename"]
1736

    
1737
  def CheckPrereq(self):
1738
    """Check prerequisites.
1739

1740
    It should check that the named file exists and that the given list
1741
    of nodes is valid.
1742

1743
    """
1744
    if not os.path.exists(self.op.filename):
1745
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1746

    
1747
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1748

    
1749
  def Exec(self, feedback_fn):
1750
    """Copy a file from master to some nodes.
1751

1752
    Args (taken from the opcode):
1753
      filename - the name of the file to copy; it must exist on the
1754
        master node
1755
      nodes - list containing the names of the target nodes; if empty,
1756
        all nodes are used
1757

1758
    """
1759
    filename = self.op.filename
1760

    
1761
    myname = utils.HostInfo().name
1762

    
1763
    for node in self.nodes:
1764
      if node == myname:
1765
        continue
1766
      if not ssh.CopyFileToNode(node, filename):
1767
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1768

    
1769

    
1770
class LUDumpClusterConfig(NoHooksLU):
1771
  """Return a text-representation of the cluster-config.
1772

1773
  """
1774
  _OP_REQP = []
1775

    
1776
  def CheckPrereq(self):
1777
    """No prerequisites.
1778

1779
    """
1780
    pass
1781

    
1782
  def Exec(self, feedback_fn):
1783
    """Dump a representation of the cluster config to the standard output.
1784

1785
    """
1786
    return self.cfg.DumpConfig()
1787

    
1788

    
1789
class LURunClusterCommand(NoHooksLU):
1790
  """Run a command on some nodes.
1791

1792
  """
1793
  _OP_REQP = ["command", "nodes"]
1794

    
1795
  def CheckPrereq(self):
1796
    """Check prerequisites.
1797

1798
    It checks that the given list of nodes is valid.
1799

1800
    """
1801
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1802

    
1803
  def Exec(self, feedback_fn):
1804
    """Run a command on some nodes.
1805

1806
    """
1807
    data = []
1808
    for node in self.nodes:
1809
      result = ssh.SSHCall(node, "root", self.op.command)
1810
      data.append((node, result.output, result.exit_code))
1811

    
1812
    return data
1813

    
1814

    
1815
class LUActivateInstanceDisks(NoHooksLU):
1816
  """Bring up an instance's disks.
1817

1818
  """
1819
  _OP_REQP = ["instance_name"]
1820

    
1821
  def CheckPrereq(self):
1822
    """Check prerequisites.
1823

1824
    This checks that the instance is in the cluster.
1825

1826
    """
1827
    instance = self.cfg.GetInstanceInfo(
1828
      self.cfg.ExpandInstanceName(self.op.instance_name))
1829
    if instance is None:
1830
      raise errors.OpPrereqError("Instance '%s' not known" %
1831
                                 self.op.instance_name)
1832
    self.instance = instance
1833

    
1834

    
1835
  def Exec(self, feedback_fn):
1836
    """Activate the disks.
1837

1838
    """
1839
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1840
    if not disks_ok:
1841
      raise errors.OpExecError("Cannot activate block devices")
1842

    
1843
    return disks_info
1844

    
1845

    
1846
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1847
  """Prepare the block devices for an instance.
1848

1849
  This sets up the block devices on all nodes.
1850

1851
  Args:
1852
    instance: a ganeti.objects.Instance object
1853
    ignore_secondaries: if true, errors on secondary nodes won't result
1854
                        in an error return from the function
1855

1856
  Returns:
1857
    false if the operation failed
1858
    list of (host, instance_visible_name, node_visible_name) if the operation
1859
         succeeded with the mapping from node devices to instance devices
1860
  """
1861
  device_info = []
1862
  disks_ok = True
1863
  iname = instance.name
1864
  # With the two passes mechanism we try to reduce the window of
1865
  # opportunity for the race condition of switching DRBD to primary
1866
  # before handshaking occurred, but we do not eliminate it
1867

    
1868
  # The proper fix would be to wait (with some limits) until the
1869
  # connection has been made and drbd transitions from WFConnection
1870
  # into any other network-connected state (Connected, SyncTarget,
1871
  # SyncSource, etc.)
1872

    
1873
  # 1st pass, assemble on all nodes in secondary mode
1874
  for inst_disk in instance.disks:
1875
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1876
      cfg.SetDiskID(node_disk, node)
1877
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1878
      if not result:
1879
        logger.Error("could not prepare block device %s on node %s"
1880
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1881
        if not ignore_secondaries:
1882
          disks_ok = False
1883

    
1884
  # FIXME: race condition on drbd migration to primary
1885

    
1886
  # 2nd pass, do only the primary node
1887
  for inst_disk in instance.disks:
1888
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1889
      if node != instance.primary_node:
1890
        continue
1891
      cfg.SetDiskID(node_disk, node)
1892
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1893
      if not result:
1894
        logger.Error("could not prepare block device %s on node %s"
1895
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1896
        disks_ok = False
1897
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1898

    
1899
  # leave the disks configured for the primary node
1900
  # this is a workaround that would be fixed better by
1901
  # improving the logical/physical id handling
1902
  for disk in instance.disks:
1903
    cfg.SetDiskID(disk, instance.primary_node)
1904

    
1905
  return disks_ok, device_info
1906

    
1907
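
# Illustrative sketch, not part of the original module: one way a caller
# could consume the (node, iv_name, device) tuples returned by
# _AssembleInstanceDisks; feedback_fn is assumed to be the usual LU
# feedback callable and the helper name is hypothetical.
def _ExampleReportAssembledDisks(instance, cfg, feedback_fn):
  """Assemble an instance's disks and report the device mapping."""
  disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
  if not disks_ok:
    raise errors.OpExecError("Cannot activate block devices")
  for node, iv_name, device in device_info:
    feedback_fn("%s: disk %s assembled as %s" % (node, iv_name, device))
  return device_info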

    
1908
def _StartInstanceDisks(cfg, instance, force):
1909
  """Start the disks of an instance.
1910

1911
  """
1912
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1913
                                           ignore_secondaries=force)
1914
  if not disks_ok:
1915
    _ShutdownInstanceDisks(instance, cfg)
1916
    if force is not None and not force:
1917
      logger.Error("If the message above refers to a secondary node,"
1918
                   " you can retry the operation using '--force'.")
1919
    raise errors.OpExecError("Disk consistency error")
1920

    
1921

    
1922
class LUDeactivateInstanceDisks(NoHooksLU):
1923
  """Shutdown an instance's disks.
1924

1925
  """
1926
  _OP_REQP = ["instance_name"]
1927

    
1928
  def CheckPrereq(self):
1929
    """Check prerequisites.
1930

1931
    This checks that the instance is in the cluster.
1932

1933
    """
1934
    instance = self.cfg.GetInstanceInfo(
1935
      self.cfg.ExpandInstanceName(self.op.instance_name))
1936
    if instance is None:
1937
      raise errors.OpPrereqError("Instance '%s' not known" %
1938
                                 self.op.instance_name)
1939
    self.instance = instance
1940

    
1941
  def Exec(self, feedback_fn):
1942
    """Deactivate the disks
1943

1944
    """
1945
    instance = self.instance
1946
    ins_l = rpc.call_instance_list([instance.primary_node])
1947
    ins_l = ins_l[instance.primary_node]
1948
    if not type(ins_l) is list:
1949
      raise errors.OpExecError("Can't contact node '%s'" %
1950
                               instance.primary_node)
1951

    
1952
    if self.instance.name in ins_l:
1953
      raise errors.OpExecError("Instance is running, can't shutdown"
1954
                               " block devices.")
1955

    
1956
    _ShutdownInstanceDisks(instance, self.cfg)
1957

    
1958

    
1959
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1960
  """Shutdown block devices of an instance.
1961

1962
  This does the shutdown on all nodes of the instance.
1963

1964
  If ignore_primary is false, errors on the primary node are not
1965
  ignored.
1966

1967
  """
1968
  result = True
1969
  for disk in instance.disks:
1970
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1971
      cfg.SetDiskID(top_disk, node)
1972
      if not rpc.call_blockdev_shutdown(node, top_disk):
1973
        logger.Error("could not shutdown block device %s on node %s" %
1974
                     (disk.iv_name, node))
1975
        if not ignore_primary or node != instance.primary_node:
1976
          result = False
1977
  return result
1978

    
1979
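
# Illustrative sketch, not part of the original module: shutting down an
# instance's disks while tolerating errors on a (possibly dead) primary
# node, as the failover code below does; the helper name is hypothetical.
def _ExampleForceShutdownDisks(instance, cfg):
  """Shut down all disks of an instance, ignoring primary-node errors."""
  if not _ShutdownInstanceDisks(instance, cfg, ignore_primary=True):
    logger.Error("some block devices of instance %s could not be"
                 " shut down" % instance.name)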

    
1980
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1981
  """Checks if a node has enough free memory.
1982

1983
  This function checks if a given node has the needed amount of free
1984
  memory. In case the node has less memory or we cannot get the
1985
  information from the node, this function raises an OpPrereqError
1986
  exception.
1987

1988
  Args:
1989
    - cfg: a ConfigWriter instance
1990
    - node: the node name
1991
    - reason: string to use in the error message
1992
    - requested: the amount of memory in MiB
1993

1994
  """
1995
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1996
  if not nodeinfo or not isinstance(nodeinfo, dict):
1997
    raise errors.OpPrereqError("Could not contact node %s for resource"
1998
                             " information" % (node,))
1999

    
2000
  free_mem = nodeinfo[node].get('memory_free')
2001
  if not isinstance(free_mem, int):
2002
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2003
                             " was '%s'" % (node, free_mem))
2004
  if requested > free_mem:
2005
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2006
                             " needed %s MiB, available %s MiB" %
2007
                             (node, reason, requested, free_mem))
2008

    
2009
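
# Illustrative sketch, not part of the original module: applying the memory
# check above to every node an instance may run on; the helper name and the
# reason string are only examples.
def _ExampleCheckMemoryOnAllNodes(cfg, instance):
  """Check free memory on the primary and all secondary nodes."""
  for node in [instance.primary_node] + list(instance.secondary_nodes):
    _CheckNodeFreeMemory(cfg, node, "running instance %s" % instance.name,
                         instance.memory)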

    
2010
class LUStartupInstance(LogicalUnit):
2011
  """Starts an instance.
2012

2013
  """
2014
  HPATH = "instance-start"
2015
  HTYPE = constants.HTYPE_INSTANCE
2016
  _OP_REQP = ["instance_name", "force"]
2017

    
2018
  def BuildHooksEnv(self):
2019
    """Build hooks env.
2020

2021
    This runs on master, primary and secondary nodes of the instance.
2022

2023
    """
2024
    env = {
2025
      "FORCE": self.op.force,
2026
      }
2027
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2028
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2029
          list(self.instance.secondary_nodes))
2030
    return env, nl, nl
2031

    
2032
  def CheckPrereq(self):
2033
    """Check prerequisites.
2034

2035
    This checks that the instance is in the cluster.
2036

2037
    """
2038
    instance = self.cfg.GetInstanceInfo(
2039
      self.cfg.ExpandInstanceName(self.op.instance_name))
2040
    if instance is None:
2041
      raise errors.OpPrereqError("Instance '%s' not known" %
2042
                                 self.op.instance_name)
2043

    
2044
    # check bridges existence
2045
    _CheckInstanceBridgesExist(instance)
2046

    
2047
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2048
                         "starting instance %s" % instance.name,
2049
                         instance.memory)
2050

    
2051
    self.instance = instance
2052
    self.op.instance_name = instance.name
2053

    
2054
  def Exec(self, feedback_fn):
2055
    """Start the instance.
2056

2057
    """
2058
    instance = self.instance
2059
    force = self.op.force
2060
    extra_args = getattr(self.op, "extra_args", "")
2061

    
2062
    node_current = instance.primary_node
2063

    
2064
    _StartInstanceDisks(self.cfg, instance, force)
2065

    
2066
    if not rpc.call_instance_start(node_current, instance, extra_args):
2067
      _ShutdownInstanceDisks(instance, self.cfg)
2068
      raise errors.OpExecError("Could not start instance")
2069

    
2070
    self.cfg.MarkInstanceUp(instance.name)
2071

    
2072

    
2073
class LURebootInstance(LogicalUnit):
2074
  """Reboot an instance.
2075

2076
  """
2077
  HPATH = "instance-reboot"
2078
  HTYPE = constants.HTYPE_INSTANCE
2079
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2080

    
2081
  def BuildHooksEnv(self):
2082
    """Build hooks env.
2083

2084
    This runs on master, primary and secondary nodes of the instance.
2085

2086
    """
2087
    env = {
2088
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2089
      }
2090
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2091
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2092
          list(self.instance.secondary_nodes))
2093
    return env, nl, nl
2094

    
2095
  def CheckPrereq(self):
2096
    """Check prerequisites.
2097

2098
    This checks that the instance is in the cluster.
2099

2100
    """
2101
    instance = self.cfg.GetInstanceInfo(
2102
      self.cfg.ExpandInstanceName(self.op.instance_name))
2103
    if instance is None:
2104
      raise errors.OpPrereqError("Instance '%s' not known" %
2105
                                 self.op.instance_name)
2106

    
2107
    # check bridges existence
2108
    _CheckInstanceBridgesExist(instance)
2109

    
2110
    self.instance = instance
2111
    self.op.instance_name = instance.name
2112

    
2113
  def Exec(self, feedback_fn):
2114
    """Reboot the instance.
2115

2116
    """
2117
    instance = self.instance
2118
    ignore_secondaries = self.op.ignore_secondaries
2119
    reboot_type = self.op.reboot_type
2120
    extra_args = getattr(self.op, "extra_args", "")
2121

    
2122
    node_current = instance.primary_node
2123

    
2124
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2125
                           constants.INSTANCE_REBOOT_HARD,
2126
                           constants.INSTANCE_REBOOT_FULL]:
2127
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2128
                                  (constants.INSTANCE_REBOOT_SOFT,
2129
                                   constants.INSTANCE_REBOOT_HARD,
2130
                                   constants.INSTANCE_REBOOT_FULL))
2131

    
2132
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2133
                       constants.INSTANCE_REBOOT_HARD]:
2134
      if not rpc.call_instance_reboot(node_current, instance,
2135
                                      reboot_type, extra_args):
2136
        raise errors.OpExecError("Could not reboot instance")
2137
    else:
2138
      if not rpc.call_instance_shutdown(node_current, instance):
2139
        raise errors.OpExecError("could not shutdown instance for full reboot")
2140
      _ShutdownInstanceDisks(instance, self.cfg)
2141
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2142
      if not rpc.call_instance_start(node_current, instance, extra_args):
2143
        _ShutdownInstanceDisks(instance, self.cfg)
2144
        raise errors.OpExecError("Could not start instance for full reboot")
2145

    
2146
    self.cfg.MarkInstanceUp(instance.name)
2147

    
2148
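
# Reboot types handled above (summary added for clarity, not part of the
# original module): INSTANCE_REBOOT_SOFT and INSTANCE_REBOOT_HARD are
# delegated to the node daemon via rpc.call_instance_reboot, while
# INSTANCE_REBOOT_FULL is emulated as shutdown, disk deactivation, disk
# activation and start.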

    
2149
class LUShutdownInstance(LogicalUnit):
2150
  """Shutdown an instance.
2151

2152
  """
2153
  HPATH = "instance-stop"
2154
  HTYPE = constants.HTYPE_INSTANCE
2155
  _OP_REQP = ["instance_name"]
2156

    
2157
  def BuildHooksEnv(self):
2158
    """Build hooks env.
2159

2160
    This runs on master, primary and secondary nodes of the instance.
2161

2162
    """
2163
    env = _BuildInstanceHookEnvByObject(self.instance)
2164
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2165
          list(self.instance.secondary_nodes))
2166
    return env, nl, nl
2167

    
2168
  def CheckPrereq(self):
2169
    """Check prerequisites.
2170

2171
    This checks that the instance is in the cluster.
2172

2173
    """
2174
    instance = self.cfg.GetInstanceInfo(
2175
      self.cfg.ExpandInstanceName(self.op.instance_name))
2176
    if instance is None:
2177
      raise errors.OpPrereqError("Instance '%s' not known" %
2178
                                 self.op.instance_name)
2179
    self.instance = instance
2180

    
2181
  def Exec(self, feedback_fn):
2182
    """Shutdown the instance.
2183

2184
    """
2185
    instance = self.instance
2186
    node_current = instance.primary_node
2187
    if not rpc.call_instance_shutdown(node_current, instance):
2188
      logger.Error("could not shutdown instance")
2189

    
2190
    self.cfg.MarkInstanceDown(instance.name)
2191
    _ShutdownInstanceDisks(instance, self.cfg)
2192

    
2193

    
2194
class LUReinstallInstance(LogicalUnit):
2195
  """Reinstall an instance.
2196

2197
  """
2198
  HPATH = "instance-reinstall"
2199
  HTYPE = constants.HTYPE_INSTANCE
2200
  _OP_REQP = ["instance_name"]
2201

    
2202
  def BuildHooksEnv(self):
2203
    """Build hooks env.
2204

2205
    This runs on master, primary and secondary nodes of the instance.
2206

2207
    """
2208
    env = _BuildInstanceHookEnvByObject(self.instance)
2209
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2210
          list(self.instance.secondary_nodes))
2211
    return env, nl, nl
2212

    
2213
  def CheckPrereq(self):
2214
    """Check prerequisites.
2215

2216
    This checks that the instance is in the cluster and is not running.
2217

2218
    """
2219
    instance = self.cfg.GetInstanceInfo(
2220
      self.cfg.ExpandInstanceName(self.op.instance_name))
2221
    if instance is None:
2222
      raise errors.OpPrereqError("Instance '%s' not known" %
2223
                                 self.op.instance_name)
2224
    if instance.disk_template == constants.DT_DISKLESS:
2225
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2226
                                 self.op.instance_name)
2227
    if instance.status != "down":
2228
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2229
                                 self.op.instance_name)
2230
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2231
    if remote_info:
2232
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2233
                                 (self.op.instance_name,
2234
                                  instance.primary_node))
2235

    
2236
    self.op.os_type = getattr(self.op, "os_type", None)
2237
    if self.op.os_type is not None:
2238
      # OS verification
2239
      pnode = self.cfg.GetNodeInfo(
2240
        self.cfg.ExpandNodeName(instance.primary_node))
2241
      if pnode is None:
2242
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2243
                                   self.op.pnode)
2244
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2245
      if not os_obj:
2246
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2247
                                   " primary node"  % self.op.os_type)
2248

    
2249
    self.instance = instance
2250

    
2251
  def Exec(self, feedback_fn):
2252
    """Reinstall the instance.
2253

2254
    """
2255
    inst = self.instance
2256

    
2257
    if self.op.os_type is not None:
2258
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2259
      inst.os = self.op.os_type
2260
      self.cfg.AddInstance(inst)
2261

    
2262
    _StartInstanceDisks(self.cfg, inst, None)
2263
    try:
2264
      feedback_fn("Running the instance OS create scripts...")
2265
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2266
        raise errors.OpExecError("Could not install OS for instance %s"
2267
                                 " on node %s" %
2268
                                 (inst.name, inst.primary_node))
2269
    finally:
2270
      _ShutdownInstanceDisks(inst, self.cfg)
2271

    
2272

    
2273
class LURenameInstance(LogicalUnit):
2274
  """Rename an instance.
2275

2276
  """
2277
  HPATH = "instance-rename"
2278
  HTYPE = constants.HTYPE_INSTANCE
2279
  _OP_REQP = ["instance_name", "new_name"]
2280

    
2281
  def BuildHooksEnv(self):
2282
    """Build hooks env.
2283

2284
    This runs on master, primary and secondary nodes of the instance.
2285

2286
    """
2287
    env = _BuildInstanceHookEnvByObject(self.instance)
2288
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2289
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2290
          list(self.instance.secondary_nodes))
2291
    return env, nl, nl
2292

    
2293
  def CheckPrereq(self):
2294
    """Check prerequisites.
2295

2296
    This checks that the instance is in the cluster and is not running.
2297

2298
    """
2299
    instance = self.cfg.GetInstanceInfo(
2300
      self.cfg.ExpandInstanceName(self.op.instance_name))
2301
    if instance is None:
2302
      raise errors.OpPrereqError("Instance '%s' not known" %
2303
                                 self.op.instance_name)
2304
    if instance.status != "down":
2305
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2306
                                 self.op.instance_name)
2307
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2308
    if remote_info:
2309
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2310
                                 (self.op.instance_name,
2311
                                  instance.primary_node))
2312
    self.instance = instance
2313

    
2314
    # new name verification
2315
    name_info = utils.HostInfo(self.op.new_name)
2316

    
2317
    self.op.new_name = new_name = name_info.name
2318
    instance_list = self.cfg.GetInstanceList()
2319
    if new_name in instance_list:
2320
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2321
                                 instance_name)
2322

    
2323
    if not getattr(self.op, "ignore_ip", False):
2324
      command = ["fping", "-q", name_info.ip]
2325
      result = utils.RunCmd(command)
2326
      if not result.failed:
2327
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2328
                                   (name_info.ip, new_name))
2329

    
2330

    
2331
  def Exec(self, feedback_fn):
2332
    """Reinstall the instance.
2333

2334
    """
2335
    inst = self.instance
2336
    old_name = inst.name
2337

    
2338
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2339

    
2340
    # re-read the instance from the configuration after rename
2341
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2342

    
2343
    _StartInstanceDisks(self.cfg, inst, None)
2344
    try:
2345
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2346
                                          "sda", "sdb"):
2347
        msg = ("Could run OS rename script for instance %s on node %s (but the"
2348
               " instance has been renamed in Ganeti)" %
2349
               (inst.name, inst.primary_node))
2350
        logger.Error(msg)
2351
    finally:
2352
      _ShutdownInstanceDisks(inst, self.cfg)
2353

    
2354

    
2355
class LURemoveInstance(LogicalUnit):
2356
  """Remove an instance.
2357

2358
  """
2359
  HPATH = "instance-remove"
2360
  HTYPE = constants.HTYPE_INSTANCE
2361
  _OP_REQP = ["instance_name"]
2362

    
2363
  def BuildHooksEnv(self):
2364
    """Build hooks env.
2365

2366
    This runs on master, primary and secondary nodes of the instance.
2367

2368
    """
2369
    env = _BuildInstanceHookEnvByObject(self.instance)
2370
    nl = [self.sstore.GetMasterNode()]
2371
    return env, nl, nl
2372

    
2373
  def CheckPrereq(self):
2374
    """Check prerequisites.
2375

2376
    This checks that the instance is in the cluster.
2377

2378
    """
2379
    instance = self.cfg.GetInstanceInfo(
2380
      self.cfg.ExpandInstanceName(self.op.instance_name))
2381
    if instance is None:
2382
      raise errors.OpPrereqError("Instance '%s' not known" %
2383
                                 self.op.instance_name)
2384
    self.instance = instance
2385

    
2386
  def Exec(self, feedback_fn):
2387
    """Remove the instance.
2388

2389
    """
2390
    instance = self.instance
2391
    logger.Info("shutting down instance %s on node %s" %
2392
                (instance.name, instance.primary_node))
2393

    
2394
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2395
      if self.op.ignore_failures:
2396
        feedback_fn("Warning: can't shutdown instance")
2397
      else:
2398
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2399
                                 (instance.name, instance.primary_node))
2400

    
2401
    logger.Info("removing block devices for instance %s" % instance.name)
2402

    
2403
    if not _RemoveDisks(instance, self.cfg):
2404
      if self.op.ignore_failures:
2405
        feedback_fn("Warning: can't remove instance's disks")
2406
      else:
2407
        raise errors.OpExecError("Can't remove instance's disks")
2408

    
2409
    logger.Info("removing instance %s out of cluster config" % instance.name)
2410

    
2411
    self.cfg.RemoveInstance(instance.name)
2412

    
2413

    
2414
class LUQueryInstances(NoHooksLU):
2415
  """Logical unit for querying instances.
2416

2417
  """
2418
  _OP_REQP = ["output_fields", "names"]
2419

    
2420
  def CheckPrereq(self):
2421
    """Check prerequisites.
2422

2423
    This checks that the fields required are valid output fields.
2424

2425
    """
2426
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2427
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2428
                               "admin_state", "admin_ram",
2429
                               "disk_template", "ip", "mac", "bridge",
2430
                               "sda_size", "sdb_size", "vcpus"],
2431
                       dynamic=self.dynamic_fields,
2432
                       selected=self.op.output_fields)
2433

    
2434
    self.wanted = _GetWantedInstances(self, self.op.names)
2435

    
2436
  def Exec(self, feedback_fn):
2437
    """Computes the list of nodes and their attributes.
2438

2439
    """
2440
    instance_names = self.wanted
2441
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2442
                     in instance_names]
2443

    
2444
    # begin data gathering
2445

    
2446
    nodes = frozenset([inst.primary_node for inst in instance_list])
2447

    
2448
    bad_nodes = []
2449
    if self.dynamic_fields.intersection(self.op.output_fields):
2450
      live_data = {}
2451
      node_data = rpc.call_all_instances_info(nodes)
2452
      for name in nodes:
2453
        result = node_data[name]
2454
        if result:
2455
          live_data.update(result)
2456
        elif result == False:
2457
          bad_nodes.append(name)
2458
        # else no instance is alive
2459
    else:
2460
      live_data = dict([(name, {}) for name in instance_names])
2461

    
2462
    # end data gathering
2463

    
2464
    output = []
2465
    for instance in instance_list:
2466
      iout = []
2467
      for field in self.op.output_fields:
2468
        if field == "name":
2469
          val = instance.name
2470
        elif field == "os":
2471
          val = instance.os
2472
        elif field == "pnode":
2473
          val = instance.primary_node
2474
        elif field == "snodes":
2475
          val = list(instance.secondary_nodes)
2476
        elif field == "admin_state":
2477
          val = (instance.status != "down")
2478
        elif field == "oper_state":
2479
          if instance.primary_node in bad_nodes:
2480
            val = None
2481
          else:
2482
            val = bool(live_data.get(instance.name))
2483
        elif field == "status":
2484
          if instance.primary_node in bad_nodes:
2485
            val = "ERROR_nodedown"
2486
          else:
2487
            running = bool(live_data.get(instance.name))
2488
            if running:
2489
              if instance.status != "down":
2490
                val = "running"
2491
              else:
2492
                val = "ERROR_up"
2493
            else:
2494
              if instance.status != "down":
2495
                val = "ERROR_down"
2496
              else:
2497
                val = "ADMIN_down"
2498
        elif field == "admin_ram":
2499
          val = instance.memory
2500
        elif field == "oper_ram":
2501
          if instance.primary_node in bad_nodes:
2502
            val = None
2503
          elif instance.name in live_data:
2504
            val = live_data[instance.name].get("memory", "?")
2505
          else:
2506
            val = "-"
2507
        elif field == "disk_template":
2508
          val = instance.disk_template
2509
        elif field == "ip":
2510
          val = instance.nics[0].ip
2511
        elif field == "bridge":
2512
          val = instance.nics[0].bridge
2513
        elif field == "mac":
2514
          val = instance.nics[0].mac
2515
        elif field == "sda_size" or field == "sdb_size":
2516
          disk = instance.FindDisk(field[:3])
2517
          if disk is None:
2518
            val = None
2519
          else:
2520
            val = disk.size
2521
        elif field == "vcpus":
2522
          val = instance.vcpus
2523
        else:
2524
          raise errors.ParameterError(field)
2525
        iout.append(val)
2526
      output.append(iout)
2527

    
2528
    return output
2529

    
2530
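
# Summary of the "status" values computed above (added for clarity, not part
# of the original module):
#   primary node unreachable    -> "ERROR_nodedown"
#   running and marked up       -> "running"
#   running but marked down     -> "ERROR_up"
#   not running but marked up   -> "ERROR_down"
#   not running and marked down -> "ADMIN_down"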

    
2531
class LUFailoverInstance(LogicalUnit):
2532
  """Failover an instance.
2533

2534
  """
2535
  HPATH = "instance-failover"
2536
  HTYPE = constants.HTYPE_INSTANCE
2537
  _OP_REQP = ["instance_name", "ignore_consistency"]
2538

    
2539
  def BuildHooksEnv(self):
2540
    """Build hooks env.
2541

2542
    This runs on master, primary and secondary nodes of the instance.
2543

2544
    """
2545
    env = {
2546
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2547
      }
2548
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2549
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2550
    return env, nl, nl
2551

    
2552
  def CheckPrereq(self):
2553
    """Check prerequisites.
2554

2555
    This checks that the instance is in the cluster.
2556

2557
    """
2558
    instance = self.cfg.GetInstanceInfo(
2559
      self.cfg.ExpandInstanceName(self.op.instance_name))
2560
    if instance is None:
2561
      raise errors.OpPrereqError("Instance '%s' not known" %
2562
                                 self.op.instance_name)
2563

    
2564
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2565
      raise errors.OpPrereqError("Instance's disk layout is not"
2566
                                 " network mirrored, cannot failover.")
2567

    
2568
    secondary_nodes = instance.secondary_nodes
2569
    if not secondary_nodes:
2570
      raise errors.ProgrammerError("no secondary node but using "
2571
                                   "DT_REMOTE_RAID1 template")
2572

    
2573
    target_node = secondary_nodes[0]
2574
    # check memory requirements on the secondary node
2575
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2576
                         instance.name, instance.memory)
2577

    
2578
    # check bridge existence
2579
    brlist = [nic.bridge for nic in instance.nics]
2580
    if not rpc.call_bridges_exist(target_node, brlist):
2581
      raise errors.OpPrereqError("One or more target bridges %s does not"
2582
                                 " exist on destination node '%s'" %
2583
                                 (brlist, target_node))
2584

    
2585
    self.instance = instance
2586

    
2587
  def Exec(self, feedback_fn):
2588
    """Failover an instance.
2589

2590
    The failover is done by shutting it down on its present node and
2591
    starting it on the secondary.
2592

2593
    """
2594
    instance = self.instance
2595

    
2596
    source_node = instance.primary_node
2597
    target_node = instance.secondary_nodes[0]
2598

    
2599
    feedback_fn("* checking disk consistency between source and target")
2600
    for dev in instance.disks:
2601
      # for remote_raid1, these are md over drbd
2602
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2603
        if not self.op.ignore_consistency:
2604
          raise errors.OpExecError("Disk %s is degraded on target node,"
2605
                                   " aborting failover." % dev.iv_name)
2606

    
2607
    feedback_fn("* shutting down instance on source node")
2608
    logger.Info("Shutting down instance %s on node %s" %
2609
                (instance.name, source_node))
2610

    
2611
    if not rpc.call_instance_shutdown(source_node, instance):
2612
      if self.op.ignore_consistency:
2613
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2614
                     " anyway. Please make sure node %s is down"  %
2615
                     (instance.name, source_node, source_node))
2616
      else:
2617
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2618
                                 (instance.name, source_node))
2619

    
2620
    feedback_fn("* deactivating the instance's disks on source node")
2621
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2622
      raise errors.OpExecError("Can't shut down the instance's disks.")
2623

    
2624
    instance.primary_node = target_node
2625
    # distribute new instance config to the other nodes
2626
    self.cfg.AddInstance(instance)
2627

    
2628
    feedback_fn("* activating the instance's disks on target node")
2629
    logger.Info("Starting instance %s on node %s" %
2630
                (instance.name, target_node))
2631

    
2632
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2633
                                             ignore_secondaries=True)
2634
    if not disks_ok:
2635
      _ShutdownInstanceDisks(instance, self.cfg)
2636
      raise errors.OpExecError("Can't activate the instance's disks")
2637

    
2638
    feedback_fn("* starting the instance on the target node")
2639
    if not rpc.call_instance_start(target_node, instance, None):
2640
      _ShutdownInstanceDisks(instance, self.cfg)
2641
      raise errors.OpExecError("Could not start instance %s on node %s." %
2642
                               (instance.name, target_node))
2643

    
2644

    
2645
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2646
  """Create a tree of block devices on the primary node.
2647

2648
  This always creates all devices.
2649

2650
  """
2651
  if device.children:
2652
    for child in device.children:
2653
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2654
        return False
2655

    
2656
  cfg.SetDiskID(device, node)
2657
  new_id = rpc.call_blockdev_create(node, device, device.size,
2658
                                    instance.name, True, info)
2659
  if not new_id:
2660
    return False
2661
  if device.physical_id is None:
2662
    device.physical_id = new_id
2663
  return True
2664

    
2665

    
2666
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2667
  """Create a tree of block devices on a secondary node.
2668

2669
  If this device type has to be created on secondaries, create it and
2670
  all its children.
2671

2672
  If not, just recurse to children keeping the same 'force' value.
2673

2674
  """
2675
  if device.CreateOnSecondary():
2676
    force = True
2677
  if device.children:
2678
    for child in device.children:
2679
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2680
                                        child, force, info):
2681
        return False
2682

    
2683
  if not force:
2684
    return True
2685
  cfg.SetDiskID(device, node)
2686
  new_id = rpc.call_blockdev_create(node, device, device.size,
2687
                                    instance.name, False, info)
2688
  if not new_id:
2689
    return False
2690
  if device.physical_id is None:
2691
    device.physical_id = new_id
2692
  return True
2693

    
2694

    
2695
def _GenerateUniqueNames(cfg, exts):
2696
  """Generate a suitable LV name.
2697

2698
  This will generate one unique logical volume name per given extension.
2699

2700
  """
2701
  results = []
2702
  for val in exts:
2703
    new_id = cfg.GenerateUniqueID()
2704
    results.append("%s%s" % (new_id, val))
2705
  return results
2706

    
2707
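
# Example call (illustrative): _GenerateUniqueNames(cfg, [".sda", ".sdb"])
# returns something like ["<unique-id>.sda", "<unique-id>.sdb"], i.e. one
# generated name per requested extension.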

    
2708
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2709
  """Generate a drbd device complete with its children.
2710

2711
  """
2712
  port = cfg.AllocatePort()
2713
  vgname = cfg.GetVGName()
2714
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2715
                          logical_id=(vgname, names[0]))
2716
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2717
                          logical_id=(vgname, names[1]))
2718
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2719
                          logical_id = (primary, secondary, port),
2720
                          children = [dev_data, dev_meta])
2721
  return drbd_dev
2722

    
2723

    
2724
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2725
  """Generate a drbd8 device complete with its children.
2726

2727
  """
2728
  port = cfg.AllocatePort()
2729
  vgname = cfg.GetVGName()
2730
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2731
                          logical_id=(vgname, names[0]))
2732
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2733
                          logical_id=(vgname, names[1]))
2734
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2735
                          logical_id = (primary, secondary, port),
2736
                          children = [dev_data, dev_meta],
2737
                          iv_name=iv_name)
2738
  return drbd_dev
2739

    
2740
def _GenerateDiskTemplate(cfg, template_name,
2741
                          instance_name, primary_node,
2742
                          secondary_nodes, disk_sz, swap_sz):
2743
  """Generate the entire disk layout for a given template type.
2744

2745
  """
2746
  #TODO: compute space requirements
2747

    
2748
  vgname = cfg.GetVGName()
2749
  if template_name == constants.DT_DISKLESS:
2750
    disks = []
2751
  elif template_name == constants.DT_PLAIN:
2752
    if len(secondary_nodes) != 0:
2753
      raise errors.ProgrammerError("Wrong template configuration")
2754

    
2755
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2756
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2757
                           logical_id=(vgname, names[0]),
2758
                           iv_name = "sda")
2759
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2760
                           logical_id=(vgname, names[1]),
2761
                           iv_name = "sdb")
2762
    disks = [sda_dev, sdb_dev]
2763
  elif template_name == constants.DT_LOCAL_RAID1:
2764
    if len(secondary_nodes) != 0:
2765
      raise errors.ProgrammerError("Wrong template configuration")
2766

    
2767

    
2768
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2769
                                       ".sdb_m1", ".sdb_m2"])
2770
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2771
                              logical_id=(vgname, names[0]))
2772
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2773
                              logical_id=(vgname, names[1]))
2774
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2775
                              size=disk_sz,
2776
                              children = [sda_dev_m1, sda_dev_m2])
2777
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2778
                              logical_id=(vgname, names[2]))
2779
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2780
                              logical_id=(vgname, names[3]))
2781
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2782
                              size=swap_sz,
2783
                              children = [sdb_dev_m1, sdb_dev_m2])
2784
    disks = [md_sda_dev, md_sdb_dev]
2785
  elif template_name == constants.DT_REMOTE_RAID1:
2786
    if len(secondary_nodes) != 1:
2787
      raise errors.ProgrammerError("Wrong template configuration")
2788
    remote_node = secondary_nodes[0]
2789
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2790
                                       ".sdb_data", ".sdb_meta"])
2791
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2792
                                         disk_sz, names[0:2])
2793
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2794
                              children = [drbd_sda_dev], size=disk_sz)
2795
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2796
                                         swap_sz, names[2:4])
2797
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2798
                              children = [drbd_sdb_dev], size=swap_sz)
2799
    disks = [md_sda_dev, md_sdb_dev]
2800
  elif template_name == constants.DT_DRBD8:
2801
    if len(secondary_nodes) != 1:
2802
      raise errors.ProgrammerError("Wrong template configuration")
2803
    remote_node = secondary_nodes[0]
2804
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2805
                                       ".sdb_data", ".sdb_meta"])
2806
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2807
                                         disk_sz, names[0:2], "sda")
2808
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2809
                                         swap_sz, names[2:4], "sdb")
2810
    disks = [drbd_sda_dev, drbd_sdb_dev]
2811
  else:
2812
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2813
  return disks
2814

    
2815
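
# Illustrative sketch, not part of the original module: generating the disk
# layout of a drbd8-based instance; the instance name, node names and sizes
# (in MiB) are made up for demonstration.
def _ExampleDrbd8Layout(cfg, primary, secondary):
  """Return the disks a 10 GiB data / 4 GiB swap drbd8 instance would get."""
  return _GenerateDiskTemplate(cfg, constants.DT_DRBD8, "inst1.example.com",
                               primary, [secondary], 10240, 4096)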

    
2816
def _GetInstanceInfoText(instance):
2817
  """Compute that text that should be added to the disk's metadata.
2818

2819
  """
2820
  return "originstname+%s" % instance.name
2821

    
2822
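
# Example (illustrative): for an instance named "inst1.example.com" the
# function above returns "originstname+inst1.example.com".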

    
2823
def _CreateDisks(cfg, instance):
2824
  """Create all disks for an instance.
2825

2826
  This abstracts away some work from AddInstance.
2827

2828
  Args:
2829
    instance: the instance object
2830

2831
  Returns:
2832
    True or False showing the success of the creation process
2833

2834
  """
2835
  info = _GetInstanceInfoText(instance)
2836

    
2837
  for device in instance.disks:
2838
    logger.Info("creating volume %s for instance %s" %
2839
              (device.iv_name, instance.name))
2840
    #HARDCODE
2841
    for secondary_node in instance.secondary_nodes:
2842
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2843
                                        device, False, info):
2844
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2845
                     (device.iv_name, device, secondary_node))
2846
        return False
2847
    #HARDCODE
2848
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2849
                                    instance, device, info):
2850
      logger.Error("failed to create volume %s on primary!" %
2851
                   device.iv_name)
2852
      return False
2853
  return True
2854

    
2855

    
2856
def _RemoveDisks(instance, cfg):
2857
  """Remove all disks for an instance.
2858

2859
  This abstracts away some work from `AddInstance()` and
2860
  `RemoveInstance()`. Note that in case some of the devices couldn't
2861
  be removed, the removal will continue with the other ones (compare
2862
  with `_CreateDisks()`).
2863

2864
  Args:
2865
    instance: the instance object
2866

2867
  Returns:
2868
    True or False showing the success of the removal process
2869

2870
  """
2871
  logger.Info("removing block devices for instance %s" % instance.name)
2872

    
2873
  result = True
2874
  for device in instance.disks:
2875
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2876
      cfg.SetDiskID(disk, node)
2877
      if not rpc.call_blockdev_remove(node, disk):
2878
        logger.Error("could not remove block device %s on node %s,"
2879
                     " continuing anyway" %
2880
                     (device.iv_name, node))
2881
        result = False
2882
  return result
2883

    
2884
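
# Illustrative sketch, not part of the original module: the create/rollback
# pattern a caller could follow with the two helpers above, removing any
# half-created volumes when disk creation fails; the helper name is
# hypothetical.
def _ExampleCreateDisksOrCleanUp(cfg, instance):
  """Create all disks of an instance, removing leftovers on failure."""
  if not _CreateDisks(cfg, instance):
    _RemoveDisks(instance, cfg)
    raise errors.OpExecError("Device creation failed for instance %s" %
                             instance.name)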

    
2885
class LUCreateInstance(LogicalUnit):
2886
  """Create an instance.
2887

2888
  """
2889
  HPATH = "instance-add"
2890
  HTYPE = constants.HTYPE_INSTANCE
2891
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2892
              "disk_template", "swap_size", "mode", "start", "vcpus",
2893
              "wait_for_sync", "ip_check", "mac"]
2894

    
2895
  def BuildHooksEnv(self):
2896
    """Build hooks env.
2897

2898
    This runs on master, primary and secondary nodes of the instance.
2899

2900
    """
2901
    env = {
2902
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2903
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2904
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2905
      "INSTANCE_ADD_MODE": self.op.mode,
2906
      }
2907
    if self.op.mode == constants.INSTANCE_IMPORT:
2908
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2909
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2910
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2911

    
2912
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2913
      primary_node=self.op.pnode,
2914
      secondary_nodes=self.secondaries,
2915
      status=self.instance_status,
2916
      os_type=self.op.os_type,
2917
      memory=self.op.mem_size,
2918
      vcpus=self.op.vcpus,
2919
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2920
    ))
2921

    
2922
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2923
          self.secondaries)
2924
    return env, nl, nl
2925

    
2926

    
2927
  def CheckPrereq(self):
2928
    """Check prerequisites.
2929

2930
    """
2931
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2932
      if not hasattr(self.op, attr):
2933
        setattr(self.op, attr, None)
2934

    
2935
    if self.op.mode not in (constants.INSTANCE_CREATE,
2936
                            constants.INSTANCE_IMPORT):
2937
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2938
                                 self.op.mode)
2939

    
2940
    if self.op.mode == constants.INSTANCE_IMPORT:
2941
      src_node = getattr(self.op, "src_node", None)
2942
      src_path = getattr(self.op, "src_path", None)
2943
      if src_node is None or src_path is None:
2944
        raise errors.OpPrereqError("Importing an instance requires source"
2945
                                   " node and path options")
2946
      src_node_full = self.cfg.ExpandNodeName(src_node)
2947
      if src_node_full is None:
2948
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2949
      self.op.src_node = src_node = src_node_full
2950

    
2951
      if not os.path.isabs(src_path):
2952
        raise errors.OpPrereqError("The source path must be absolute")
2953

    
2954
      export_info = rpc.call_export_info(src_node, src_path)
2955

    
2956
      if not export_info:
2957
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2958

    
2959
      if not export_info.has_section(constants.INISECT_EXP):
2960
        raise errors.ProgrammerError("Corrupted export config")
2961

    
2962
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2963
      if (int(ei_version) != constants.EXPORT_VERSION):
2964
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2965
                                   (ei_version, constants.EXPORT_VERSION))
2966

    
2967
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2968
        raise errors.OpPrereqError("Can't import instance with more than"
2969
                                   " one data disk")
2970

    
2971
      # FIXME: are the old os-es, disk sizes, etc. useful?
2972
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2973
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2974
                                                         'disk0_dump'))
2975
      self.src_image = diskimage
2976
    else: # INSTANCE_CREATE
2977
      if getattr(self.op, "os_type", None) is None:
2978
        raise errors.OpPrereqError("No guest OS specified")
2979

    
2980
    # check primary node
2981
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2982
    if pnode is None:
2983
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2984
                                 self.op.pnode)
2985
    self.op.pnode = pnode.name
2986
    self.pnode = pnode
2987
    self.secondaries = []
2988
    # disk template and mirror node verification
2989
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2990
      raise errors.OpPrereqError("Invalid disk template name")
2991

    
2992
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2993
      if getattr(self.op, "snode", None) is None:
2994
        raise errors.OpPrereqError("The networked disk templates need"
2995
                                   " a mirror node")
2996

    
2997
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
2998
      if snode_name is None:
2999
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3000
                                   self.op.snode)
3001
      elif snode_name == pnode.name:
3002
        raise errors.OpPrereqError("The secondary node cannot be"
3003
                                   " the primary node.")
3004
      self.secondaries.append(snode_name)
3005

    
3006
    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
    }
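    # Worked example (hypothetical sizes, not taken from the opcode): with
    # disk_size=1024 and swap_size=512 (both in MB), "plain" needs 1536 MB of
    # free VG space, "local_raid1" needs 3072 MB, and the DRBD-based
    # templates need 1792 MB once the 256 MB metadata overhead is added.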
3015

    
3016
    if self.op.disk_template not in req_size_dict:
3017
      raise errors.ProgrammerError("Disk template '%s' size requirement"
3018
                                   " is unknown" %  self.op.disk_template)
3019

    
3020
    req_size = req_size_dict[self.op.disk_template]
3021

    
3022
    # Check lv size requirements
3023
    if req_size is not None:
3024
      nodenames = [pnode.name] + self.secondaries
3025
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3026
      for node in nodenames:
3027
        info = nodeinfo.get(node, None)
3028
        if not info:
3029
          raise errors.OpPrereqError("Cannot get current information"
3030
                                     " from node '%s'" % nodeinfo)
3031
        vg_free = info.get('vg_free', None)
3032
        if not isinstance(vg_free, int):
3033
          raise errors.OpPrereqError("Can't compute free disk space on"
3034
                                     " node %s" % node)
3035
        if req_size > info['vg_free']:
3036
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3037
                                     " %d MB available, %d MB required" %
3038
                                     (node, info['vg_free'], req_size))
3039

    
3040
    # os verification
3041
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3042
    if not os_obj:
3043
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3044
                                 " primary node"  % self.op.os_type)
3045

    
3046
    if self.op.kernel_path == constants.VALUE_NONE:
3047
      raise errors.OpPrereqError("Can't set instance kernel to none")
3048

    
3049
    # instance verification
3050
    hostname1 = utils.HostInfo(self.op.instance_name)
3051

    
3052
    self.op.instance_name = instance_name = hostname1.name
3053
    instance_list = self.cfg.GetInstanceList()
3054
    if instance_name in instance_list:
3055
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3056
                                 instance_name)
3057

    
3058
    ip = getattr(self.op, "ip", None)
3059
    if ip is None or ip.lower() == "none":
3060
      inst_ip = None
3061
    elif ip.lower() == "auto":
3062
      inst_ip = hostname1.ip
3063
    else:
3064
      if not utils.IsValidIP(ip):
3065
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3066
                                   " like a valid IP" % ip)
3067
      inst_ip = ip
3068
    self.inst_ip = inst_ip
3069

    
3070
    if self.op.start and not self.op.ip_check:
3071
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3072
                                 " adding an instance in start mode")
3073

    
3074
    if self.op.ip_check:
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
                       constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))
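    # The TcpPing above (master name -> candidate IP, on the noded port) is a
    # cheap "address already in use" probe: any answer means another host is
    # already reachable at that IP, hence the refusal.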
3079

    
3080
    # MAC address verification
3081
    if self.op.mac != "auto":
3082
      if not utils.IsValidMac(self.op.mac.lower()):
3083
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3084
                                   self.op.mac)
3085

    
3086
    # bridge verification
3087
    bridge = getattr(self.op, "bridge", None)
3088
    if bridge is None:
3089
      self.op.bridge = self.cfg.GetDefBridge()
3090
    else:
3091
      self.op.bridge = bridge
3092

    
3093
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3094
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3095
                                 " destination node '%s'" %
3096
                                 (self.op.bridge, pnode.name))
3097

    
3098
    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
3103

    
3104
    if self.op.start:
3105
      self.instance_status = 'up'
3106
    else:
3107
      self.instance_status = 'down'
3108

    
3109
  def Exec(self, feedback_fn):
3110
    """Create and add the instance to the cluster.
3111

3112
    """
3113
    instance = self.op.instance_name
3114
    pnode_name = self.pnode.name
3115

    
3116
    if self.op.mac == "auto":
3117
      mac_address = self.cfg.GenerateMAC()
3118
    else:
3119
      mac_address = self.op.mac
3120

    
3121
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3122
    if self.inst_ip is not None:
3123
      nic.ip = self.inst_ip
3124

    
3125
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None
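    # Hypervisor types listed in HTS_REQ_PORT get a cluster-unique port
    # allocated here (presumably the VNC console port for HVM-style guests);
    # all other hypervisors leave network_port as None.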
3130

    
3131
    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)
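    # disks now holds the objects.Disk tree matching the chosen template; it
    # is attached to the Instance object below and materialized on the nodes
    # by _CreateDisks.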
3136

    
3137
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3138
                            primary_node=pnode_name,
3139
                            memory=self.op.mem_size,
3140
                            vcpus=self.op.vcpus,
3141
                            nics=[nic], disks=disks,
3142
                            disk_template=self.op.disk_template,
3143
                            status=self.instance_status,
3144
                            network_port=network_port,
3145
                            kernel_path=self.op.kernel_path,
3146
                            initrd_path=self.op.initrd_path,
3147
                            hvm_boot_order=self.op.hvm_boot_order,
3148
                            )
3149

    
3150
    feedback_fn("* creating instance disks...")
3151
    if not _CreateDisks(self.cfg, iobj):
3152
      _RemoveDisks(iobj, self.cfg)
3153
      raise errors.OpExecError("Device creation failed, reverting...")
3154

    
3155
    feedback_fn("adding instance %s to cluster config" % instance)
3156

    
3157
    self.cfg.AddInstance(iobj)
3158

    
3159
    if self.op.wait_for_sync:
3160
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3161
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3162
      # make sure the disks are not degraded (still sync-ing is ok)
3163
      time.sleep(15)
3164
      feedback_fn("* checking mirrors status")
3165
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3166
    else:
3167
      disk_abort = False
3168

    
3169
    if disk_abort:
3170
      _RemoveDisks(iobj, self.cfg)
3171
      self.cfg.RemoveInstance(iobj.name)
3172
      raise errors.OpExecError("There are some degraded disks for"
3173
                               " this instance")
3174

    
3175
    feedback_fn("creating os for instance %s on node %s" %
3176
                (instance, pnode_name))
3177

    
3178
    if iobj.disk_template != constants.DT_DISKLESS:
3179
      if self.op.mode == constants.INSTANCE_CREATE:
3180
        feedback_fn("* running the instance OS create scripts...")
3181
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3182
          raise errors.OpExecError("could not add os for instance %s"
3183
                                   " on node %s" %
3184
                                   (instance, pnode_name))
3185

    
3186
      elif self.op.mode == constants.INSTANCE_IMPORT:
3187
        feedback_fn("* running the instance OS import scripts...")
3188
        src_node = self.op.src_node
3189
        src_image = self.src_image
3190
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3191
                                                src_node, src_image):
3192
          raise errors.OpExecError("Could not import os for instance"
3193
                                   " %s on node %s" %
3194
                                   (instance, pnode_name))
3195
      else:
3196
        # also checked in the prereq part
3197
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3198
                                     % self.op.mode)
3199

    
3200
    if self.op.start:
3201
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3202
      feedback_fn("* starting instance...")
3203
      if not rpc.call_instance_start(pnode_name, iobj, None):
3204
        raise errors.OpExecError("Could not start instance")
3205

    
3206

    
3207
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
    return "ssh", argv


class LUAddMDDRBDComponent(LogicalUnit):
3258
  """Adda new mirror member to an instance's disk.
3259

3260
  """
3261
  HPATH = "mirror-add"
3262
  HTYPE = constants.HTYPE_INSTANCE
3263
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3264

    
3265
  def BuildHooksEnv(self):
3266
    """Build hooks env.
3267

3268
    This runs on the master, the primary and all the secondaries.
3269

3270
    """
3271
    env = {
3272
      "NEW_SECONDARY": self.op.remote_node,
3273
      "DISK_NAME": self.op.disk_name,
3274
      }
3275
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3276
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3277
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3278
    return env, nl, nl
3279

    
3280
  def CheckPrereq(self):
3281
    """Check prerequisites.
3282

3283
    This checks that the instance is in the cluster.
3284

3285
    """
3286
    instance = self.cfg.GetInstanceInfo(
3287
      self.cfg.ExpandInstanceName(self.op.instance_name))
3288
    if instance is None:
3289
      raise errors.OpPrereqError("Instance '%s' not known" %
3290
                                 self.op.instance_name)
3291
    self.instance = instance
3292

    
3293
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3294
    if remote_node is None:
3295
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3296
    self.remote_node = remote_node
3297

    
3298
    if remote_node == instance.primary_node:
3299
      raise errors.OpPrereqError("The specified node is the primary node of"
3300
                                 " the instance.")
3301

    
3302
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3303
      raise errors.OpPrereqError("Instance's disk layout is not"
3304
                                 " remote_raid1.")
3305
    for disk in instance.disks:
3306
      if disk.iv_name == self.op.disk_name:
3307
        break
3308
    else:
3309
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3310
                                 " instance." % self.op.disk_name)
3311
    if len(disk.children) > 1:
3312
      raise errors.OpPrereqError("The device already has two slave devices."
3313
                                 " This would create a 3-disk raid1 which we"
3314
                                 " don't allow.")
3315
    self.disk = disk
3316

    
3317
  def Exec(self, feedback_fn):
3318
    """Add the mirror component
3319

3320
    """
3321
    disk = self.disk
3322
    instance = self.instance
3323

    
3324
    remote_node = self.remote_node
3325
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3326
    names = _GenerateUniqueNames(self.cfg, lv_names)
3327
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3328
                                     remote_node, disk.size, names)
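    # For a disk named "sda" the suffixes above yield [".sda_data",
    # ".sda_meta"]; _GenerateUniqueNames is then expected to turn these into
    # cluster-unique LV names so the new volumes cannot clash with existing
    # ones (a sketch of intent, not a description of that helper's code).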
3329

    
3330
    logger.Info("adding new mirror component on secondary")
3331
    #HARDCODE
3332
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3333
                                      new_drbd, False,
3334
                                      _GetInstanceInfoText(instance)):
3335
      raise errors.OpExecError("Failed to create new component on secondary"
3336
                               " node %s" % remote_node)
3337

    
3338
    logger.Info("adding new mirror component on primary")
3339
    #HARDCODE
3340
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3341
                                    instance, new_drbd,
3342
                                    _GetInstanceInfoText(instance)):
3343
      # remove secondary dev
3344
      self.cfg.SetDiskID(new_drbd, remote_node)
3345
      rpc.call_blockdev_remove(remote_node, new_drbd)
3346
      raise errors.OpExecError("Failed to create volume on primary")
3347

    
3348
    # the device exists now
3349
    # call the primary node to add the mirror to md
3350
    logger.Info("adding new mirror component to md")
3351
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3352
                                         disk, [new_drbd]):
3353
      logger.Error("Can't add mirror compoment to md!")
3354
      self.cfg.SetDiskID(new_drbd, remote_node)
3355
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3356
        logger.Error("Can't rollback on secondary")
3357
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3358
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3359
        logger.Error("Can't rollback on primary")
3360
      raise errors.OpExecError("Can't add mirror component to md array")
3361

    
3362
    disk.children.append(new_drbd)
3363

    
3364
    self.cfg.AddInstance(instance)
3365

    
3366
    _WaitForSync(self.cfg, instance, self.proc)
3367

    
3368
    return 0
3369

    
3370

    
3371
class LURemoveMDDRBDComponent(LogicalUnit):
3372
  """Remove a component from a remote_raid1 disk.
3373

3374
  """
3375
  HPATH = "mirror-remove"
3376
  HTYPE = constants.HTYPE_INSTANCE
3377
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3378

    
3379
  def BuildHooksEnv(self):
3380
    """Build hooks env.
3381

3382
    This runs on the master, the primary and all the secondaries.
3383

3384
    """
3385
    env = {
3386
      "DISK_NAME": self.op.disk_name,
3387
      "DISK_ID": self.op.disk_id,
3388
      "OLD_SECONDARY": self.old_secondary,
3389
      }
3390
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3391
    nl = [self.sstore.GetMasterNode(),
3392
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3393
    return env, nl, nl
3394

    
3395
  def CheckPrereq(self):
3396
    """Check prerequisites.
3397

3398
    This checks that the instance is in the cluster.
3399

3400
    """
3401
    instance = self.cfg.GetInstanceInfo(
3402
      self.cfg.ExpandInstanceName(self.op.instance_name))
3403
    if instance is None:
3404
      raise errors.OpPrereqError("Instance '%s' not known" %
3405
                                 self.op.instance_name)
3406
    self.instance = instance
3407

    
3408
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3409
      raise errors.OpPrereqError("Instance's disk layout is not"
3410
                                 " remote_raid1.")
3411
    for disk in instance.disks:
3412
      if disk.iv_name == self.op.disk_name:
3413
        break
3414
    else:
3415
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3416
                                 " instance." % self.op.disk_name)
3417
    for child in disk.children:
3418
      if (child.dev_type == constants.LD_DRBD7 and
3419
          child.logical_id[2] == self.op.disk_id):
3420
        break
3421
    else:
3422
      raise errors.OpPrereqError("Can't find the device with this port.")
3423

    
3424
    if len(disk.children) < 2:
3425
      raise errors.OpPrereqError("Cannot remove the last component from"
3426
                                 " a mirror.")
3427
    self.disk = disk
3428
    self.child = child
3429
    if self.child.logical_id[0] == instance.primary_node:
3430
      oid = 1
3431
    else:
3432
      oid = 0
3433
    self.old_secondary = self.child.logical_id[oid]
3434

    
3435
  def Exec(self, feedback_fn):
3436
    """Remove the mirror component
3437

3438
    """
3439
    instance = self.instance
3440
    disk = self.disk
3441
    child = self.child
3442
    logger.Info("remove mirror component")
3443
    self.cfg.SetDiskID(disk, instance.primary_node)
3444
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3445
                                            disk, [child]):
3446
      raise errors.OpExecError("Can't remove child from mirror.")
3447

    
3448
    for node in child.logical_id[:2]:
3449
      self.cfg.SetDiskID(child, node)
3450
      if not rpc.call_blockdev_remove(node, child):
3451
        logger.Error("Warning: failed to remove device from node %s,"
3452
                     " continuing operation." % node)
3453

    
3454
    disk.children.remove(child)
3455
    self.cfg.AddInstance(instance)
3456

    
3457

    
3458
class LUReplaceDisks(LogicalUnit):
3459
  """Replace the disks of an instance.
3460

3461
  """
3462
  HPATH = "mirrors-replace"
3463
  HTYPE = constants.HTYPE_INSTANCE
3464
  _OP_REQP = ["instance_name", "mode", "disks"]
3465

    
3466
  def BuildHooksEnv(self):
3467
    """Build hooks env.
3468

3469
    This runs on the master, the primary and all the secondaries.
3470

3471
    """
3472
    env = {
3473
      "MODE": self.op.mode,
3474
      "NEW_SECONDARY": self.op.remote_node,
3475
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3476
      }
3477
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3478
    nl = [
3479
      self.sstore.GetMasterNode(),
3480
      self.instance.primary_node,
3481
      ]
3482
    if self.op.remote_node is not None:
3483
      nl.append(self.op.remote_node)
3484
    return env, nl, nl
3485

    
3486
  def CheckPrereq(self):
3487
    """Check prerequisites.
3488

3489
    This checks that the instance is in the cluster.
3490

3491
    """
3492
    instance = self.cfg.GetInstanceInfo(
3493
      self.cfg.ExpandInstanceName(self.op.instance_name))
3494
    if instance is None:
3495
      raise errors.OpPrereqError("Instance '%s' not known" %
3496
                                 self.op.instance_name)
3497
    self.instance = instance
3498
    self.op.instance_name = instance.name
3499

    
3500
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3501
      raise errors.OpPrereqError("Instance's disk layout is not"
3502
                                 " network mirrored.")
3503

    
3504
    if len(instance.secondary_nodes) != 1:
3505
      raise errors.OpPrereqError("The instance has a strange layout,"
3506
                                 " expected one secondary but found %d" %
3507
                                 len(instance.secondary_nodes))
3508

    
3509
    self.sec_node = instance.secondary_nodes[0]
3510

    
3511
    remote_node = getattr(self.op, "remote_node", None)
3512
    if remote_node is not None:
3513
      remote_node = self.cfg.ExpandNodeName(remote_node)
3514
      if remote_node is None:
3515
        raise errors.OpPrereqError("Node '%s' not known" %
3516
                                   self.op.remote_node)
3517
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3518
    else:
3519
      self.remote_node_info = None
3520
    if remote_node == instance.primary_node:
3521
      raise errors.OpPrereqError("The specified node is the primary node of"
3522
                                 " the instance.")
3523
    elif remote_node == self.sec_node:
3524
      if self.op.mode == constants.REPLACE_DISK_SEC:
3525
        # this is for DRBD8, where we can't execute the same mode of
3526
        # replacement as for drbd7 (no different port allocated)
3527
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3528
                                   " replacement")
3529
      # the user gave the current secondary, switch to
3530
      # 'no-replace-secondary' mode for drbd7
3531
      remote_node = None
3532
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3533
        self.op.mode != constants.REPLACE_DISK_ALL):
3534
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3535
                                 " disks replacement, not individual ones")
3536
    if instance.disk_template == constants.DT_DRBD8:
3537
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3538
          remote_node is not None):
3539
        # switch to replace secondary mode
3540
        self.op.mode = constants.REPLACE_DISK_SEC
3541

    
3542
      if self.op.mode == constants.REPLACE_DISK_ALL:
3543
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3544
                                   " secondary disk replacement, not"
3545
                                   " both at once")
3546
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3547
        if remote_node is not None:
3548
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3549
                                     " the secondary while doing a primary"
3550
                                     " node disk replacement")
3551
        self.tgt_node = instance.primary_node
3552
        self.oth_node = instance.secondary_nodes[0]
3553
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3554
        self.new_node = remote_node # this can be None, in which case
3555
                                    # we don't change the secondary
3556
        self.tgt_node = instance.secondary_nodes[0]
3557
        self.oth_node = instance.primary_node
3558
      else:
3559
        raise errors.ProgrammerError("Unhandled disk replace mode")
3560

    
3561
    for name in self.op.disks:
3562
      if instance.FindDisk(name) is None:
3563
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3564
                                   (name, instance.name))
3565
    self.op.remote_node = remote_node
3566

    
3567
  def _ExecRR1(self, feedback_fn):
3568
    """Replace the disks of an instance.
3569

3570
    """
3571
    instance = self.instance
3572
    iv_names = {}
3573
    # start of work
3574
    if self.op.remote_node is None:
3575
      remote_node = self.sec_node
3576
    else:
3577
      remote_node = self.op.remote_node
3578
    cfg = self.cfg
3579
    for dev in instance.disks:
3580
      size = dev.size
3581
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3582
      names = _GenerateUniqueNames(cfg, lv_names)
3583
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3584
                                       remote_node, size, names)
3585
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3586
      logger.Info("adding new mirror component on secondary for %s" %
3587
                  dev.iv_name)
3588
      #HARDCODE
3589
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3590
                                        new_drbd, False,
3591
                                        _GetInstanceInfoText(instance)):
3592
        raise errors.OpExecError("Failed to create new component on secondary"
3593
                                 " node %s. Full abort, cleanup manually!" %
3594
                                 remote_node)
3595

    
3596
      logger.Info("adding new mirror component on primary")
3597
      #HARDCODE
3598
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3599
                                      instance, new_drbd,
3600
                                      _GetInstanceInfoText(instance)):
3601
        # remove secondary dev
3602
        cfg.SetDiskID(new_drbd, remote_node)
3603
        rpc.call_blockdev_remove(remote_node, new_drbd)
3604
        raise errors.OpExecError("Failed to create volume on primary!"
3605
                                 " Full abort, cleanup manually!!")
3606

    
3607
      # the device exists now
3608
      # call the primary node to add the mirror to md
3609
      logger.Info("adding new mirror component to md")
3610
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3611
                                           [new_drbd]):
3612
        logger.Error("Can't add mirror compoment to md!")
3613
        cfg.SetDiskID(new_drbd, remote_node)
3614
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3615
          logger.Error("Can't rollback on secondary")
3616
        cfg.SetDiskID(new_drbd, instance.primary_node)
3617
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3618
          logger.Error("Can't rollback on primary")
3619
        raise errors.OpExecError("Full abort, cleanup manually!!")
3620

    
3621
      dev.children.append(new_drbd)
3622
      cfg.AddInstance(instance)
3623

    
3624
    # this can fail as the old devices are degraded and _WaitForSync
3625
    # does a combined result over all disks, so we don't check its
3626
    # return value
3627
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3628

    
3629
    # so check manually all the devices
3630
    for name in iv_names:
3631
      dev, child, new_drbd = iv_names[name]
3632
      cfg.SetDiskID(dev, instance.primary_node)
3633
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3634
      if is_degr:
3635
        raise errors.OpExecError("MD device %s is degraded!" % name)
3636
      cfg.SetDiskID(new_drbd, instance.primary_node)
3637
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3638
      if is_degr:
3639
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3640

    
3641
    for name in iv_names:
3642
      dev, child, new_drbd = iv_names[name]
3643
      logger.Info("remove mirror %s component" % name)
3644
      cfg.SetDiskID(dev, instance.primary_node)
3645
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3646
                                              dev, [child]):
3647
        logger.Error("Can't remove child from mirror, aborting"
3648
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3649
        continue
3650

    
3651
      for node in child.logical_id[:2]:
3652
        logger.Info("remove child device on %s" % node)
3653
        cfg.SetDiskID(child, node)
3654
        if not rpc.call_blockdev_remove(node, child):
3655
          logger.Error("Warning: failed to remove device from node %s,"
3656
                       " continuing operation." % node)
3657

    
3658
      dev.children.remove(child)
3659

    
3660
      cfg.AddInstance(instance)
3661

    
3662
  def _ExecD8DiskOnly(self, feedback_fn):
3663
    """Replace a disk on the primary or secondary for dbrd8.
3664

3665
    The algorithm for replace is quite complicated:
3666
      - for each disk to be replaced:
3667
        - create new LVs on the target node with unique names
3668
        - detach old LVs from the drbd device
3669
        - rename old LVs to name_replaced.<time_t>
3670
        - rename new LVs to old LVs
3671
        - attach the new LVs (with the old names now) to the drbd device
3672
      - wait for sync across all devices
3673
      - for each modified disk:
3674
        - remove old LVs (which have the name name_replaces.<time_t>)
3675

3676
    Failures are not very well handled.
3677

3678
    """
3679
    steps_total = 6
3680
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3681
    instance = self.instance
3682
    iv_names = {}
3683
    vgname = self.cfg.GetVGName()
3684
    # start of work
3685
    cfg = self.cfg
3686
    tgt_node = self.tgt_node
3687
    oth_node = self.oth_node
3688

    
3689
    # Step: check device activation
3690
    self.proc.LogStep(1, steps_total, "check device existence")
3691
    info("checking volume groups")
3692
    my_vg = cfg.GetVGName()
3693
    results = rpc.call_vg_list([oth_node, tgt_node])
3694
    if not results:
3695
      raise errors.OpExecError("Can't list volume groups on the nodes")
3696
    for node in oth_node, tgt_node:
3697
      res = results.get(node, False)
3698
      if not res or my_vg not in res:
3699
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3700
                                 (my_vg, node))
3701
    for dev in instance.disks:
3702
      if not dev.iv_name in self.op.disks:
3703
        continue
3704
      for node in tgt_node, oth_node:
3705
        info("checking %s on %s" % (dev.iv_name, node))
3706
        cfg.SetDiskID(dev, node)
3707
        if not rpc.call_blockdev_find(node, dev):
3708
          raise errors.OpExecError("Can't find device %s on node %s" %
3709
                                   (dev.iv_name, node))
3710

    
3711
    # Step: check other node consistency
3712
    self.proc.LogStep(2, steps_total, "check peer consistency")
3713
    for dev in instance.disks:
3714
      if not dev.iv_name in self.op.disks:
3715
        continue
3716
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3717
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3718
                                   oth_node==instance.primary_node):
3719
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3720
                                 " to replace disks on this node (%s)" %
3721
                                 (oth_node, tgt_node))
3722

    
3723
    # Step: create new storage
3724
    self.proc.LogStep(3, steps_total, "allocate new storage")
3725
    for dev in instance.disks:
3726
      if not dev.iv_name in self.op.disks:
3727
        continue
3728
      size = dev.size
3729
      cfg.SetDiskID(dev, tgt_node)
3730
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3731
      names = _GenerateUniqueNames(cfg, lv_names)
3732
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3733
                             logical_id=(vgname, names[0]))
3734
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3735
                             logical_id=(vgname, names[1]))
3736
      new_lvs = [lv_data, lv_meta]
3737
      old_lvs = dev.children
3738
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3739
      info("creating new local storage on %s for %s" %
3740
           (tgt_node, dev.iv_name))
3741
      # since we *always* want to create this LV, we use the
3742
      # _Create...OnPrimary (which forces the creation), even if we
3743
      # are talking about the secondary node
3744
      for new_lv in new_lvs:
3745
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3746
                                        _GetInstanceInfoText(instance)):
3747
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3748
                                   " node '%s'" %
3749
                                   (new_lv.logical_id[1], tgt_node))
3750

    
3751
    # Step: for each lv, detach+rename*2+attach
3752
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3753
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3754
      info("detaching %s drbd from local storage" % dev.iv_name)
3755
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3756
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3757
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3758
      #dev.children = []
3759
      #cfg.Update(instance)
3760

    
3761
      # ok, we created the new LVs, so now we know we have the needed
3762
      # storage; as such, we proceed on the target node to rename
3763
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3764
      # using the assumption that logical_id == physical_id (which in
3765
      # turn is the unique_id on that node)
3766

    
3767
      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
3771
      # build the rename list based on what LVs exist on the node
3772
      rlist = []
3773
      for to_ren in old_lvs:
3774
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3775
        if find_res is not None: # device exists
3776
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3777

    
3778
      info("renaming the old LVs on the target node")
3779
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3780
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3781
      # now we rename the new LVs to the old LVs
3782
      info("renaming the new LVs on the target node")
3783
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3784
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3785
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3786

    
3787
      for old, new in zip(old_lvs, new_lvs):
3788
        new.logical_id = old.logical_id
3789
        cfg.SetDiskID(new, tgt_node)
3790

    
3791
      for disk in old_lvs:
3792
        disk.logical_id = ren_fn(disk, temp_suffix)
3793
        cfg.SetDiskID(disk, tgt_node)
3794

    
3795
      # now that the new lvs have the old name, we can add them to the device
3796
      info("adding new mirror component on %s" % tgt_node)
3797
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3798
        for new_lv in new_lvs:
3799
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3800
            warning("Can't rollback device %s", hint="manually cleanup unused"
3801
                    " logical volumes")
3802
        raise errors.OpExecError("Can't add local storage to drbd")
3803

    
3804
      dev.children = new_lvs
3805
      cfg.Update(instance)
3806

    
3807
    # Step: wait for sync
3808

    
3809
    # this can fail as the old devices are degraded and _WaitForSync
3810
    # does a combined result over all disks, so we don't check its
3811
    # return value
3812
    self.proc.LogStep(5, steps_total, "sync devices")
3813
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3814

    
3815
    # so check manually all the devices
3816
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3817
      cfg.SetDiskID(dev, instance.primary_node)
3818
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3819
      if is_degr:
3820
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3821

    
3822
    # Step: remove old storage
3823
    self.proc.LogStep(6, steps_total, "removing old storage")
3824
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3825
      info("remove logical volumes for %s" % name)
3826
      for lv in old_lvs:
3827
        cfg.SetDiskID(lv, tgt_node)
3828
        if not rpc.call_blockdev_remove(tgt_node, lv):
3829
          warning("Can't remove old LV", hint="manually remove unused LVs")
3830
          continue
3831

    
3832
  def _ExecD8Secondary(self, feedback_fn):
3833
    """Replace the secondary node for drbd8.
3834

3835
    The algorithm for replace is quite complicated:
3836
      - for all disks of the instance:
3837
        - create new LVs on the new node with same names
3838
        - shutdown the drbd device on the old secondary
3839
        - disconnect the drbd network on the primary
3840
        - create the drbd device on the new secondary
3841
        - network attach the drbd on the primary, using an artifice:
3842
          the drbd code for Attach() will connect to the network if it
3843
          finds a device which is connected to the good local disks but
3844
          not network enabled
3845
      - wait for sync across all devices
3846
      - remove all disks from the old secondary
3847

3848
    Failures are not very well handled.
3849

3850
    """
3851
    steps_total = 6
3852
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3853
    instance = self.instance
3854
    iv_names = {}
3855
    vgname = self.cfg.GetVGName()
3856
    # start of work
3857
    cfg = self.cfg
3858
    old_node = self.tgt_node
3859
    new_node = self.new_node
3860
    pri_node = instance.primary_node
3861

    
3862
    # Step: check device activation
3863
    self.proc.LogStep(1, steps_total, "check device existence")
3864
    info("checking volume groups")
3865
    my_vg = cfg.GetVGName()
3866
    results = rpc.call_vg_list([pri_node, new_node])
3867
    if not results:
3868
      raise errors.OpExecError("Can't list volume groups on the nodes")
3869
    for node in pri_node, new_node:
3870
      res = results.get(node, False)
3871
      if not res or my_vg not in res:
3872
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3873
                                 (my_vg, node))
3874
    for dev in instance.disks:
3875
      if not dev.iv_name in self.op.disks:
3876
        continue
3877
      info("checking %s on %s" % (dev.iv_name, pri_node))
3878
      cfg.SetDiskID(dev, pri_node)
3879
      if not rpc.call_blockdev_find(pri_node, dev):
3880
        raise errors.OpExecError("Can't find device %s on node %s" %
3881
                                 (dev.iv_name, pri_node))
3882

    
3883
    # Step: check other node consistency
3884
    self.proc.LogStep(2, steps_total, "check peer consistency")
3885
    for dev in instance.disks:
3886
      if not dev.iv_name in self.op.disks:
3887
        continue
3888
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3889
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3890
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3891
                                 " unsafe to replace the secondary" %
3892
                                 pri_node)
3893

    
3894
    # Step: create new storage
3895
    self.proc.LogStep(3, steps_total, "allocate new storage")
3896
    for dev in instance.disks:
3897
      size = dev.size
3898
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3899
      # since we *always* want to create this LV, we use the
3900
      # _Create...OnPrimary (which forces the creation), even if we
3901
      # are talking about the secondary node
3902
      for new_lv in dev.children:
3903
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3904
                                        _GetInstanceInfoText(instance)):
3905
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3906
                                   " node '%s'" %
3907
                                   (new_lv.logical_id[1], new_node))
3908

    
3909
      iv_names[dev.iv_name] = (dev, dev.children)
3910

    
3911
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3912
    for dev in instance.disks:
3913
      size = dev.size
3914
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3915
      # create new devices on new_node
3916
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3917
                              logical_id=(pri_node, new_node,
3918
                                          dev.logical_id[2]),
3919
                              children=dev.children)
3920
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
3923
        raise errors.OpExecError("Failed to create new DRBD on"
3924
                                 " node '%s'" % new_node)
3925

    
3926
    for dev in instance.disks:
3927
      # we have new devices, shutdown the drbd on the old secondary
3928
      info("shutting down drbd for %s on old node" % dev.iv_name)
3929
      cfg.SetDiskID(dev, old_node)
3930
      if not rpc.call_blockdev_shutdown(old_node, dev):
3931
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3932
                hint="Please cleanup this device manually as soon as possible")
3933

    
3934
    info("detaching primary drbds from the network (=> standalone)")
3935
    done = 0
3936
    for dev in instance.disks:
3937
      cfg.SetDiskID(dev, pri_node)
3938
      # set the physical (unique in bdev terms) id to None, meaning
3939
      # detach from network
3940
      dev.physical_id = (None,) * len(dev.physical_id)
3941
      # and 'find' the device, which will 'fix' it to match the
3942
      # standalone state
3943
      if rpc.call_blockdev_find(pri_node, dev):
3944
        done += 1
3945
      else:
3946
        warning("Failed to detach drbd %s from network, unusual case" %
3947
                dev.iv_name)
3948

    
3949
    if not done:
3950
      # no detaches succeeded (very unlikely)
3951
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3952

    
3953
    # if we managed to detach at least one, we update all the disks of
3954
    # the instance to point to the new secondary
3955
    info("updating instance configuration")
3956
    for dev in instance.disks:
3957
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3958
      cfg.SetDiskID(dev, pri_node)
3959
    cfg.Update(instance)
3960

    
3961
    # and now perform the drbd attach
3962
    info("attaching primary drbds to new secondary (standalone => connected)")
3963
    failures = []
3964
    for dev in instance.disks:
3965
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3966
      # since the attach is smart, it's enough to 'find' the device,
3967
      # it will automatically activate the network, if the physical_id
3968
      # is correct
3969
      cfg.SetDiskID(dev, pri_node)
3970
      if not rpc.call_blockdev_find(pri_node, dev):
3971
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3972
                "please do a gnt-instance info to see the status of disks")
3973

    
3974
    # this can fail as the old devices are degraded and _WaitForSync
3975
    # does a combined result over all disks, so we don't check its
3976
    # return value
3977
    self.proc.LogStep(5, steps_total, "sync devices")
3978
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3979

    
3980
    # so check manually all the devices
3981
    for name, (dev, old_lvs) in iv_names.iteritems():
3982
      cfg.SetDiskID(dev, pri_node)
3983
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3984
      if is_degr:
3985
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3986

    
3987
    self.proc.LogStep(6, steps_total, "removing old storage")
3988
    for name, (dev, old_lvs) in iv_names.iteritems():
3989
      info("remove logical volumes for %s" % name)
3990
      for lv in old_lvs:
3991
        cfg.SetDiskID(lv, old_node)
3992
        if not rpc.call_blockdev_remove(old_node, lv):
3993
          warning("Can't remove LV on old secondary",
3994
                  hint="Cleanup stale volumes by hand")
3995

    
3996
  def Exec(self, feedback_fn):
3997
    """Execute disk replacement.
3998

3999
    This dispatches the disk replacement to the appropriate handler.
4000

4001
    """
4002
    instance = self.instance
4003
    if instance.disk_template == constants.DT_REMOTE_RAID1:
4004
      fn = self._ExecRR1
4005
    elif instance.disk_template == constants.DT_DRBD8:
4006
      if self.op.remote_node is None:
4007
        fn = self._ExecD8DiskOnly
4008
      else:
4009
        fn = self._ExecD8Secondary
4010
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)
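    # Dispatch summary: remote_raid1 instances always go through _ExecRR1;
    # drbd8 instances replace their LVs in place via _ExecD8DiskOnly unless a
    # new remote_node was given, in which case _ExecD8Secondary moves the
    # whole secondary to that node.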
4013

    
4014

    
4015
class LUQueryInstanceData(NoHooksLU):
4016
  """Query runtime instance data.
4017

4018
  """
4019
  _OP_REQP = ["instances"]
4020

    
4021
  def CheckPrereq(self):
4022
    """Check prerequisites.
4023

4024
    This only checks the optional instance list against the existing names.
4025

4026
    """
4027
    if not isinstance(self.op.instances, list):
4028
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4029
    if self.op.instances:
4030
      self.wanted_instances = []
4031
      names = self.op.instances
4032
      for name in names:
4033
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4034
        if instance is None:
4035
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4036
        self.wanted_instances.append(instance)
4037
    else:
4038
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4039
                               in self.cfg.GetInstanceList()]
4040
    return
4041

    
4042

    
4043
  def _ComputeDiskStatus(self, instance, snode, dev):
4044
    """Compute block device status.
4045

4046
    """
4047
    self.cfg.SetDiskID(dev, instance.primary_node)
4048
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4049
    if dev.dev_type in constants.LDS_DRBD:
4050
      # we change the snode then (otherwise we use the one passed in)
4051
      if dev.logical_id[0] == instance.primary_node:
4052
        snode = dev.logical_id[1]
4053
      else:
4054
        snode = dev.logical_id[0]
4055

    
4056
    if snode:
4057
      self.cfg.SetDiskID(dev, snode)
4058
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4059
    else:
4060
      dev_sstatus = None
4061

    
4062
    if dev.children:
4063
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4064
                      for child in dev.children]
4065
    else:
4066
      dev_children = []
4067

    
4068
    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }
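    # The dict is recursive: every element of "children" is another dict of
    # this same shape, so e.g. a drbd disk is reported with its backing LVs
    # nested under it.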
4077

    
4078
    return data
4079

    
4080
  def Exec(self, feedback_fn):
4081
    """Gather and return data"""
4082
    result = {}
4083
    for instance in self.wanted_instances:
4084
      remote_info = rpc.call_instance_info(instance.primary_node,
4085
                                                instance.name)
4086
      if remote_info and "state" in remote_info:
4087
        remote_state = "up"
4088
      else:
4089
        remote_state = "down"
4090
      if instance.status == "down":
4091
        config_state = "down"
4092
      else:
4093
        config_state = "up"
4094

    
4095
      disks = [self._ComputeDiskStatus(instance, None, device)
4096
               for device in instance.disks]
4097

    
4098
      idict = {
4099
        "name": instance.name,
4100
        "config_state": config_state,
4101
        "run_state": remote_state,
4102
        "pnode": instance.primary_node,
4103
        "snodes": instance.secondary_nodes,
4104
        "os": instance.os,
4105
        "memory": instance.memory,
4106
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4107
        "disks": disks,
4108
        "network_port": instance.network_port,
4109
        "vcpus": instance.vcpus,
4110
        "kernel_path": instance.kernel_path,
4111
        "initrd_path": instance.initrd_path,
4112
        "hvm_boot_order": instance.hvm_boot_order,
4113
        }
4114

    
4115
      result[instance.name] = idict
4116

    
4117
    return result
4118

    
4119

    
4120
class LUSetInstanceParms(LogicalUnit):
4121
  """Modifies an instances's parameters.
4122

4123
  """
4124
  HPATH = "instance-modify"
4125
  HTYPE = constants.HTYPE_INSTANCE
4126
  _OP_REQP = ["instance_name"]
4127

    
4128
  def BuildHooksEnv(self):
4129
    """Build hooks env.
4130

4131
    This runs on the master, primary and secondaries.
4132

4133
    """
4134
    args = dict()
4135
    if self.mem:
4136
      args['memory'] = self.mem
4137
    if self.vcpus:
4138
      args['vcpus'] = self.vcpus
4139
    if self.do_ip or self.do_bridge or self.mac:
4140
      if self.do_ip:
4141
        ip = self.ip
4142
      else:
4143
        ip = self.instance.nics[0].ip
4144
      if self.bridge:
4145
        bridge = self.bridge
4146
      else:
4147
        bridge = self.instance.nics[0].bridge
4148
      if self.mac:
4149
        mac = self.mac
4150
      else:
4151
        mac = self.instance.nics[0].mac
4152
      args['nics'] = [(ip, bridge, mac)]
4153
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4154
    nl = [self.sstore.GetMasterNode(),
4155
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4156
    return env, nl, nl
4157

    
4158
  def CheckPrereq(self):
4159
    """Check prerequisites.
4160

4161
    This only checks the instance list against the existing names.
4162

4163
    """
4164
    self.mem = getattr(self.op, "mem", None)
4165
    self.vcpus = getattr(self.op, "vcpus", None)
4166
    self.ip = getattr(self.op, "ip", None)
4167
    self.mac = getattr(self.op, "mac", None)
4168
    self.bridge = getattr(self.op, "bridge", None)
4169
    self.kernel_path = getattr(self.op, "kernel_path", None)
4170
    self.initrd_path = getattr(self.op, "initrd_path", None)
4171
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4172
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4173
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
4174
    if all_parms.count(None) == len(all_parms):
4175
      raise errors.OpPrereqError("No changes submitted")
4176
    if self.mem is not None:
4177
      try:
4178
        self.mem = int(self.mem)
4179
      except ValueError, err:
4180
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4181
    if self.vcpus is not None:
4182
      try:
4183
        self.vcpus = int(self.vcpus)
4184
      except ValueError, err:
4185
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4186
    if self.ip is not None:
4187
      self.do_ip = True
4188
      if self.ip.lower() == "none":
4189
        self.ip = None
4190
      else:
4191
        if not utils.IsValidIP(self.ip):
4192
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4193
    else:
4194
      self.do_ip = False
4195
    self.do_bridge = (self.bridge is not None)
4196
    if self.mac is not None:
4197
      if self.cfg.IsMacInUse(self.mac):
4198
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4199
                                   self.mac)
4200
      if not utils.IsValidMac(self.mac):
4201
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4202

    
4203
    if self.kernel_path is not None:
4204
      self.do_kernel_path = True
4205
      if self.kernel_path == constants.VALUE_NONE:
4206
        raise errors.OpPrereqError("Can't set instance to no kernel")
4207

    
4208
      if self.kernel_path != constants.VALUE_DEFAULT:
4209
        if not os.path.isabs(self.kernel_path):
4210
          raise errors.OpPrereqError("The kernel path must be an absolute"
4211
                                    " filename")
4212
    else:
4213
      self.do_kernel_path = False
4214

    
4215
    if self.initrd_path is not None:
4216
      self.do_initrd_path = True
4217
      if self.initrd_path not in (constants.VALUE_NONE,
4218
                                  constants.VALUE_DEFAULT):
4219
        if not os.path.isabs(self.initrd_path):
4220
          raise errors.OpPrereqError("The initrd path must be an absolute"
4221
                                    " filename")
4222
    else:
4223
      self.do_initrd_path = False
4224

    
4225
    # boot order verification
4226
    if self.hvm_boot_order is not None:
4227
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4228
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4229
          raise errors.OpPrereqError("invalid boot order specified,"
4230
                                     " must be one or more of [acdn]"
4231
                                     " or 'default'")
4232

    
4233
    instance = self.cfg.GetInstanceInfo(
4234
      self.cfg.ExpandInstanceName(self.op.instance_name))
4235
    if instance is None:
4236
      raise errors.OpPrereqError("No such instance name '%s'" %
4237
                                 self.op.instance_name)
4238
    self.op.instance_name = instance.name
4239
    self.instance = instance
4240
    return
4241

    
4242
  def Exec(self, feedback_fn):
4243
    """Modifies an instance.
4244

4245
    All parameters take effect only at the next restart of the instance.

    """
4247
    result = []
4248
    instance = self.instance
4249
    if self.mem:
4250
      instance.memory = self.mem
4251
      result.append(("mem", self.mem))
4252
    if self.vcpus:
4253
      instance.vcpus = self.vcpus
4254
      result.append(("vcpus",  self.vcpus))
4255
    if self.do_ip:
4256
      instance.nics[0].ip = self.ip
4257
      result.append(("ip", self.ip))
4258
    if self.bridge:
4259
      instance.nics[0].bridge = self.bridge
4260
      result.append(("bridge", self.bridge))
4261
    if self.mac:
4262
      instance.nics[0].mac = self.mac
4263
      result.append(("mac", self.mac))
4264
    if self.do_kernel_path:
4265
      instance.kernel_path = self.kernel_path
4266
      result.append(("kernel_path", self.kernel_path))
4267
    if self.do_initrd_path:
4268
      instance.initrd_path = self.initrd_path
4269
      result.append(("initrd_path", self.initrd_path))
4270
    if self.hvm_boot_order:
4271
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4272
        instance.hvm_boot_order = None
4273
      else:
4274
        instance.hvm_boot_order = self.hvm_boot_order
4275
      result.append(("hvm_boot_order", self.hvm_boot_order))
4276

    
4277
    self.cfg.AddInstance(instance)
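    # The accumulated result is a list of (parameter, new value) pairs, e.g.
    # [("mem", 512), ("vcpus", 2)] for a hypothetical call changing memory
    # and vcpu count; as the docstring notes, nothing takes effect until the
    # instance is restarted.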
4278

    
4279
    return result
4280

    
4281

    
4282
class LUQueryExports(NoHooksLU):
4283
  """Query the exports list
4284

4285
  """
4286
  _OP_REQP = []
4287

    
4288
  def CheckPrereq(self):
4289
    """Check that the nodelist contains only existing nodes.
4290

4291
    """
4292
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4293

    
4294
  def Exec(self, feedback_fn):
4295
    """Compute the list of all the exported system images.
4296

4297
    Returns:
4298
      a dictionary with the structure node->(export-list)
4299
      where export-list is a list of the instances exported on
4300
      that node.
4301

4302
    """
4303
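    # (illustrative shape of the RPC result below, assuming two nodes holding
    #  one export each:
    #    {"node1.example.com": ["inst1.example.com"],
    #     "node2.example.com": ["inst2.example.com"]})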
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
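    # Overall flow below: optionally shut the instance down, snapshot its
    # "sda" disk on the primary node, restart the instance if it was shut
    # down, copy the snapshot to the target node, remove the snapshot,
    # finalize the export, then remove any older export of this instance
    # found on the other nodes.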
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                      logical_id=(vgname, new_dev_name),
                                      physical_id=(vgname, new_dev_name),
                                      iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the search pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
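    # ("<=" on frozensets is the subset test: deletion only proceeds when
    #  every requested tag is currently present on the target object)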
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
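  # (illustrative only: the corresponding opcode carries the duration in
  #  seconds plus the two switches checked in Exec below, e.g. something
  #  like opcodes.OpTestDelay(duration=5.0, on_master=True, on_nodes=[]))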

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))