Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 7c0d6283

History | View | Annotate | Download (153.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import config
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import ssconf
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement CheckPrereq which also fills in the opcode instance
52
      with all the fields (even if as None)
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements (REQ_CLUSTER,
57
      REQ_MASTER); note that all commands require root permissions
58

59
  """
60
  HPATH = None
61
  HTYPE = None
62
  _OP_REQP = []
63
  REQ_CLUSTER = True
64
  REQ_MASTER = True
65

    
66
  def __init__(self, processor, op, cfg, sstore):
67
    """Constructor for LogicalUnit.
68

69
    This needs to be overriden in derived classes in order to check op
70
    validity.
71

72
    """
73
    self.proc = processor
74
    self.op = op
75
    self.cfg = cfg
76
    self.sstore = sstore
77
    for attr_name in self._OP_REQP:
78
      attr_val = getattr(op, attr_name, None)
79
      if attr_val is None:
80
        raise errors.OpPrereqError("Required parameter '%s' missing" %
81
                                   attr_name)
82
    if self.REQ_CLUSTER:
83
      if not cfg.IsCluster():
84
        raise errors.OpPrereqError("Cluster not initialized yet,"
85
                                   " use 'gnt-cluster init' first.")
86
      if self.REQ_MASTER:
87
        master = sstore.GetMasterNode()
88
        if master != utils.HostInfo().name:
89
          raise errors.OpPrereqError("Commands must be run on the master"
90
                                     " node %s" % master)
91

    
92
  def CheckPrereq(self):
93
    """Check prerequisites for this LU.
94

95
    This method should check that the prerequisites for the execution
96
    of this LU are fulfilled. It can do internode communication, but
97
    it should be idempotent - no cluster or system changes are
98
    allowed.
99

100
    The method should raise errors.OpPrereqError in case something is
101
    not fulfilled. Its return value is ignored.
102

103
    This method should also update all the parameters of the opcode to
104
    their canonical form; e.g. a short node name must be fully
105
    expanded after this method has successfully completed (so that
106
    hooks, logging, etc. work correctly).
107

108
    """
109
    raise NotImplementedError
110

    
111
  def Exec(self, feedback_fn):
112
    """Execute the LU.
113

114
    This method should implement the actual work. It should raise
115
    errors.OpExecError for failures that are somewhat dealt with in
116
    code, or expected.
117

118
    """
119
    raise NotImplementedError
120

    
121
  def BuildHooksEnv(self):
122
    """Build hooks environment for this LU.
123

124
    This method should return a three-node tuple consisting of: a dict
125
    containing the environment that will be used for running the
126
    specific hook for this LU, a list of node names on which the hook
127
    should run before the execution, and a list of node names on which
128
    the hook should run after the execution.
129

130
    The keys of the dict must not have 'GANETI_' prefixed as this will
131
    be handled in the hooks runner. Also note additional keys will be
132
    added by the hooks runner. If the LU doesn't define any
133
    environment, an empty dict (and not None) should be returned.
134

135
    As for the node lists, the master should not be included in the
136
    them, as it will be added by the hooks runner in case this LU
137
    requires a cluster to run on (otherwise we don't have a node
138
    list). No nodes should be returned as an empty list (and not
139
    None).
140

141
    Note that if the HPATH for a LU class is None, this function will
142
    not be called.
143

144
    """
145
    raise NotImplementedError
146

    
147

    
148
class NoHooksLU(LogicalUnit):
149
  """Simple LU which runs no hooks.
150

151
  This LU is intended as a parent for other LogicalUnits which will
152
  run no hooks, in order to reduce duplicate code.
153

154
  """
155
  HPATH = None
156
  HTYPE = None
157

    
158
  def BuildHooksEnv(self):
159
    """Build hooks env.
160

161
    This is a no-op, since we don't run hooks.
162

163
    """
164
    return {}, [], []
165

    
166

    
167
def _AddHostToEtcHosts(hostname):
168
  """Wrapper around utils.SetEtcHostsEntry.
169

170
  """
171
  hi = utils.HostInfo(name=hostname)
172
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
173

    
174

    
175
def _RemoveHostFromEtcHosts(hostname):
176
  """Wrapper around utils.RemoveEtcHostsEntry.
177

178
  """
179
  hi = utils.HostInfo(name=hostname)
180
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
181
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
182

    
183

    
184
def _GetWantedNodes(lu, nodes):
185
  """Returns list of checked and expanded node names.
186

187
  Args:
188
    nodes: List of nodes (strings) or None for all
189

190
  """
191
  if not isinstance(nodes, list):
192
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
193

    
194
  if nodes:
195
    wanted = []
196

    
197
    for name in nodes:
198
      node = lu.cfg.ExpandNodeName(name)
199
      if node is None:
200
        raise errors.OpPrereqError("No such node name '%s'" % name)
201
      wanted.append(node)
202

    
203
  else:
204
    wanted = lu.cfg.GetNodeList()
205
  return utils.NiceSort(wanted)
206

    
207

    
208
def _GetWantedInstances(lu, instances):
209
  """Returns list of checked and expanded instance names.
210

211
  Args:
212
    instances: List of instances (strings) or None for all
213

214
  """
215
  if not isinstance(instances, list):
216
    raise errors.OpPrereqError("Invalid argument type 'instances'")
217

    
218
  if instances:
219
    wanted = []
220

    
221
    for name in instances:
222
      instance = lu.cfg.ExpandInstanceName(name)
223
      if instance is None:
224
        raise errors.OpPrereqError("No such instance name '%s'" % name)
225
      wanted.append(instance)
226

    
227
  else:
228
    wanted = lu.cfg.GetInstanceList()
229
  return utils.NiceSort(wanted)
230

    
231

    
232
def _CheckOutputFields(static, dynamic, selected):
233
  """Checks whether all selected fields are valid.
234

235
  Args:
236
    static: Static fields
237
    dynamic: Dynamic fields
238

239
  """
240
  static_fields = frozenset(static)
241
  dynamic_fields = frozenset(dynamic)
242

    
243
  all_fields = static_fields | dynamic_fields
244

    
245
  if not all_fields.issuperset(selected):
246
    raise errors.OpPrereqError("Unknown output fields selected: %s"
247
                               % ",".join(frozenset(selected).
248
                                          difference(all_fields)))
249

    
250

    
251
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
252
                          memory, vcpus, nics):
253
  """Builds instance related env variables for hooks from single variables.
254

255
  Args:
256
    secondary_nodes: List of secondary nodes as strings
257
  """
258
  env = {
259
    "OP_TARGET": name,
260
    "INSTANCE_NAME": name,
261
    "INSTANCE_PRIMARY": primary_node,
262
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
263
    "INSTANCE_OS_TYPE": os_type,
264
    "INSTANCE_STATUS": status,
265
    "INSTANCE_MEMORY": memory,
266
    "INSTANCE_VCPUS": vcpus,
267
  }
268

    
269
  if nics:
270
    nic_count = len(nics)
271
    for idx, (ip, bridge, mac) in enumerate(nics):
272
      if ip is None:
273
        ip = ""
274
      env["INSTANCE_NIC%d_IP" % idx] = ip
275
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
276
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
277
  else:
278
    nic_count = 0
279

    
280
  env["INSTANCE_NIC_COUNT"] = nic_count
281

    
282
  return env
283

    
284

    
285
def _BuildInstanceHookEnvByObject(instance, override=None):
286
  """Builds instance related env variables for hooks from an object.
287

288
  Args:
289
    instance: objects.Instance object of instance
290
    override: dict of values to override
291
  """
292
  args = {
293
    'name': instance.name,
294
    'primary_node': instance.primary_node,
295
    'secondary_nodes': instance.secondary_nodes,
296
    'os_type': instance.os,
297
    'status': instance.os,
298
    'memory': instance.memory,
299
    'vcpus': instance.vcpus,
300
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
301
  }
302
  if override:
303
    args.update(override)
304
  return _BuildInstanceHookEnv(**args)
305

    
306

    
307
def _UpdateKnownHosts(fullnode, ip, pubkey):
308
  """Ensure a node has a correct known_hosts entry.
309

310
  Args:
311
    fullnode - Fully qualified domain name of host. (str)
312
    ip       - IPv4 address of host (str)
313
    pubkey   - the public key of the cluster
314

315
  """
316
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
317
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
318
  else:
319
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
320

    
321
  inthere = False
322

    
323
  save_lines = []
324
  add_lines = []
325
  removed = False
326

    
327
  for rawline in f:
328
    logger.Debug('read %s' % (repr(rawline),))
329

    
330
    parts = rawline.rstrip('\r\n').split()
331

    
332
    # Ignore unwanted lines
333
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
334
      fields = parts[0].split(',')
335
      key = parts[2]
336

    
337
      haveall = True
338
      havesome = False
339
      for spec in [ ip, fullnode ]:
340
        if spec not in fields:
341
          haveall = False
342
        if spec in fields:
343
          havesome = True
344

    
345
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
346
      if haveall and key == pubkey:
347
        inthere = True
348
        save_lines.append(rawline)
349
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
350
        continue
351

    
352
      if havesome and (not haveall or key != pubkey):
353
        removed = True
354
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
355
        continue
356

    
357
    save_lines.append(rawline)
358

    
359
  if not inthere:
360
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
361
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
362

    
363
  if removed:
364
    save_lines = save_lines + add_lines
365

    
366
    # Write a new file and replace old.
367
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
368
                                   constants.DATA_DIR)
369
    newfile = os.fdopen(fd, 'w')
370
    try:
371
      newfile.write(''.join(save_lines))
372
    finally:
373
      newfile.close()
374
    logger.Debug("Wrote new known_hosts.")
375
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
376

    
377
  elif add_lines:
378
    # Simply appending a new line will do the trick.
379
    f.seek(0, 2)
380
    for add in add_lines:
381
      f.write(add)
382

    
383
  f.close()
384

    
385

    
386
def _HasValidVG(vglist, vgname):
387
  """Checks if the volume group list is valid.
388

389
  A non-None return value means there's an error, and the return value
390
  is the error message.
391

392
  """
393
  vgsize = vglist.get(vgname, None)
394
  if vgsize is None:
395
    return "volume group '%s' missing" % vgname
396
  elif vgsize < 20480:
397
    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
398
            (vgname, vgsize))
399
  return None
400

    
401

    
402
def _InitSSHSetup(node):
403
  """Setup the SSH configuration for the cluster.
404

405

406
  This generates a dsa keypair for root, adds the pub key to the
407
  permitted hosts and adds the hostkey to its own known hosts.
408

409
  Args:
410
    node: the name of this host as a fqdn
411

412
  """
413
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
414

    
415
  for name in priv_key, pub_key:
416
    if os.path.exists(name):
417
      utils.CreateBackup(name)
418
    utils.RemoveFile(name)
419

    
420
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
421
                         "-f", priv_key,
422
                         "-q", "-N", ""])
423
  if result.failed:
424
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
425
                             result.output)
426

    
427
  f = open(pub_key, 'r')
428
  try:
429
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
430
  finally:
431
    f.close()
432

    
433

    
434
def _InitGanetiServerSetup(ss):
435
  """Setup the necessary configuration for the initial node daemon.
436

437
  This creates the nodepass file containing the shared password for
438
  the cluster and also generates the SSL certificate.
439

440
  """
441
  # Create pseudo random password
442
  randpass = sha.new(os.urandom(64)).hexdigest()
443
  # and write it into sstore
444
  ss.SetKey(ss.SS_NODED_PASS, randpass)
445

    
446
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
447
                         "-days", str(365*5), "-nodes", "-x509",
448
                         "-keyout", constants.SSL_CERT_FILE,
449
                         "-out", constants.SSL_CERT_FILE, "-batch"])
450
  if result.failed:
451
    raise errors.OpExecError("could not generate server ssl cert, command"
452
                             " %s had exitcode %s and error message %s" %
453
                             (result.cmd, result.exit_code, result.output))
454

    
455
  os.chmod(constants.SSL_CERT_FILE, 0400)
456

    
457
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
458

    
459
  if result.failed:
460
    raise errors.OpExecError("Could not start the node daemon, command %s"
461
                             " had exitcode %s and error %s" %
462
                             (result.cmd, result.exit_code, result.output))
463

    
464

    
465
def _CheckInstanceBridgesExist(instance):
466
  """Check that the brigdes needed by an instance exist.
467

468
  """
469
  # check bridges existance
470
  brlist = [nic.bridge for nic in instance.nics]
471
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
472
    raise errors.OpPrereqError("one or more target bridges %s does not"
473
                               " exist on destination node '%s'" %
474
                               (brlist, instance.primary_node))
475

    
476

    
477
class LUInitCluster(LogicalUnit):
478
  """Initialise the cluster.
479

480
  """
481
  HPATH = "cluster-init"
482
  HTYPE = constants.HTYPE_CLUSTER
483
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
484
              "def_bridge", "master_netdev"]
485
  REQ_CLUSTER = False
486

    
487
  def BuildHooksEnv(self):
488
    """Build hooks env.
489

490
    Notes: Since we don't require a cluster, we must manually add
491
    ourselves in the post-run node list.
492

493
    """
494
    env = {"OP_TARGET": self.op.cluster_name}
495
    return env, [], [self.hostname.name]
496

    
497
  def CheckPrereq(self):
498
    """Verify that the passed name is a valid one.
499

500
    """
501
    if config.ConfigWriter.IsCluster():
502
      raise errors.OpPrereqError("Cluster is already initialised")
503

    
504
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
505
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
506
        raise errors.OpPrereqError("Please prepare the cluster VNC"
507
                                   "password file %s" %
508
                                   constants.VNC_PASSWORD_FILE)
509

    
510
    self.hostname = hostname = utils.HostInfo()
511

    
512
    if hostname.ip.startswith("127."):
513
      raise errors.OpPrereqError("This host's IP resolves to the private"
514
                                 " range (%s). Please fix DNS or %s." %
515
                                 (hostname.ip, constants.ETC_HOSTS))
516

    
517
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
518

    
519
    if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
520
                         constants.DEFAULT_NODED_PORT):
521
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
522
                                 " to %s,\nbut this ip address does not"
523
                                 " belong to this host."
524
                                 " Aborting." % hostname.ip)
525

    
526
    secondary_ip = getattr(self.op, "secondary_ip", None)
527
    if secondary_ip and not utils.IsValidIP(secondary_ip):
528
      raise errors.OpPrereqError("Invalid secondary ip given")
529
    if (secondary_ip and
530
        secondary_ip != hostname.ip and
531
        (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
532
                           constants.DEFAULT_NODED_PORT))):
533
      raise errors.OpPrereqError("You gave %s as secondary IP,"
534
                                 " but it does not belong to this host." %
535
                                 secondary_ip)
536
    self.secondary_ip = secondary_ip
537

    
538
    # checks presence of the volume group given
539
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
540

    
541
    if vgstatus:
542
      raise errors.OpPrereqError("Error: %s" % vgstatus)
543

    
544
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
545
                    self.op.mac_prefix):
546
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
547
                                 self.op.mac_prefix)
548

    
549
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
550
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
551
                                 self.op.hypervisor_type)
552

    
553
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
554
    if result.failed:
555
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
556
                                 (self.op.master_netdev,
557
                                  result.output.strip()))
558

    
559
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
560
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
561
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
562
                                 " executable." % constants.NODE_INITD_SCRIPT)
563

    
564
  def Exec(self, feedback_fn):
565
    """Initialize the cluster.
566

567
    """
568
    clustername = self.clustername
569
    hostname = self.hostname
570

    
571
    # set up the simple store
572
    self.sstore = ss = ssconf.SimpleStore()
573
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
574
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
575
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
576
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
577
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
578

    
579
    # set up the inter-node password and certificate
580
    _InitGanetiServerSetup(ss)
581

    
582
    # start the master ip
583
    rpc.call_node_start_master(hostname.name)
584

    
585
    # set up ssh config and /etc/hosts
586
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
587
    try:
588
      sshline = f.read()
589
    finally:
590
      f.close()
591
    sshkey = sshline.split(" ")[1]
592

    
593
    _AddHostToEtcHosts(hostname.name)
594

    
595
    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
596

    
597
    _InitSSHSetup(hostname.name)
598

    
599
    # init of cluster config file
600
    self.cfg = cfgw = config.ConfigWriter()
601
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
602
                    sshkey, self.op.mac_prefix,
603
                    self.op.vg_name, self.op.def_bridge)
604

    
605

    
606
class LUDestroyCluster(NoHooksLU):
607
  """Logical unit for destroying the cluster.
608

609
  """
610
  _OP_REQP = []
611

    
612
  def CheckPrereq(self):
613
    """Check prerequisites.
614

615
    This checks whether the cluster is empty.
616

617
    Any errors are signalled by raising errors.OpPrereqError.
618

619
    """
620
    master = self.sstore.GetMasterNode()
621

    
622
    nodelist = self.cfg.GetNodeList()
623
    if len(nodelist) != 1 or nodelist[0] != master:
624
      raise errors.OpPrereqError("There are still %d node(s) in"
625
                                 " this cluster." % (len(nodelist) - 1))
626
    instancelist = self.cfg.GetInstanceList()
627
    if instancelist:
628
      raise errors.OpPrereqError("There are still %d instance(s) in"
629
                                 " this cluster." % len(instancelist))
630

    
631
  def Exec(self, feedback_fn):
632
    """Destroys the cluster.
633

634
    """
635
    master = self.sstore.GetMasterNode()
636
    if not rpc.call_node_stop_master(master):
637
      raise errors.OpExecError("Could not disable the master role")
638
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
639
    utils.CreateBackup(priv_key)
640
    utils.CreateBackup(pub_key)
641
    rpc.call_node_leave_cluster(master)
642

    
643

    
644
class LUVerifyCluster(NoHooksLU):
645
  """Verifies the cluster status.
646

647
  """
648
  _OP_REQP = []
649

    
650
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
651
                  remote_version, feedback_fn):
652
    """Run multiple tests against a node.
653

654
    Test list:
655
      - compares ganeti version
656
      - checks vg existance and size > 20G
657
      - checks config file checksum
658
      - checks ssh to other nodes
659

660
    Args:
661
      node: name of the node to check
662
      file_list: required list of files
663
      local_cksum: dictionary of local files and their checksums
664

665
    """
666
    # compares ganeti version
667
    local_version = constants.PROTOCOL_VERSION
668
    if not remote_version:
669
      feedback_fn(" - ERROR: connection to %s failed" % (node))
670
      return True
671

    
672
    if local_version != remote_version:
673
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
674
                      (local_version, node, remote_version))
675
      return True
676

    
677
    # checks vg existance and size > 20G
678

    
679
    bad = False
680
    if not vglist:
681
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
682
                      (node,))
683
      bad = True
684
    else:
685
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
686
      if vgstatus:
687
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
688
        bad = True
689

    
690
    # checks config file checksum
691
    # checks ssh to any
692

    
693
    if 'filelist' not in node_result:
694
      bad = True
695
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
696
    else:
697
      remote_cksum = node_result['filelist']
698
      for file_name in file_list:
699
        if file_name not in remote_cksum:
700
          bad = True
701
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
702
        elif remote_cksum[file_name] != local_cksum[file_name]:
703
          bad = True
704
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
705

    
706
    if 'nodelist' not in node_result:
707
      bad = True
708
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
709
    else:
710
      if node_result['nodelist']:
711
        bad = True
712
        for node in node_result['nodelist']:
713
          feedback_fn("  - ERROR: communication with node '%s': %s" %
714
                          (node, node_result['nodelist'][node]))
715
    hyp_result = node_result.get('hypervisor', None)
716
    if hyp_result is not None:
717
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
718
    return bad
719

    
720
  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
721
    """Verify an instance.
722

723
    This function checks to see if the required block devices are
724
    available on the instance's node.
725

726
    """
727
    bad = False
728

    
729
    instancelist = self.cfg.GetInstanceList()
730
    if not instance in instancelist:
731
      feedback_fn("  - ERROR: instance %s not in instance list %s" %
732
                      (instance, instancelist))
733
      bad = True
734

    
735
    instanceconfig = self.cfg.GetInstanceInfo(instance)
736
    node_current = instanceconfig.primary_node
737

    
738
    node_vol_should = {}
739
    instanceconfig.MapLVsByNode(node_vol_should)
740

    
741
    for node in node_vol_should:
742
      for volume in node_vol_should[node]:
743
        if node not in node_vol_is or volume not in node_vol_is[node]:
744
          feedback_fn("  - ERROR: volume %s missing on node %s" %
745
                          (volume, node))
746
          bad = True
747

    
748
    if not instanceconfig.status == 'down':
749
      if not instance in node_instance[node_current]:
750
        feedback_fn("  - ERROR: instance %s not running on node %s" %
751
                        (instance, node_current))
752
        bad = True
753

    
754
    for node in node_instance:
755
      if (not node == node_current):
756
        if instance in node_instance[node]:
757
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
758
                          (instance, node))
759
          bad = True
760

    
761
    return bad
762

    
763
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
764
    """Verify if there are any unknown volumes in the cluster.
765

766
    The .os, .swap and backup volumes are ignored. All other volumes are
767
    reported as unknown.
768

769
    """
770
    bad = False
771

    
772
    for node in node_vol_is:
773
      for volume in node_vol_is[node]:
774
        if node not in node_vol_should or volume not in node_vol_should[node]:
775
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
776
                      (volume, node))
777
          bad = True
778
    return bad
779

    
780
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
781
    """Verify the list of running instances.
782

783
    This checks what instances are running but unknown to the cluster.
784

785
    """
786
    bad = False
787
    for node in node_instance:
788
      for runninginstance in node_instance[node]:
789
        if runninginstance not in instancelist:
790
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
791
                          (runninginstance, node))
792
          bad = True
793
    return bad
794

    
795
  def CheckPrereq(self):
796
    """Check prerequisites.
797

798
    This has no prerequisites.
799

800
    """
801
    pass
802

    
803
  def Exec(self, feedback_fn):
804
    """Verify integrity of cluster, performing various test on nodes.
805

806
    """
807
    bad = False
808
    feedback_fn("* Verifying global settings")
809
    for msg in self.cfg.VerifyConfig():
810
      feedback_fn("  - ERROR: %s" % msg)
811

    
812
    vg_name = self.cfg.GetVGName()
813
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
814
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
815
    node_volume = {}
816
    node_instance = {}
817

    
818
    # FIXME: verify OS list
819
    # do local checksums
820
    file_names = list(self.sstore.GetFileList())
821
    file_names.append(constants.SSL_CERT_FILE)
822
    file_names.append(constants.CLUSTER_CONF_FILE)
823
    local_checksums = utils.FingerprintFiles(file_names)
824

    
825
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
826
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
827
    all_instanceinfo = rpc.call_instance_list(nodelist)
828
    all_vglist = rpc.call_vg_list(nodelist)
829
    node_verify_param = {
830
      'filelist': file_names,
831
      'nodelist': nodelist,
832
      'hypervisor': None,
833
      }
834
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
835
    all_rversion = rpc.call_version(nodelist)
836

    
837
    for node in nodelist:
838
      feedback_fn("* Verifying node %s" % node)
839
      result = self._VerifyNode(node, file_names, local_checksums,
840
                                all_vglist[node], all_nvinfo[node],
841
                                all_rversion[node], feedback_fn)
842
      bad = bad or result
843

    
844
      # node_volume
845
      volumeinfo = all_volumeinfo[node]
846

    
847
      if isinstance(volumeinfo, basestring):
848
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
849
                    (node, volumeinfo[-400:].encode('string_escape')))
850
        bad = True
851
        node_volume[node] = {}
852
      elif not isinstance(volumeinfo, dict):
853
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
854
        bad = True
855
        continue
856
      else:
857
        node_volume[node] = volumeinfo
858

    
859
      # node_instance
860
      nodeinstance = all_instanceinfo[node]
861
      if type(nodeinstance) != list:
862
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
863
        bad = True
864
        continue
865

    
866
      node_instance[node] = nodeinstance
867

    
868
    node_vol_should = {}
869

    
870
    for instance in instancelist:
871
      feedback_fn("* Verifying instance %s" % instance)
872
      result =  self._VerifyInstance(instance, node_volume, node_instance,
873
                                     feedback_fn)
874
      bad = bad or result
875

    
876
      inst_config = self.cfg.GetInstanceInfo(instance)
877

    
878
      inst_config.MapLVsByNode(node_vol_should)
879

    
880
    feedback_fn("* Verifying orphan volumes")
881
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
882
                                       feedback_fn)
883
    bad = bad or result
884

    
885
    feedback_fn("* Verifying remaining instances")
886
    result = self._VerifyOrphanInstances(instancelist, node_instance,
887
                                         feedback_fn)
888
    bad = bad or result
889

    
890
    return int(bad)
891

    
892

    
893
class LUVerifyDisks(NoHooksLU):
894
  """Verifies the cluster disks status.
895

896
  """
897
  _OP_REQP = []
898

    
899
  def CheckPrereq(self):
900
    """Check prerequisites.
901

902
    This has no prerequisites.
903

904
    """
905
    pass
906

    
907
  def Exec(self, feedback_fn):
908
    """Verify integrity of cluster disks.
909

910
    """
911
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
912

    
913
    vg_name = self.cfg.GetVGName()
914
    nodes = utils.NiceSort(self.cfg.GetNodeList())
915
    instances = [self.cfg.GetInstanceInfo(name)
916
                 for name in self.cfg.GetInstanceList()]
917

    
918
    nv_dict = {}
919
    for inst in instances:
920
      inst_lvs = {}
921
      if (inst.status != "up" or
922
          inst.disk_template not in constants.DTS_NET_MIRROR):
923
        continue
924
      inst.MapLVsByNode(inst_lvs)
925
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
926
      for node, vol_list in inst_lvs.iteritems():
927
        for vol in vol_list:
928
          nv_dict[(node, vol)] = inst
929

    
930
    if not nv_dict:
931
      return result
932

    
933
    node_lvs = rpc.call_volume_list(nodes, vg_name)
934

    
935
    to_act = set()
936
    for node in nodes:
937
      # node_volume
938
      lvs = node_lvs[node]
939

    
940
      if isinstance(lvs, basestring):
941
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
942
        res_nlvm[node] = lvs
943
      elif not isinstance(lvs, dict):
944
        logger.Info("connection to node %s failed or invalid data returned" %
945
                    (node,))
946
        res_nodes.append(node)
947
        continue
948

    
949
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
950
        inst = nv_dict.pop((node, lv_name), None)
951
        if (not lv_online and inst is not None
952
            and inst.name not in res_instances):
953
          res_instances.append(inst.name)
954

    
955
    # any leftover items in nv_dict are missing LVs, let's arrange the
956
    # data better
957
    for key, inst in nv_dict.iteritems():
958
      if inst.name not in res_missing:
959
        res_missing[inst.name] = []
960
      res_missing[inst.name].append(key)
961

    
962
    return result
963

    
964

    
965
class LURenameCluster(LogicalUnit):
966
  """Rename the cluster.
967

968
  """
969
  HPATH = "cluster-rename"
970
  HTYPE = constants.HTYPE_CLUSTER
971
  _OP_REQP = ["name"]
972

    
973
  def BuildHooksEnv(self):
974
    """Build hooks env.
975

976
    """
977
    env = {
978
      "OP_TARGET": self.sstore.GetClusterName(),
979
      "NEW_NAME": self.op.name,
980
      }
981
    mn = self.sstore.GetMasterNode()
982
    return env, [mn], [mn]
983

    
984
  def CheckPrereq(self):
985
    """Verify that the passed name is a valid one.
986

987
    """
988
    hostname = utils.HostInfo(self.op.name)
989

    
990
    new_name = hostname.name
991
    self.ip = new_ip = hostname.ip
992
    old_name = self.sstore.GetClusterName()
993
    old_ip = self.sstore.GetMasterIP()
994
    if new_name == old_name and new_ip == old_ip:
995
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
996
                                 " cluster has changed")
997
    if new_ip != old_ip:
998
      result = utils.RunCmd(["fping", "-q", new_ip])
999
      if not result.failed:
1000
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1001
                                   " reachable on the network. Aborting." %
1002
                                   new_ip)
1003

    
1004
    self.op.name = new_name
1005

    
1006
  def Exec(self, feedback_fn):
1007
    """Rename the cluster.
1008

1009
    """
1010
    clustername = self.op.name
1011
    ip = self.ip
1012
    ss = self.sstore
1013

    
1014
    # shutdown the master IP
1015
    master = ss.GetMasterNode()
1016
    if not rpc.call_node_stop_master(master):
1017
      raise errors.OpExecError("Could not disable the master role")
1018

    
1019
    try:
1020
      # modify the sstore
1021
      ss.SetKey(ss.SS_MASTER_IP, ip)
1022
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1023

    
1024
      # Distribute updated ss config to all nodes
1025
      myself = self.cfg.GetNodeInfo(master)
1026
      dist_nodes = self.cfg.GetNodeList()
1027
      if myself.name in dist_nodes:
1028
        dist_nodes.remove(myself.name)
1029

    
1030
      logger.Debug("Copying updated ssconf data to all nodes")
1031
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1032
        fname = ss.KeyToFilename(keyname)
1033
        result = rpc.call_upload_file(dist_nodes, fname)
1034
        for to_node in dist_nodes:
1035
          if not result[to_node]:
1036
            logger.Error("copy of file %s to node %s failed" %
1037
                         (fname, to_node))
1038
    finally:
1039
      if not rpc.call_node_start_master(master):
1040
        logger.Error("Could not re-enable the master role on the master,"
1041
                     " please restart manually.")
1042

    
1043

    
1044
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1045
  """Sleep and poll for an instance's disk to sync.
1046

1047
  """
1048
  if not instance.disks:
1049
    return True
1050

    
1051
  if not oneshot:
1052
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1053

    
1054
  node = instance.primary_node
1055

    
1056
  for dev in instance.disks:
1057
    cfgw.SetDiskID(dev, node)
1058

    
1059
  retries = 0
1060
  while True:
1061
    max_time = 0
1062
    done = True
1063
    cumul_degraded = False
1064
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1065
    if not rstats:
1066
      proc.LogWarning("Can't get any data from node %s" % node)
1067
      retries += 1
1068
      if retries >= 10:
1069
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1070
                                 " aborting." % node)
1071
      time.sleep(6)
1072
      continue
1073
    retries = 0
1074
    for i in range(len(rstats)):
1075
      mstat = rstats[i]
1076
      if mstat is None:
1077
        proc.LogWarning("Can't compute data for node %s/%s" %
1078
                        (node, instance.disks[i].iv_name))
1079
        continue
1080
      # we ignore the ldisk parameter
1081
      perc_done, est_time, is_degraded, _ = mstat
1082
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1083
      if perc_done is not None:
1084
        done = False
1085
        if est_time is not None:
1086
          rem_time = "%d estimated seconds remaining" % est_time
1087
          max_time = est_time
1088
        else:
1089
          rem_time = "no time estimate"
1090
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1091
                     (instance.disks[i].iv_name, perc_done, rem_time))
1092
    if done or oneshot:
1093
      break
1094

    
1095
    if unlock:
1096
      utils.Unlock('cmd')
1097
    try:
1098
      time.sleep(min(60, max_time))
1099
    finally:
1100
      if unlock:
1101
        utils.Lock('cmd')
1102

    
1103
  if done:
1104
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1105
  return not cumul_degraded
1106

    
1107

    
1108
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1109
  """Check that mirrors are not degraded.
1110

1111
  The ldisk parameter, if True, will change the test from the
1112
  is_degraded attribute (which represents overall non-ok status for
1113
  the device(s)) to the ldisk (representing the local storage status).
1114

1115
  """
1116
  cfgw.SetDiskID(dev, node)
1117
  if ldisk:
1118
    idx = 6
1119
  else:
1120
    idx = 5
1121

    
1122
  result = True
1123
  if on_primary or dev.AssembleOnSecondary():
1124
    rstats = rpc.call_blockdev_find(node, dev)
1125
    if not rstats:
1126
      logger.ToStderr("Can't get any data from node %s" % node)
1127
      result = False
1128
    else:
1129
      result = result and (not rstats[idx])
1130
  if dev.children:
1131
    for child in dev.children:
1132
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1133

    
1134
  return result
1135

    
1136

    
1137
class LUDiagnoseOS(NoHooksLU):
1138
  """Logical unit for OS diagnose/query.
1139

1140
  """
1141
  _OP_REQP = []
1142

    
1143
  def CheckPrereq(self):
1144
    """Check prerequisites.
1145

1146
    This always succeeds, since this is a pure query LU.
1147

1148
    """
1149
    return
1150

    
1151
  def Exec(self, feedback_fn):
1152
    """Compute the list of OSes.
1153

1154
    """
1155
    node_list = self.cfg.GetNodeList()
1156
    node_data = rpc.call_os_diagnose(node_list)
1157
    if node_data == False:
1158
      raise errors.OpExecError("Can't gather the list of OSes")
1159
    return node_data
1160

    
1161

    
1162
class LURemoveNode(LogicalUnit):
1163
  """Logical unit for removing a node.
1164

1165
  """
1166
  HPATH = "node-remove"
1167
  HTYPE = constants.HTYPE_NODE
1168
  _OP_REQP = ["node_name"]
1169

    
1170
  def BuildHooksEnv(self):
1171
    """Build hooks env.
1172

1173
    This doesn't run on the target node in the pre phase as a failed
1174
    node would not allows itself to run.
1175

1176
    """
1177
    env = {
1178
      "OP_TARGET": self.op.node_name,
1179
      "NODE_NAME": self.op.node_name,
1180
      }
1181
    all_nodes = self.cfg.GetNodeList()
1182
    all_nodes.remove(self.op.node_name)
1183
    return env, all_nodes, all_nodes
1184

    
1185
  def CheckPrereq(self):
1186
    """Check prerequisites.
1187

1188
    This checks:
1189
     - the node exists in the configuration
1190
     - it does not have primary or secondary instances
1191
     - it's not the master
1192

1193
    Any errors are signalled by raising errors.OpPrereqError.
1194

1195
    """
1196
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1197
    if node is None:
1198
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1199

    
1200
    instance_list = self.cfg.GetInstanceList()
1201

    
1202
    masternode = self.sstore.GetMasterNode()
1203
    if node.name == masternode:
1204
      raise errors.OpPrereqError("Node is the master node,"
1205
                                 " you need to failover first.")
1206

    
1207
    for instance_name in instance_list:
1208
      instance = self.cfg.GetInstanceInfo(instance_name)
1209
      if node.name == instance.primary_node:
1210
        raise errors.OpPrereqError("Instance %s still running on the node,"
1211
                                   " please remove first." % instance_name)
1212
      if node.name in instance.secondary_nodes:
1213
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1214
                                   " please remove first." % instance_name)
1215
    self.op.node_name = node.name
1216
    self.node = node
1217

    
1218
  def Exec(self, feedback_fn):
1219
    """Removes the node from the cluster.
1220

1221
    """
1222
    node = self.node
1223
    logger.Info("stopping the node daemon and removing configs from node %s" %
1224
                node.name)
1225

    
1226
    rpc.call_node_leave_cluster(node.name)
1227

    
1228
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1229

    
1230
    logger.Info("Removing node %s from config" % node.name)
1231

    
1232
    self.cfg.RemoveNode(node.name)
1233

    
1234
    _RemoveHostFromEtcHosts(node.name)
1235

    
1236

    
1237
class LUQueryNodes(NoHooksLU):
1238
  """Logical unit for querying nodes.
1239

1240
  """
1241
  _OP_REQP = ["output_fields", "names"]
1242

    
1243
  def CheckPrereq(self):
1244
    """Check prerequisites.
1245

1246
    This checks that the fields required are valid output fields.
1247

1248
    """
1249
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1250
                                     "mtotal", "mnode", "mfree",
1251
                                     "bootid"])
1252

    
1253
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1254
                               "pinst_list", "sinst_list",
1255
                               "pip", "sip"],
1256
                       dynamic=self.dynamic_fields,
1257
                       selected=self.op.output_fields)
1258

    
1259
    self.wanted = _GetWantedNodes(self, self.op.names)
1260

    
1261
  def Exec(self, feedback_fn):
1262
    """Computes the list of nodes and their attributes.
1263

1264
    """
1265
    nodenames = self.wanted
1266
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1267

    
1268
    # begin data gathering
1269

    
1270
    if self.dynamic_fields.intersection(self.op.output_fields):
1271
      live_data = {}
1272
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1273
      for name in nodenames:
1274
        nodeinfo = node_data.get(name, None)
1275
        if nodeinfo:
1276
          live_data[name] = {
1277
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1278
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1279
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1280
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1281
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1282
            "bootid": nodeinfo['bootid'],
1283
            }
1284
        else:
1285
          live_data[name] = {}
1286
    else:
1287
      live_data = dict.fromkeys(nodenames, {})
1288

    
1289
    node_to_primary = dict([(name, set()) for name in nodenames])
1290
    node_to_secondary = dict([(name, set()) for name in nodenames])
1291

    
1292
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1293
                             "sinst_cnt", "sinst_list"))
1294
    if inst_fields & frozenset(self.op.output_fields):
1295
      instancelist = self.cfg.GetInstanceList()
1296

    
1297
      for instance_name in instancelist:
1298
        inst = self.cfg.GetInstanceInfo(instance_name)
1299
        if inst.primary_node in node_to_primary:
1300
          node_to_primary[inst.primary_node].add(inst.name)
1301
        for secnode in inst.secondary_nodes:
1302
          if secnode in node_to_secondary:
1303
            node_to_secondary[secnode].add(inst.name)
1304

    
1305
    # end data gathering
1306

    
1307
    output = []
1308
    for node in nodelist:
1309
      node_output = []
1310
      for field in self.op.output_fields:
1311
        if field == "name":
1312
          val = node.name
1313
        elif field == "pinst_list":
1314
          val = list(node_to_primary[node.name])
1315
        elif field == "sinst_list":
1316
          val = list(node_to_secondary[node.name])
1317
        elif field == "pinst_cnt":
1318
          val = len(node_to_primary[node.name])
1319
        elif field == "sinst_cnt":
1320
          val = len(node_to_secondary[node.name])
1321
        elif field == "pip":
1322
          val = node.primary_ip
1323
        elif field == "sip":
1324
          val = node.secondary_ip
1325
        elif field in self.dynamic_fields:
1326
          val = live_data[node.name].get(field, None)
1327
        else:
1328
          raise errors.ParameterError(field)
1329
        node_output.append(val)
1330
      output.append(node_output)
1331

    
1332
    return output
1333

    
1334

    
1335
class LUQueryNodeVolumes(NoHooksLU):
1336
  """Logical unit for getting volumes on node(s).
1337

1338
  """
1339
  _OP_REQP = ["nodes", "output_fields"]
1340

    
1341
  def CheckPrereq(self):
1342
    """Check prerequisites.
1343

1344
    This checks that the fields required are valid output fields.
1345

1346
    """
1347
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1348

    
1349
    _CheckOutputFields(static=["node"],
1350
                       dynamic=["phys", "vg", "name", "size", "instance"],
1351
                       selected=self.op.output_fields)
1352

    
1353

    
1354
  def Exec(self, feedback_fn):
1355
    """Computes the list of nodes and their attributes.
1356

1357
    """
1358
    nodenames = self.nodes
1359
    volumes = rpc.call_node_volumes(nodenames)
1360

    
1361
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1362
             in self.cfg.GetInstanceList()]
1363

    
1364
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1365

    
1366
    output = []
1367
    for node in nodenames:
1368
      if node not in volumes or not volumes[node]:
1369
        continue
1370

    
1371
      node_vols = volumes[node][:]
1372
      node_vols.sort(key=lambda vol: vol['dev'])
1373

    
1374
      for vol in node_vols:
1375
        node_output = []
1376
        for field in self.op.output_fields:
1377
          if field == "node":
1378
            val = node
1379
          elif field == "phys":
1380
            val = vol['dev']
1381
          elif field == "vg":
1382
            val = vol['vg']
1383
          elif field == "name":
1384
            val = vol['name']
1385
          elif field == "size":
1386
            val = int(float(vol['size']))
1387
          elif field == "instance":
1388
            for inst in ilist:
1389
              if node not in lv_by_node[inst]:
1390
                continue
1391
              if vol['name'] in lv_by_node[inst][node]:
1392
                val = inst.name
1393
                break
1394
            else:
1395
              val = '-'
1396
          else:
1397
            raise errors.ParameterError(field)
1398
          node_output.append(str(val))
1399

    
1400
        output.append(node_output)
1401

    
1402
    return output
1403

    
1404

    
1405
class LUAddNode(LogicalUnit):
1406
  """Logical unit for adding node to the cluster.
1407

1408
  """
1409
  HPATH = "node-add"
1410
  HTYPE = constants.HTYPE_NODE
1411
  _OP_REQP = ["node_name"]
1412

    
1413
  def BuildHooksEnv(self):
1414
    """Build hooks env.
1415

1416
    This will run on all nodes before, and on all nodes + the new node after.
1417

1418
    """
1419
    env = {
1420
      "OP_TARGET": self.op.node_name,
1421
      "NODE_NAME": self.op.node_name,
1422
      "NODE_PIP": self.op.primary_ip,
1423
      "NODE_SIP": self.op.secondary_ip,
1424
      }
1425
    nodes_0 = self.cfg.GetNodeList()
1426
    nodes_1 = nodes_0 + [self.op.node_name, ]
1427
    return env, nodes_0, nodes_1
1428

    
1429
  def CheckPrereq(self):
1430
    """Check prerequisites.
1431

1432
    This checks:
1433
     - the new node is not already in the config
1434
     - it is resolvable
1435
     - its parameters (single/dual homed) matches the cluster
1436

1437
    Any errors are signalled by raising errors.OpPrereqError.
1438

1439
    """
1440
    node_name = self.op.node_name
1441
    cfg = self.cfg
1442

    
1443
    dns_data = utils.HostInfo(node_name)
1444

    
1445
    node = dns_data.name
1446
    primary_ip = self.op.primary_ip = dns_data.ip
1447
    secondary_ip = getattr(self.op, "secondary_ip", None)
1448
    if secondary_ip is None:
1449
      secondary_ip = primary_ip
1450
    if not utils.IsValidIP(secondary_ip):
1451
      raise errors.OpPrereqError("Invalid secondary IP given")
1452
    self.op.secondary_ip = secondary_ip
1453
    node_list = cfg.GetNodeList()
1454
    if node in node_list:
1455
      raise errors.OpPrereqError("Node %s is already in the configuration"
1456
                                 % node)
1457

    
1458
    for existing_node_name in node_list:
1459
      existing_node = cfg.GetNodeInfo(existing_node_name)
1460
      if (existing_node.primary_ip == primary_ip or
1461
          existing_node.secondary_ip == primary_ip or
1462
          existing_node.primary_ip == secondary_ip or
1463
          existing_node.secondary_ip == secondary_ip):
1464
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1465
                                   " existing node %s" % existing_node.name)
1466

    
1467
    # check that the type of the node (single versus dual homed) is the
1468
    # same as for the master
1469
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1470
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1471
    newbie_singlehomed = secondary_ip == primary_ip
1472
    if master_singlehomed != newbie_singlehomed:
1473
      if master_singlehomed:
1474
        raise errors.OpPrereqError("The master has no private ip but the"
1475
                                   " new node has one")
1476
      else:
1477
        raise errors.OpPrereqError("The master has a private ip but the"
1478
                                   " new node doesn't have one")
1479

    
1480
    # checks reachablity
1481
    if not utils.TcpPing(utils.HostInfo().name,
1482
                         primary_ip,
1483
                         constants.DEFAULT_NODED_PORT):
1484
      raise errors.OpPrereqError("Node not reachable by ping")
1485

    
1486
    if not newbie_singlehomed:
1487
      # check reachability from my secondary ip to newbie's secondary ip
1488
      if not utils.TcpPing(myself.secondary_ip,
1489
                           secondary_ip,
1490
                           constants.DEFAULT_NODED_PORT):
1491
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1492
                                   " based ping to noded port")
1493

    
1494
    self.new_node = objects.Node(name=node,
1495
                                 primary_ip=primary_ip,
1496
                                 secondary_ip=secondary_ip)
1497

    
1498
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1499
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
1500
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1501
                                   constants.VNC_PASSWORD_FILE)
1502

    
1503
  def Exec(self, feedback_fn):
1504
    """Adds the new node to the cluster.
1505

1506
    """
1507
    new_node = self.new_node
1508
    node = new_node.name
1509

    
1510
    # set up inter-node password and certificate and restarts the node daemon
1511
    gntpass = self.sstore.GetNodeDaemonPassword()
1512
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1513
      raise errors.OpExecError("ganeti password corruption detected")
1514
    f = open(constants.SSL_CERT_FILE)
1515
    try:
1516
      gntpem = f.read(8192)
1517
    finally:
1518
      f.close()
1519
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1520
    # so we use this to detect an invalid certificate; as long as the
1521
    # cert doesn't contain this, the here-document will be correctly
1522
    # parsed by the shell sequence below
1523
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
1524
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1525
    if not gntpem.endswith("\n"):
1526
      raise errors.OpExecError("PEM must end with newline")
1527
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1528

    
1529
    # and then connect with ssh to set password and start ganeti-noded
1530
    # note that all the below variables are sanitized at this point,
1531
    # either by being constants or by the checks above
1532
    ss = self.sstore
1533
    mycommand = ("umask 077 && "
1534
                 "echo '%s' > '%s' && "
1535
                 "cat > '%s' << '!EOF.' && \n"
1536
                 "%s!EOF.\n%s restart" %
1537
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1538
                  constants.SSL_CERT_FILE, gntpem,
1539
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
1735
  """Copy file to cluster.
1736

1737
  """
1738
  _OP_REQP = ["nodes", "filename"]
1739

    
1740
  def CheckPrereq(self):
1741
    """Check prerequisites.
1742

1743
    It should check that the named file exists and that the given list
1744
    of nodes is valid.
1745

1746
    """
1747
    if not os.path.exists(self.op.filename):
1748
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1749

    
1750
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1751

    
1752
  def Exec(self, feedback_fn):
1753
    """Copy a file from master to some nodes.
1754

1755
    Args:
      self.op.filename - the name of the file to copy
      self.op.nodes - list containing the names of the target nodes;
                      if empty, all nodes
1760

1761
    """
1762
    filename = self.op.filename
1763

    
1764
    myname = utils.HostInfo().name
1765

    
1766
    for node in self.nodes:
1767
      if node == myname:
1768
        continue
1769
      if not ssh.CopyFileToNode(node, filename):
1770
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
1771

    
1772

    
1773
class LUDumpClusterConfig(NoHooksLU):
1774
  """Return a text-representation of the cluster-config.
1775

1776
  """
1777
  _OP_REQP = []
1778

    
1779
  def CheckPrereq(self):
1780
    """No prerequisites.
1781

1782
    """
1783
    pass
1784

    
1785
  def Exec(self, feedback_fn):
1786
    """Dump a representation of the cluster config to the standard output.
1787

1788
    """
1789
    return self.cfg.DumpConfig()
1790

    
1791

    
1792
class LURunClusterCommand(NoHooksLU):
1793
  """Run a command on some nodes.
1794

1795
  """
1796
  _OP_REQP = ["command", "nodes"]
1797

    
1798
  def CheckPrereq(self):
1799
    """Check prerequisites.
1800

1801
    It checks that the given list of nodes is valid.
1802

1803
    """
1804
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1805

    
1806
  def Exec(self, feedback_fn):
1807
    """Run a command on some nodes.
1808

1809
    """
1810
    data = []
1811
    for node in self.nodes:
1812
      result = ssh.SSHCall(node, "root", self.op.command)
1813
      data.append((node, result.output, result.exit_code))
1814

    
1815
    return data
1816

    
1817

    
1818
class LUActivateInstanceDisks(NoHooksLU):
1819
  """Bring up an instance's disks.
1820

1821
  """
1822
  _OP_REQP = ["instance_name"]
1823

    
1824
  def CheckPrereq(self):
1825
    """Check prerequisites.
1826

1827
    This checks that the instance is in the cluster.
1828

1829
    """
1830
    instance = self.cfg.GetInstanceInfo(
1831
      self.cfg.ExpandInstanceName(self.op.instance_name))
1832
    if instance is None:
1833
      raise errors.OpPrereqError("Instance '%s' not known" %
1834
                                 self.op.instance_name)
1835
    self.instance = instance
1836

    
1837

    
1838
  def Exec(self, feedback_fn):
1839
    """Activate the disks.
1840

1841
    """
1842
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1843
    if not disks_ok:
1844
      raise errors.OpExecError("Cannot activate block devices")
1845

    
1846
    return disks_info
1847

    
1848

    
1849
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1850
  """Prepare the block devices for an instance.
1851

1852
  This sets up the block devices on all nodes.
1853

1854
  Args:
1855
    instance: a ganeti.objects.Instance object
1856
    ignore_secondaries: if true, errors on secondary nodes won't result
1857
                        in an error return from the function
1858

1859
  Returns:
1860
    false if the operation failed
1861
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
1863
  """
1864
  device_info = []
1865
  disks_ok = True
1866
  iname = instance.name
1867
  # With the two passes mechanism we try to reduce the window of
1868
  # opportunity for the race condition of switching DRBD to primary
1869
  # before handshaking occurred, but we do not eliminate it
1870

    
1871
  # The proper fix would be to wait (with some limits) until the
1872
  # connection has been made and drbd transitions from WFConnection
1873
  # into any other network-connected state (Connected, SyncTarget,
1874
  # SyncSource, etc.)
1875

    
1876
  # 1st pass, assemble on all nodes in secondary mode
1877
  for inst_disk in instance.disks:
1878
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1879
      cfg.SetDiskID(node_disk, node)
1880
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1881
      if not result:
1882
        logger.Error("could not prepare block device %s on node %s"
1883
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1884
        if not ignore_secondaries:
1885
          disks_ok = False
1886

    
1887
  # FIXME: race condition on drbd migration to primary
1888

    
1889
  # 2nd pass, do only the primary node
1890
  for inst_disk in instance.disks:
1891
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1892
      if node != instance.primary_node:
1893
        continue
1894
      cfg.SetDiskID(node_disk, node)
1895
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1896
      if not result:
1897
        logger.Error("could not prepare block device %s on node %s"
1898
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1899
        disks_ok = False
1900
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1901

    
1902
  # leave the disks configured for the primary node
1903
  # this is a workaround that would be fixed better by
1904
  # improving the logical/physical id handling
1905
  for disk in instance.disks:
1906
    cfg.SetDiskID(disk, instance.primary_node)
1907

    
1908
  return disks_ok, device_info
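# A successful call for a DRBD8-based instance would return something like
# (node name and device paths are purely illustrative):
#   (True, [("node1.example.com", "sda", "/dev/drbd0"),
#           ("node1.example.com", "sdb", "/dev/drbd1")])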
1909

    
1910

    
1911
def _StartInstanceDisks(cfg, instance, force):
1912
  """Start the disks of an instance.
1913

1914
  """
1915
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1916
                                           ignore_secondaries=force)
1917
  if not disks_ok:
1918
    _ShutdownInstanceDisks(instance, cfg)
1919
    if force is not None and not force:
1920
      logger.Error("If the message above refers to a secondary node,"
1921
                   " you can retry the operation using '--force'.")
1922
    raise errors.OpExecError("Disk consistency error")
1923

    
1924

    
1925
class LUDeactivateInstanceDisks(NoHooksLU):
1926
  """Shutdown an instance's disks.
1927

1928
  """
1929
  _OP_REQP = ["instance_name"]
1930

    
1931
  def CheckPrereq(self):
1932
    """Check prerequisites.
1933

1934
    This checks that the instance is in the cluster.
1935

1936
    """
1937
    instance = self.cfg.GetInstanceInfo(
1938
      self.cfg.ExpandInstanceName(self.op.instance_name))
1939
    if instance is None:
1940
      raise errors.OpPrereqError("Instance '%s' not known" %
1941
                                 self.op.instance_name)
1942
    self.instance = instance
1943

    
1944
  def Exec(self, feedback_fn):
1945
    """Deactivate the disks
1946

1947
    """
1948
    instance = self.instance
1949
    ins_l = rpc.call_instance_list([instance.primary_node])
1950
    ins_l = ins_l[instance.primary_node]
1951
    if not type(ins_l) is list:
1952
      raise errors.OpExecError("Can't contact node '%s'" %
1953
                               instance.primary_node)
1954

    
1955
    if self.instance.name in ins_l:
1956
      raise errors.OpExecError("Instance is running, can't shutdown"
1957
                               " block devices.")
1958

    
1959
    _ShutdownInstanceDisks(instance, self.cfg)
1960

    
1961

    
1962
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1963
  """Shutdown block devices of an instance.
1964

1965
  This does the shutdown on all nodes of the instance.
1966

1967
  If ignore_primary is true, errors on the primary node are
  ignored.
1969

1970
  """
1971
  result = True
1972
  for disk in instance.disks:
1973
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1974
      cfg.SetDiskID(top_disk, node)
1975
      if not rpc.call_blockdev_shutdown(node, top_disk):
1976
        logger.Error("could not shutdown block device %s on node %s" %
1977
                     (disk.iv_name, node))
1978
        if not ignore_primary or node != instance.primary_node:
1979
          result = False
1980
  return result
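# Note on _ShutdownInstanceDisks above: a False result only means that at
# least one device on a node that matters could not be shut down; the loop
# still attempts to shut down all the remaining devices.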
1981

    
1982

    
1983
def _CheckNodeFreeMemory(cfg, node, reason, requested):
1984
  """Checks if a node has enough free memory.
1985

1986
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
1990

1991
  Args:
1992
    - cfg: a ConfigWriter instance
1993
    - node: the node name
1994
    - reason: string to use in the error message
1995
    - requested: the amount of memory in MiB
1996

1997
  """
1998
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1999
  if not nodeinfo or not isinstance(nodeinfo, dict):
2000
    raise errors.OpPrereqError("Could not contact node %s for resource"
2001
                             " information" % (node,))
2002

    
2003
  free_mem = nodeinfo[node].get('memory_free')
2004
  if not isinstance(free_mem, int):
2005
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2006
                             " was '%s'" % (node, free_mem))
2007
  if requested > free_mem:
2008
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2009
                             " needed %s MiB, available %s MiB" %
2010
                             (node, reason, requested, free_mem))
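# Illustrative use of _CheckNodeFreeMemory (node name and sizes are
# hypothetical): starting an instance that needs 512 MiB on node1 would do
#   _CheckNodeFreeMemory(self.cfg, "node1.example.com",
#                        "starting instance web1", 512)
# and raise OpPrereqError if node1 reports less than 512 MiB free.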
2011

    
2012

    
2013
class LUStartupInstance(LogicalUnit):
2014
  """Starts an instance.
2015

2016
  """
2017
  HPATH = "instance-start"
2018
  HTYPE = constants.HTYPE_INSTANCE
2019
  _OP_REQP = ["instance_name", "force"]
2020

    
2021
  def BuildHooksEnv(self):
2022
    """Build hooks env.
2023

2024
    This runs on master, primary and secondary nodes of the instance.
2025

2026
    """
2027
    env = {
2028
      "FORCE": self.op.force,
2029
      }
2030
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2031
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2032
          list(self.instance.secondary_nodes))
2033
    return env, nl, nl
2034

    
2035
  def CheckPrereq(self):
2036
    """Check prerequisites.
2037

2038
    This checks that the instance is in the cluster.
2039

2040
    """
2041
    instance = self.cfg.GetInstanceInfo(
2042
      self.cfg.ExpandInstanceName(self.op.instance_name))
2043
    if instance is None:
2044
      raise errors.OpPrereqError("Instance '%s' not known" %
2045
                                 self.op.instance_name)
2046

    
2047
    # check bridges existence
2048
    _CheckInstanceBridgesExist(instance)
2049

    
2050
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2051
                         "starting instance %s" % instance.name,
2052
                         instance.memory)
2053

    
2054
    self.instance = instance
2055
    self.op.instance_name = instance.name
2056

    
2057
  def Exec(self, feedback_fn):
2058
    """Start the instance.
2059

2060
    """
2061
    instance = self.instance
2062
    force = self.op.force
2063
    extra_args = getattr(self.op, "extra_args", "")
2064

    
2065
    node_current = instance.primary_node
2066

    
2067
    _StartInstanceDisks(self.cfg, instance, force)
2068

    
2069
    if not rpc.call_instance_start(node_current, instance, extra_args):
2070
      _ShutdownInstanceDisks(instance, self.cfg)
2071
      raise errors.OpExecError("Could not start instance")
2072

    
2073
    self.cfg.MarkInstanceUp(instance.name)
2074

    
2075

    
2076
class LURebootInstance(LogicalUnit):
2077
  """Reboot an instance.
2078

2079
  """
2080
  HPATH = "instance-reboot"
2081
  HTYPE = constants.HTYPE_INSTANCE
2082
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2083

    
2084
  def BuildHooksEnv(self):
2085
    """Build hooks env.
2086

2087
    This runs on master, primary and secondary nodes of the instance.
2088

2089
    """
2090
    env = {
2091
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2092
      }
2093
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2094
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2095
          list(self.instance.secondary_nodes))
2096
    return env, nl, nl
2097

    
2098
  def CheckPrereq(self):
2099
    """Check prerequisites.
2100

2101
    This checks that the instance is in the cluster.
2102

2103
    """
2104
    instance = self.cfg.GetInstanceInfo(
2105
      self.cfg.ExpandInstanceName(self.op.instance_name))
2106
    if instance is None:
2107
      raise errors.OpPrereqError("Instance '%s' not known" %
2108
                                 self.op.instance_name)
2109

    
2110
    # check bridges existence
2111
    _CheckInstanceBridgesExist(instance)
2112

    
2113
    self.instance = instance
2114
    self.op.instance_name = instance.name
2115

    
2116
  def Exec(self, feedback_fn):
2117
    """Reboot the instance.
2118

2119
    """
2120
    instance = self.instance
2121
    ignore_secondaries = self.op.ignore_secondaries
2122
    reboot_type = self.op.reboot_type
2123
    extra_args = getattr(self.op, "extra_args", "")
2124

    
2125
    node_current = instance.primary_node
2126

    
2127
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2128
                           constants.INSTANCE_REBOOT_HARD,
2129
                           constants.INSTANCE_REBOOT_FULL]:
2130
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2131
                                  (constants.INSTANCE_REBOOT_SOFT,
2132
                                   constants.INSTANCE_REBOOT_HARD,
2133
                                   constants.INSTANCE_REBOOT_FULL))
2134

    
2135
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2136
                       constants.INSTANCE_REBOOT_HARD]:
2137
      if not rpc.call_instance_reboot(node_current, instance,
2138
                                      reboot_type, extra_args):
2139
        raise errors.OpExecError("Could not reboot instance")
2140
    else:
2141
      if not rpc.call_instance_shutdown(node_current, instance):
2142
        raise errors.OpExecError("could not shutdown instance for full reboot")
2143
      _ShutdownInstanceDisks(instance, self.cfg)
2144
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2145
      if not rpc.call_instance_start(node_current, instance, extra_args):
2146
        _ShutdownInstanceDisks(instance, self.cfg)
2147
        raise errors.OpExecError("Could not start instance for full reboot")
2148

    
2149
    self.cfg.MarkInstanceUp(instance.name)
2150

    
2151

    
2152
class LUShutdownInstance(LogicalUnit):
2153
  """Shutdown an instance.
2154

2155
  """
2156
  HPATH = "instance-stop"
2157
  HTYPE = constants.HTYPE_INSTANCE
2158
  _OP_REQP = ["instance_name"]
2159

    
2160
  def BuildHooksEnv(self):
2161
    """Build hooks env.
2162

2163
    This runs on master, primary and secondary nodes of the instance.
2164

2165
    """
2166
    env = _BuildInstanceHookEnvByObject(self.instance)
2167
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2168
          list(self.instance.secondary_nodes))
2169
    return env, nl, nl
2170

    
2171
  def CheckPrereq(self):
2172
    """Check prerequisites.
2173

2174
    This checks that the instance is in the cluster.
2175

2176
    """
2177
    instance = self.cfg.GetInstanceInfo(
2178
      self.cfg.ExpandInstanceName(self.op.instance_name))
2179
    if instance is None:
2180
      raise errors.OpPrereqError("Instance '%s' not known" %
2181
                                 self.op.instance_name)
2182
    self.instance = instance
2183

    
2184
  def Exec(self, feedback_fn):
2185
    """Shutdown the instance.
2186

2187
    """
2188
    instance = self.instance
2189
    node_current = instance.primary_node
2190
    if not rpc.call_instance_shutdown(node_current, instance):
2191
      logger.Error("could not shutdown instance")
2192

    
2193
    self.cfg.MarkInstanceDown(instance.name)
2194
    _ShutdownInstanceDisks(instance, self.cfg)
2195

    
2196

    
2197
class LUReinstallInstance(LogicalUnit):
2198
  """Reinstall an instance.
2199

2200
  """
2201
  HPATH = "instance-reinstall"
2202
  HTYPE = constants.HTYPE_INSTANCE
2203
  _OP_REQP = ["instance_name"]
2204

    
2205
  def BuildHooksEnv(self):
2206
    """Build hooks env.
2207

2208
    This runs on master, primary and secondary nodes of the instance.
2209

2210
    """
2211
    env = _BuildInstanceHookEnvByObject(self.instance)
2212
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2213
          list(self.instance.secondary_nodes))
2214
    return env, nl, nl
2215

    
2216
  def CheckPrereq(self):
2217
    """Check prerequisites.
2218

2219
    This checks that the instance is in the cluster and is not running.
2220

2221
    """
2222
    instance = self.cfg.GetInstanceInfo(
2223
      self.cfg.ExpandInstanceName(self.op.instance_name))
2224
    if instance is None:
2225
      raise errors.OpPrereqError("Instance '%s' not known" %
2226
                                 self.op.instance_name)
2227
    if instance.disk_template == constants.DT_DISKLESS:
2228
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2229
                                 self.op.instance_name)
2230
    if instance.status != "down":
2231
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2232
                                 self.op.instance_name)
2233
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2234
    if remote_info:
2235
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2236
                                 (self.op.instance_name,
2237
                                  instance.primary_node))
2238

    
2239
    self.op.os_type = getattr(self.op, "os_type", None)
2240
    if self.op.os_type is not None:
2241
      # OS verification
2242
      pnode = self.cfg.GetNodeInfo(
2243
        self.cfg.ExpandNodeName(instance.primary_node))
2244
      if pnode is None:
2245
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2246
                                   self.op.pnode)
2247
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2248
      if not os_obj:
2249
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2250
                                   " primary node"  % self.op.os_type)
2251

    
2252
    self.instance = instance
2253

    
2254
  def Exec(self, feedback_fn):
2255
    """Reinstall the instance.
2256

2257
    """
2258
    inst = self.instance
2259

    
2260
    if self.op.os_type is not None:
2261
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2262
      inst.os = self.op.os_type
2263
      self.cfg.AddInstance(inst)
2264

    
2265
    _StartInstanceDisks(self.cfg, inst, None)
2266
    try:
2267
      feedback_fn("Running the instance OS create scripts...")
2268
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2269
        raise errors.OpExecError("Could not install OS for instance %s"
2270
                                 " on node %s" %
2271
                                 (inst.name, inst.primary_node))
2272
    finally:
2273
      _ShutdownInstanceDisks(inst, self.cfg)
2274

    
2275

    
2276
class LURenameInstance(LogicalUnit):
2277
  """Rename an instance.
2278

2279
  """
2280
  HPATH = "instance-rename"
2281
  HTYPE = constants.HTYPE_INSTANCE
2282
  _OP_REQP = ["instance_name", "new_name"]
2283

    
2284
  def BuildHooksEnv(self):
2285
    """Build hooks env.
2286

2287
    This runs on master, primary and secondary nodes of the instance.
2288

2289
    """
2290
    env = _BuildInstanceHookEnvByObject(self.instance)
2291
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2292
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2293
          list(self.instance.secondary_nodes))
2294
    return env, nl, nl
2295

    
2296
  def CheckPrereq(self):
2297
    """Check prerequisites.
2298

2299
    This checks that the instance is in the cluster and is not running.
2300

2301
    """
2302
    instance = self.cfg.GetInstanceInfo(
2303
      self.cfg.ExpandInstanceName(self.op.instance_name))
2304
    if instance is None:
2305
      raise errors.OpPrereqError("Instance '%s' not known" %
2306
                                 self.op.instance_name)
2307
    if instance.status != "down":
2308
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2309
                                 self.op.instance_name)
2310
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2311
    if remote_info:
2312
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2313
                                 (self.op.instance_name,
2314
                                  instance.primary_node))
2315
    self.instance = instance
2316

    
2317
    # new name verification
2318
    name_info = utils.HostInfo(self.op.new_name)
2319

    
2320
    self.op.new_name = new_name = name_info.name
2321
    instance_list = self.cfg.GetInstanceList()
2322
    if new_name in instance_list:
2323
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2324
                                 new_name)
2325

    
2326
    if not getattr(self.op, "ignore_ip", False):
2327
      command = ["fping", "-q", name_info.ip]
2328
      result = utils.RunCmd(command)
2329
      if not result.failed:
2330
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2331
                                   (name_info.ip, new_name))
2332

    
2333

    
2334
  def Exec(self, feedback_fn):
    """Rename the instance.
2336

2337
    """
2338
    inst = self.instance
2339
    old_name = inst.name
2340

    
2341
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2342

    
2343
    # re-read the instance from the configuration after rename
2344
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2345

    
2346
    _StartInstanceDisks(self.cfg, inst, None)
2347
    try:
2348
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2349
                                          "sda", "sdb"):
2350
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
2352
               (inst.name, inst.primary_node))
2353
        logger.Error(msg)
2354
    finally:
2355
      _ShutdownInstanceDisks(inst, self.cfg)
2356

    
2357

    
2358
class LURemoveInstance(LogicalUnit):
2359
  """Remove an instance.
2360

2361
  """
2362
  HPATH = "instance-remove"
2363
  HTYPE = constants.HTYPE_INSTANCE
2364
  _OP_REQP = ["instance_name"]
2365

    
2366
  def BuildHooksEnv(self):
2367
    """Build hooks env.
2368

2369
    This runs on master, primary and secondary nodes of the instance.
2370

2371
    """
2372
    env = _BuildInstanceHookEnvByObject(self.instance)
2373
    nl = [self.sstore.GetMasterNode()]
2374
    return env, nl, nl
2375

    
2376
  def CheckPrereq(self):
2377
    """Check prerequisites.
2378

2379
    This checks that the instance is in the cluster.
2380

2381
    """
2382
    instance = self.cfg.GetInstanceInfo(
2383
      self.cfg.ExpandInstanceName(self.op.instance_name))
2384
    if instance is None:
2385
      raise errors.OpPrereqError("Instance '%s' not known" %
2386
                                 self.op.instance_name)
2387
    self.instance = instance
2388

    
2389
  def Exec(self, feedback_fn):
2390
    """Remove the instance.
2391

2392
    """
2393
    instance = self.instance
2394
    logger.Info("shutting down instance %s on node %s" %
2395
                (instance.name, instance.primary_node))
2396

    
2397
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2398
      if self.op.ignore_failures:
2399
        feedback_fn("Warning: can't shutdown instance")
2400
      else:
2401
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2402
                                 (instance.name, instance.primary_node))
2403

    
2404
    logger.Info("removing block devices for instance %s" % instance.name)
2405

    
2406
    if not _RemoveDisks(instance, self.cfg):
2407
      if self.op.ignore_failures:
2408
        feedback_fn("Warning: can't remove instance's disks")
2409
      else:
2410
        raise errors.OpExecError("Can't remove instance's disks")
2411

    
2412
    logger.Info("removing instance %s out of cluster config" % instance.name)
2413

    
2414
    self.cfg.RemoveInstance(instance.name)
2415

    
2416

    
2417
class LUQueryInstances(NoHooksLU):
2418
  """Logical unit for querying instances.
2419

2420
  """
2421
  _OP_REQP = ["output_fields", "names"]
2422

    
2423
  def CheckPrereq(self):
2424
    """Check prerequisites.
2425

2426
    This checks that the fields required are valid output fields.
2427

2428
    """
2429
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2430
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2431
                               "admin_state", "admin_ram",
2432
                               "disk_template", "ip", "mac", "bridge",
2433
                               "sda_size", "sdb_size", "vcpus"],
2434
                       dynamic=self.dynamic_fields,
2435
                       selected=self.op.output_fields)
2436

    
2437
    self.wanted = _GetWantedInstances(self, self.op.names)
2438

    
2439
  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.
2441

2442
    """
2443
    instance_names = self.wanted
2444
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2445
                     in instance_names]
2446

    
2447
    # begin data gathering
2448

    
2449
    nodes = frozenset([inst.primary_node for inst in instance_list])
2450

    
2451
    bad_nodes = []
2452
    if self.dynamic_fields.intersection(self.op.output_fields):
2453
      live_data = {}
2454
      node_data = rpc.call_all_instances_info(nodes)
2455
      for name in nodes:
2456
        result = node_data[name]
2457
        if result:
2458
          live_data.update(result)
2459
        elif result == False:
2460
          bad_nodes.append(name)
2461
        # else no instance is alive
2462
    else:
2463
      live_data = dict([(name, {}) for name in instance_names])
2464

    
2465
    # end data gathering
2466

    
2467
    output = []
2468
    for instance in instance_list:
2469
      iout = []
2470
      for field in self.op.output_fields:
2471
        if field == "name":
2472
          val = instance.name
2473
        elif field == "os":
2474
          val = instance.os
2475
        elif field == "pnode":
2476
          val = instance.primary_node
2477
        elif field == "snodes":
2478
          val = list(instance.secondary_nodes)
2479
        elif field == "admin_state":
2480
          val = (instance.status != "down")
2481
        elif field == "oper_state":
2482
          if instance.primary_node in bad_nodes:
2483
            val = None
2484
          else:
2485
            val = bool(live_data.get(instance.name))
2486
        elif field == "status":
2487
          if instance.primary_node in bad_nodes:
2488
            val = "ERROR_nodedown"
2489
          else:
2490
            running = bool(live_data.get(instance.name))
2491
            if running:
2492
              if instance.status != "down":
2493
                val = "running"
2494
              else:
2495
                val = "ERROR_up"
2496
            else:
2497
              if instance.status != "down":
2498
                val = "ERROR_down"
2499
              else:
2500
                val = "ADMIN_down"
2501
        elif field == "admin_ram":
2502
          val = instance.memory
2503
        elif field == "oper_ram":
2504
          if instance.primary_node in bad_nodes:
2505
            val = None
2506
          elif instance.name in live_data:
2507
            val = live_data[instance.name].get("memory", "?")
2508
          else:
2509
            val = "-"
2510
        elif field == "disk_template":
2511
          val = instance.disk_template
2512
        elif field == "ip":
2513
          val = instance.nics[0].ip
2514
        elif field == "bridge":
2515
          val = instance.nics[0].bridge
2516
        elif field == "mac":
2517
          val = instance.nics[0].mac
2518
        elif field == "sda_size" or field == "sdb_size":
2519
          disk = instance.FindDisk(field[:3])
2520
          if disk is None:
2521
            val = None
2522
          else:
2523
            val = disk.size
2524
        elif field == "vcpus":
2525
          val = instance.vcpus
2526
        else:
2527
          raise errors.ParameterError(field)
2528
        iout.append(val)
2529
      output.append(iout)
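    # With output_fields = ["name", "pnode", "status"], a row appended to
    # 'output' could look like (hypothetical cluster):
    #   ["web1.example.com", "node1.example.com", "running"]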
2530

    
2531
    return output
2532

    
2533

    
2534
class LUFailoverInstance(LogicalUnit):
2535
  """Failover an instance.
2536

2537
  """
2538
  HPATH = "instance-failover"
2539
  HTYPE = constants.HTYPE_INSTANCE
2540
  _OP_REQP = ["instance_name", "ignore_consistency"]
2541

    
2542
  def BuildHooksEnv(self):
2543
    """Build hooks env.
2544

2545
    This runs on master, primary and secondary nodes of the instance.
2546

2547
    """
2548
    env = {
2549
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2550
      }
2551
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2552
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2553
    return env, nl, nl
2554

    
2555
  def CheckPrereq(self):
2556
    """Check prerequisites.
2557

2558
    This checks that the instance is in the cluster.
2559

2560
    """
2561
    instance = self.cfg.GetInstanceInfo(
2562
      self.cfg.ExpandInstanceName(self.op.instance_name))
2563
    if instance is None:
2564
      raise errors.OpPrereqError("Instance '%s' not known" %
2565
                                 self.op.instance_name)
2566

    
2567
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2568
      raise errors.OpPrereqError("Instance's disk layout is not"
2569
                                 " network mirrored, cannot failover.")
2570

    
2571
    secondary_nodes = instance.secondary_nodes
2572
    if not secondary_nodes:
2573
      raise errors.ProgrammerError("no secondary node but using "
2574
                                   "DT_REMOTE_RAID1 template")
2575

    
2576
    target_node = secondary_nodes[0]
2577
    # check memory requirements on the secondary node
2578
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2579
                         instance.name, instance.memory)
2580

    
2581
    # check bridge existance
2582
    brlist = [nic.bridge for nic in instance.nics]
2583
    if not rpc.call_bridges_exist(target_node, brlist):
2584
      raise errors.OpPrereqError("One or more target bridges %s do not"
2585
                                 " exist on destination node '%s'" %
2586
                                 (brlist, target_node))
2587

    
2588
    self.instance = instance
2589

    
2590
  def Exec(self, feedback_fn):
2591
    """Failover an instance.
2592

2593
    The failover is done by shutting it down on its present node and
2594
    starting it on the secondary.
2595

2596
    """
2597
    instance = self.instance
2598

    
2599
    source_node = instance.primary_node
2600
    target_node = instance.secondary_nodes[0]
2601

    
2602
    feedback_fn("* checking disk consistency between source and target")
2603
    for dev in instance.disks:
2604
      # for remote_raid1, these are md over drbd
2605
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2606
        if not self.op.ignore_consistency:
2607
          raise errors.OpExecError("Disk %s is degraded on target node,"
2608
                                   " aborting failover." % dev.iv_name)
2609

    
2610
    feedback_fn("* shutting down instance on source node")
2611
    logger.Info("Shutting down instance %s on node %s" %
2612
                (instance.name, source_node))
2613

    
2614
    if not rpc.call_instance_shutdown(source_node, instance):
2615
      if self.op.ignore_consistency:
2616
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2617
                     " anyway. Please make sure node %s is down"  %
2618
                     (instance.name, source_node, source_node))
2619
      else:
2620
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2621
                                 (instance.name, source_node))
2622

    
2623
    feedback_fn("* deactivating the instance's disks on source node")
2624
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2625
      raise errors.OpExecError("Can't shut down the instance's disks.")
2626

    
2627
    instance.primary_node = target_node
2628
    # distribute new instance config to the other nodes
2629
    self.cfg.AddInstance(instance)
2630

    
2631
    feedback_fn("* activating the instance's disks on target node")
2632
    logger.Info("Starting instance %s on node %s" %
2633
                (instance.name, target_node))
2634

    
2635
    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2636
                                             ignore_secondaries=True)
2637
    if not disks_ok:
2638
      _ShutdownInstanceDisks(instance, self.cfg)
2639
      raise errors.OpExecError("Can't activate the instance's disks")
2640

    
2641
    feedback_fn("* starting the instance on the target node")
2642
    if not rpc.call_instance_start(target_node, instance, None):
2643
      _ShutdownInstanceDisks(instance, self.cfg)
2644
      raise errors.OpExecError("Could not start instance %s on node %s." %
2645
                               (instance.name, target_node))
2646

    
2647

    
2648
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2649
  """Create a tree of block devices on the primary node.
2650

2651
  This always creates all devices.
2652

2653
  """
2654
  if device.children:
2655
    for child in device.children:
2656
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2657
        return False
2658

    
2659
  cfg.SetDiskID(device, node)
2660
  new_id = rpc.call_blockdev_create(node, device, device.size,
2661
                                    instance.name, True, info)
2662
  if not new_id:
2663
    return False
2664
  if device.physical_id is None:
2665
    device.physical_id = new_id
2666
  return True
2667

    
2668

    
2669
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2670
  """Create a tree of block devices on a secondary node.
2671

2672
  If this device type has to be created on secondaries, create it and
2673
  all its children.
2674

2675
  If not, just recurse to children keeping the same 'force' value.
2676

2677
  """
2678
  if device.CreateOnSecondary():
2679
    force = True
2680
  if device.children:
2681
    for child in device.children:
2682
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2683
                                        child, force, info):
2684
        return False
2685

    
2686
  if not force:
2687
    return True
2688
  cfg.SetDiskID(device, node)
2689
  new_id = rpc.call_blockdev_create(node, device, device.size,
2690
                                    instance.name, False, info)
2691
  if not new_id:
2692
    return False
2693
  if device.physical_id is None:
2694
    device.physical_id = new_id
2695
  return True
2696

    
2697

    
2698
def _GenerateUniqueNames(cfg, exts):
2699
  """Generate a suitable LV name.
2700

2701
  This will generate a logical volume name for the given instance.
2702

2703
  """
2704
  results = []
2705
  for val in exts:
2706
    new_id = cfg.GenerateUniqueID()
2707
    results.append("%s%s" % (new_id, val))
2708
  return results
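# For example (the IDs below are made up), _GenerateUniqueNames(cfg,
# [".sda", ".sdb"]) would return something like
#   ["3a50646c-...-f2e1.sda", "9c1f22aa-...-07bd.sdb"]
# i.e. each extension gets its own freshly generated unique ID as prefix.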
2709

    
2710

    
2711
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2712
  """Generate a drbd device complete with its children.
2713

2714
  """
2715
  port = cfg.AllocatePort()
2716
  vgname = cfg.GetVGName()
2717
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2718
                          logical_id=(vgname, names[0]))
2719
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2720
                          logical_id=(vgname, names[1]))
2721
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2722
                          logical_id = (primary, secondary, port),
2723
                          children = [dev_data, dev_meta])
2724
  return drbd_dev
2725

    
2726

    
2727
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2728
  """Generate a drbd8 device complete with its children.
2729

2730
  """
2731
  port = cfg.AllocatePort()
2732
  vgname = cfg.GetVGName()
2733
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2734
                          logical_id=(vgname, names[0]))
2735
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2736
                          logical_id=(vgname, names[1]))
2737
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2738
                          logical_id = (primary, secondary, port),
2739
                          children = [dev_data, dev_meta],
2740
                          iv_name=iv_name)
2741
  return drbd_dev
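# The device tree built by one _GenerateDRBD8Branch call looks like:
#
#   drbd8 device, logical_id (primary, secondary, port)
#     +- LV names[0]  (data, 'size' MB)
#     +- LV names[1]  (metadata, 128 MB)
#
# i.e. a drbd8 device whose local backing store is a data LV plus a small
# metadata LV.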
2742

    
2743

    
2744
def _GenerateDiskTemplate(cfg, template_name,
2745
                          instance_name, primary_node,
2746
                          secondary_nodes, disk_sz, swap_sz):
2747
  """Generate the entire disk layout for a given template type.
2748

2749
  """
2750
  #TODO: compute space requirements
2751

    
2752
  vgname = cfg.GetVGName()
2753
  if template_name == constants.DT_DISKLESS:
2754
    disks = []
2755
  elif template_name == constants.DT_PLAIN:
2756
    if len(secondary_nodes) != 0:
2757
      raise errors.ProgrammerError("Wrong template configuration")
2758

    
2759
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2760
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2761
                           logical_id=(vgname, names[0]),
2762
                           iv_name = "sda")
2763
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2764
                           logical_id=(vgname, names[1]),
2765
                           iv_name = "sdb")
2766
    disks = [sda_dev, sdb_dev]
2767
  elif template_name == constants.DT_LOCAL_RAID1:
2768
    if len(secondary_nodes) != 0:
2769
      raise errors.ProgrammerError("Wrong template configuration")
2770

    
2771

    
2772
    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2773
                                       ".sdb_m1", ".sdb_m2"])
2774
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2775
                              logical_id=(vgname, names[0]))
2776
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2777
                              logical_id=(vgname, names[1]))
2778
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2779
                              size=disk_sz,
2780
                              children = [sda_dev_m1, sda_dev_m2])
2781
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2782
                              logical_id=(vgname, names[2]))
2783
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2784
                              logical_id=(vgname, names[3]))
2785
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2786
                              size=swap_sz,
2787
                              children = [sdb_dev_m1, sdb_dev_m2])
2788
    disks = [md_sda_dev, md_sdb_dev]
2789
  elif template_name == constants.DT_REMOTE_RAID1:
2790
    if len(secondary_nodes) != 1:
2791
      raise errors.ProgrammerError("Wrong template configuration")
2792
    remote_node = secondary_nodes[0]
2793
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2794
                                       ".sdb_data", ".sdb_meta"])
2795
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2796
                                         disk_sz, names[0:2])
2797
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2798
                              children = [drbd_sda_dev], size=disk_sz)
2799
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2800
                                         swap_sz, names[2:4])
2801
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2802
                              children = [drbd_sdb_dev], size=swap_sz)
2803
    disks = [md_sda_dev, md_sdb_dev]
2804
  elif template_name == constants.DT_DRBD8:
2805
    if len(secondary_nodes) != 1:
2806
      raise errors.ProgrammerError("Wrong template configuration")
2807
    remote_node = secondary_nodes[0]
2808
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2809
                                       ".sdb_data", ".sdb_meta"])
2810
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2811
                                         disk_sz, names[0:2], "sda")
2812
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2813
                                         swap_sz, names[2:4], "sdb")
2814
    disks = [drbd_sda_dev, drbd_sdb_dev]
2815
  else:
2816
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2817
  return disks
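# Whatever the template, the caller gets back a list describing the "sda"
# (data, disk_sz MB) and "sdb" (swap, swap_sz MB) devices of the instance,
# except for DT_DISKLESS which yields an empty list.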
2818

    
2819

    
2820
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.
2822

2823
  """
2824
  return "originstname+%s" % instance.name
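# e.g. for an instance named "web1.example.com" (a hypothetical name), the
# text stored in the disk metadata would be "originstname+web1.example.com".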
2825

    
2826

    
2827
def _CreateDisks(cfg, instance):
2828
  """Create all disks for an instance.
2829

2830
  This abstracts away some work from AddInstance.
2831

2832
  Args:
2833
    instance: the instance object
2834

2835
  Returns:
2836
    True or False showing the success of the creation process
2837

2838
  """
2839
  info = _GetInstanceInfoText(instance)
2840

    
2841
  for device in instance.disks:
2842
    logger.Info("creating volume %s for instance %s" %
2843
              (device.iv_name, instance.name))
2844
    #HARDCODE
2845
    for secondary_node in instance.secondary_nodes:
2846
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2847
                                        device, False, info):
2848
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2849
                     (device.iv_name, device, secondary_node))
2850
        return False
2851
    #HARDCODE
2852
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2853
                                    instance, device, info):
2854
      logger.Error("failed to create volume %s on primary!" %
2855
                   device.iv_name)
2856
      return False
2857
  return True
2858

    
2859

    
2860
def _RemoveDisks(instance, cfg):
2861
  """Remove all disks for an instance.
2862

2863
  This abstracts away some work from `AddInstance()` and
2864
  `RemoveInstance()`. Note that in case some of the devices couldn't
2865
  be removed, the removal will continue with the other ones (compare
2866
  with `_CreateDisks()`).
2867

2868
  Args:
2869
    instance: the instance object
2870

2871
  Returns:
2872
    True or False showing the success of the removal process
2873

2874
  """
2875
  logger.Info("removing block devices for instance %s" % instance.name)
2876

    
2877
  result = True
2878
  for device in instance.disks:
2879
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2880
      cfg.SetDiskID(disk, node)
2881
      if not rpc.call_blockdev_remove(node, disk):
2882
        logger.Error("could not remove block device %s on node %s,"
2883
                     " continuing anyway" %
2884
                     (device.iv_name, node))
2885
        result = False
2886
  return result
2887

    
2888

    
2889
class LUCreateInstance(LogicalUnit):
2890
  """Create an instance.
2891

2892
  """
2893
  HPATH = "instance-add"
2894
  HTYPE = constants.HTYPE_INSTANCE
2895
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2896
              "disk_template", "swap_size", "mode", "start", "vcpus",
2897
              "wait_for_sync", "ip_check", "mac"]
2898

    
2899
  def BuildHooksEnv(self):
2900
    """Build hooks env.
2901

2902
    This runs on master, primary and secondary nodes of the instance.
2903

2904
    """
2905
    env = {
2906
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2907
      "INSTANCE_DISK_SIZE": self.op.disk_size,
2908
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
2909
      "INSTANCE_ADD_MODE": self.op.mode,
2910
      }
2911
    if self.op.mode == constants.INSTANCE_IMPORT:
2912
      env["INSTANCE_SRC_NODE"] = self.op.src_node
2913
      env["INSTANCE_SRC_PATH"] = self.op.src_path
2914
      env["INSTANCE_SRC_IMAGE"] = self.src_image
2915

    
2916
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2917
      primary_node=self.op.pnode,
2918
      secondary_nodes=self.secondaries,
2919
      status=self.instance_status,
2920
      os_type=self.op.os_type,
2921
      memory=self.op.mem_size,
2922
      vcpus=self.op.vcpus,
2923
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2924
    ))
2925

    
2926
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2927
          self.secondaries)
2928
    return env, nl, nl
2929

    
2930

    
2931
  def CheckPrereq(self):
2932
    """Check prerequisites.
2933

2934
    """
2935
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2936
      if not hasattr(self.op, attr):
2937
        setattr(self.op, attr, None)
2938

    
2939
    if self.op.mode not in (constants.INSTANCE_CREATE,
2940
                            constants.INSTANCE_IMPORT):
2941
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2942
                                 self.op.mode)
2943

    
2944
    if self.op.mode == constants.INSTANCE_IMPORT:
2945
      src_node = getattr(self.op, "src_node", None)
2946
      src_path = getattr(self.op, "src_path", None)
2947
      if src_node is None or src_path is None:
2948
        raise errors.OpPrereqError("Importing an instance requires source"
2949
                                   " node and path options")
2950
      src_node_full = self.cfg.ExpandNodeName(src_node)
2951
      if src_node_full is None:
2952
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2953
      self.op.src_node = src_node = src_node_full
2954

    
2955
      if not os.path.isabs(src_path):
2956
        raise errors.OpPrereqError("The source path must be absolute")
2957

    
2958
      export_info = rpc.call_export_info(src_node, src_path)
2959

    
2960
      if not export_info:
2961
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2962

    
2963
      if not export_info.has_section(constants.INISECT_EXP):
2964
        raise errors.ProgrammerError("Corrupted export config")
2965

    
2966
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
2967
      if (int(ei_version) != constants.EXPORT_VERSION):
2968
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2969
                                   (ei_version, constants.EXPORT_VERSION))
2970

    
2971
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2972
        raise errors.OpPrereqError("Can't import instance with more than"
2973
                                   " one data disk")
2974

    
2975
      # FIXME: are the old os-es, disk sizes, etc. useful?
2976
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2977
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2978
                                                         'disk0_dump'))
2979
      self.src_image = diskimage
2980
    else: # INSTANCE_CREATE
2981
      if getattr(self.op, "os_type", None) is None:
2982
        raise errors.OpPrereqError("No guest OS specified")
2983

    
2984
    # check primary node
2985
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2986
    if pnode is None:
2987
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
2988
                                 self.op.pnode)
2989
    self.op.pnode = pnode.name
2990
    self.pnode = pnode
2991
    self.secondaries = []
2992
    # disk template and mirror node verification
2993
    if self.op.disk_template not in constants.DISK_TEMPLATES:
2994
      raise errors.OpPrereqError("Invalid disk template name")
2995

    
2996
    if self.op.disk_template in constants.DTS_NET_MIRROR:
2997
      if getattr(self.op, "snode", None) is None:
2998
        raise errors.OpPrereqError("The networked disk templates need"
2999
                                   " a mirror node")
3000

    
3001
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3002
      if snode_name is None:
3003
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3004
                                   self.op.snode)
3005
      elif snode_name == pnode.name:
3006
        raise errors.OpPrereqError("The secondary node cannot be"
3007
                                   " the primary node.")
3008
      self.secondaries.append(snode_name)
3009

    
3010
    # Required free disk space as a function of disk and swap space
3011
    req_size_dict = {
3012
      constants.DT_DISKLESS: None,
3013
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3014
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3015
      # 256 MB are added for drbd metadata, 128MB for each drbd device
3016
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3017
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3018
    }
3019

    
3020
    if self.op.disk_template not in req_size_dict:
3021
      raise errors.ProgrammerError("Disk template '%s' size requirement"
3022
                                   " is unknown" %  self.op.disk_template)
3023

    
3024
    req_size = req_size_dict[self.op.disk_template]
3025

    
3026
    # Check lv size requirements
3027
    if req_size is not None:
3028
      nodenames = [pnode.name] + self.secondaries
3029
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3030
      for node in nodenames:
3031
        info = nodeinfo.get(node, None)
3032
        if not info:
3033
          raise errors.OpPrereqError("Cannot get current information"
3034
                                     " from node '%s'" % nodeinfo)
3035
        vg_free = info.get('vg_free', None)
3036
        if not isinstance(vg_free, int):
3037
          raise errors.OpPrereqError("Can't compute free disk space on"
3038
                                     " node %s" % node)
3039
        if req_size > info['vg_free']:
3040
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3041
                                     " %d MB available, %d MB required" %
3042
                                     (node, info['vg_free'], req_size))
3043

    
3044
    # os verification
3045
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3046
    if not os_obj:
3047
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3048
                                 " primary node"  % self.op.os_type)
3049

    
3050
    if self.op.kernel_path == constants.VALUE_NONE:
3051
      raise errors.OpPrereqError("Can't set instance kernel to none")
3052

    
3053
    # instance verification
3054
    hostname1 = utils.HostInfo(self.op.instance_name)
3055

    
3056
    self.op.instance_name = instance_name = hostname1.name
3057
    instance_list = self.cfg.GetInstanceList()
3058
    if instance_name in instance_list:
3059
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3060
                                 instance_name)
3061

    
3062
    ip = getattr(self.op, "ip", None)
3063
    if ip is None or ip.lower() == "none":
3064
      inst_ip = None
3065
    elif ip.lower() == "auto":
3066
      inst_ip = hostname1.ip
3067
    else:
3068
      if not utils.IsValidIP(ip):
3069
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3070
                                   " like a valid IP" % ip)
3071
      inst_ip = ip
3072
    self.inst_ip = inst_ip
3073

    
3074
    if self.op.start and not self.op.ip_check:
3075
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3076
                                 " adding an instance in start mode")
3077

    
3078
    if self.op.ip_check:
3079
      if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3080
                       constants.DEFAULT_NODED_PORT):
3081
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3082
                                   (hostname1.ip, instance_name))
3083

    
3084
    # MAC address verification
3085
    if self.op.mac != "auto":
3086
      if not utils.IsValidMac(self.op.mac.lower()):
3087
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3088
                                   self.op.mac)
3089

    
3090
    # bridge verification
3091
    bridge = getattr(self.op, "bridge", None)
3092
    if bridge is None:
3093
      self.op.bridge = self.cfg.GetDefBridge()
3094
    else:
3095
      self.op.bridge = bridge
3096

    
3097
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3098
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3099
                                 " destination node '%s'" %
3100
                                 (self.op.bridge, pnode.name))
3101

    
3102
    # boot order verification
3103
    if self.op.hvm_boot_order is not None:
3104
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3105
        raise errors.OpPrereqError("invalid boot order specified,"
3106
                                   " must be one or more of [acdn]")
3107

    
3108
    if self.op.start:
3109
      self.instance_status = 'up'
3110
    else:
3111
      self.instance_status = 'down'
3112

    
3113
  def Exec(self, feedback_fn):
3114
    """Create and add the instance to the cluster.
3115

3116
    """
3117
    instance = self.op.instance_name
3118
    pnode_name = self.pnode.name
3119

    
3120
    if self.op.mac == "auto":
3121
      mac_address = self.cfg.GenerateMAC()
3122
    else:
3123
      mac_address = self.op.mac
3124

    
3125
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3126
    if self.inst_ip is not None:
3127
      nic.ip = self.inst_ip
3128

    
3129
    ht_kind = self.sstore.GetHypervisorType()
3130
    if ht_kind in constants.HTS_REQ_PORT:
3131
      network_port = self.cfg.AllocatePort()
3132
    else:
3133
      network_port = None
3134

    
3135
    disks = _GenerateDiskTemplate(self.cfg,
3136
                                  self.op.disk_template,
3137
                                  instance, pnode_name,
3138
                                  self.secondaries, self.op.disk_size,
3139
                                  self.op.swap_size)
3140

    
3141
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3142
                            primary_node=pnode_name,
3143
                            memory=self.op.mem_size,
3144
                            vcpus=self.op.vcpus,
3145
                            nics=[nic], disks=disks,
3146
                            disk_template=self.op.disk_template,
3147
                            status=self.instance_status,
3148
                            network_port=network_port,
3149
                            kernel_path=self.op.kernel_path,
3150
                            initrd_path=self.op.initrd_path,
3151
                            hvm_boot_order=self.op.hvm_boot_order,
3152
                            )
3153

    
3154
    feedback_fn("* creating instance disks...")
3155
    if not _CreateDisks(self.cfg, iobj):
3156
      _RemoveDisks(iobj, self.cfg)
3157
      raise errors.OpExecError("Device creation failed, reverting...")
3158

    
3159
    feedback_fn("adding instance %s to cluster config" % instance)
3160

    
3161
    self.cfg.AddInstance(iobj)
3162

    
3163
    if self.op.wait_for_sync:
3164
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3165
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3166
      # make sure the disks are not degraded (still sync-ing is ok)
3167
      time.sleep(15)
3168
      feedback_fn("* checking mirrors status")
3169
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3170
    else:
3171
      disk_abort = False
3172

    
3173
    if disk_abort:
3174
      _RemoveDisks(iobj, self.cfg)
3175
      self.cfg.RemoveInstance(iobj.name)
3176
      raise errors.OpExecError("There are some degraded disks for"
3177
                               " this instance")
3178

    
3179
    feedback_fn("creating os for instance %s on node %s" %
3180
                (instance, pnode_name))
3181

    
3182
    if iobj.disk_template != constants.DT_DISKLESS:
3183
      if self.op.mode == constants.INSTANCE_CREATE:
3184
        feedback_fn("* running the instance OS create scripts...")
3185
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3186
          raise errors.OpExecError("could not add os for instance %s"
3187
                                   " on node %s" %
3188
                                   (instance, pnode_name))
3189

    
3190
      elif self.op.mode == constants.INSTANCE_IMPORT:
3191
        feedback_fn("* running the instance OS import scripts...")
3192
        src_node = self.op.src_node
3193
        src_image = self.src_image
3194
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3195
                                                src_node, src_image):
3196
          raise errors.OpExecError("Could not import os for instance"
3197
                                   " %s on node %s" %
3198
                                   (instance, pnode_name))
3199
      else:
3200
        # also checked in the prereq part
3201
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3202
                                     % self.op.mode)
3203

    
3204
    if self.op.start:
3205
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3206
      feedback_fn("* starting instance...")
3207
      if not rpc.call_instance_start(pnode_name, iobj, None):
3208
        raise errors.OpExecError("Could not start instance")
3209

    
3210

    
3211
class LUConnectConsole(NoHooksLU):
3212
  """Connect to an instance's console.
3213

3214
  This is somewhat special in that it returns the command line that
3215
  you need to run on the master node in order to connect to the
3216
  console.
3217

3218
  """
3219
  _OP_REQP = ["instance_name"]
3220

    
3221
  def CheckPrereq(self):
3222
    """Check prerequisites.
3223

3224
    This checks that the instance is in the cluster.
3225

3226
    """
3227
    instance = self.cfg.GetInstanceInfo(
3228
      self.cfg.ExpandInstanceName(self.op.instance_name))
3229
    if instance is None:
3230
      raise errors.OpPrereqError("Instance '%s' not known" %
3231
                                 self.op.instance_name)
3232
    self.instance = instance
3233

    
3234
  def Exec(self, feedback_fn):
3235
    """Connect to the console of an instance
3236

3237
    """
3238
    instance = self.instance
3239
    node = instance.primary_node
3240

    
3241
    node_insts = rpc.call_instance_list([node])[node]
3242
    if node_insts is False:
3243
      raise errors.OpExecError("Can't connect to node %s." % node)
3244

    
3245
    if instance.name not in node_insts:
3246
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3247

    
3248
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3249

    
3250
    hyper = hypervisor.GetHypervisor()
3251
    console_cmd = hyper.GetShellCommandForConsole(instance)
3252
    # build ssh cmdline
3253
    argv = ["ssh", "-q", "-t"]
3254
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
3255
    argv.extend(ssh.BATCH_MODE_OPTS)
3256
    argv.append(node)
3257
    argv.append(console_cmd)
3258
    return "ssh", argv
3259

    
3260

    
3261
class LUAddMDDRBDComponent(LogicalUnit):
3262
  """Adda new mirror member to an instance's disk.
3263

3264
  """
3265
  HPATH = "mirror-add"
3266
  HTYPE = constants.HTYPE_INSTANCE
3267
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3268

    
3269
  def BuildHooksEnv(self):
3270
    """Build hooks env.
3271

3272
    This runs on the master, the primary and all the secondaries.
3273

3274
    """
3275
    env = {
3276
      "NEW_SECONDARY": self.op.remote_node,
3277
      "DISK_NAME": self.op.disk_name,
3278
      }
3279
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3280
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3281
          self.op.remote_node,] + list(self.instance.secondary_nodes)
3282
    return env, nl, nl
3283

    
3284
  def CheckPrereq(self):
3285
    """Check prerequisites.
3286

3287
    This checks that the instance is in the cluster.
3288

3289
    """
3290
    instance = self.cfg.GetInstanceInfo(
3291
      self.cfg.ExpandInstanceName(self.op.instance_name))
3292
    if instance is None:
3293
      raise errors.OpPrereqError("Instance '%s' not known" %
3294
                                 self.op.instance_name)
3295
    self.instance = instance
3296

    
3297
    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3298
    if remote_node is None:
3299
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3300
    self.remote_node = remote_node
3301

    
3302
    if remote_node == instance.primary_node:
3303
      raise errors.OpPrereqError("The specified node is the primary node of"
3304
                                 " the instance.")
3305

    
3306
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3307
      raise errors.OpPrereqError("Instance's disk layout is not"
3308
                                 " remote_raid1.")
3309
    for disk in instance.disks:
3310
      if disk.iv_name == self.op.disk_name:
3311
        break
3312
    else:
3313
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3314
                                 " instance." % self.op.disk_name)
3315
    if len(disk.children) > 1:
3316
      raise errors.OpPrereqError("The device already has two slave devices."
3317
                                 " This would create a 3-disk raid1 which we"
3318
                                 " don't allow.")
3319
    self.disk = disk
3320

    
3321
  def Exec(self, feedback_fn):
3322
    """Add the mirror component
3323

3324
    """
3325
    disk = self.disk
3326
    instance = self.instance
3327

    
3328
    remote_node = self.remote_node
3329
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3330
    names = _GenerateUniqueNames(self.cfg, lv_names)
3331
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3332
                                     remote_node, disk.size, names)
3333

    
3334
    logger.Info("adding new mirror component on secondary")
3335
    #HARDCODE
3336
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3337
                                      new_drbd, False,
3338
                                      _GetInstanceInfoText(instance)):
3339
      raise errors.OpExecError("Failed to create new component on secondary"
3340
                               " node %s" % remote_node)
3341

    
3342
    logger.Info("adding new mirror component on primary")
3343
    #HARDCODE
3344
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3345
                                    instance, new_drbd,
3346
                                    _GetInstanceInfoText(instance)):
3347
      # remove secondary dev
3348
      self.cfg.SetDiskID(new_drbd, remote_node)
3349
      rpc.call_blockdev_remove(remote_node, new_drbd)
3350
      raise errors.OpExecError("Failed to create volume on primary")
3351

    
3352
    # the device exists now
3353
    # call the primary node to add the mirror to md
3354
    logger.Info("adding new mirror component to md")
3355
    if not rpc.call_blockdev_addchildren(instance.primary_node,
3356
                                         disk, [new_drbd]):
3357
      logger.Error("Can't add mirror compoment to md!")
3358
      self.cfg.SetDiskID(new_drbd, remote_node)
3359
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
3360
        logger.Error("Can't rollback on secondary")
3361
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
3362
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3363
        logger.Error("Can't rollback on primary")
3364
      raise errors.OpExecError("Can't add mirror component to md array")
3365

    
3366
    disk.children.append(new_drbd)
3367

    
3368
    self.cfg.AddInstance(instance)
3369

    
3370
    _WaitForSync(self.cfg, instance, self.proc)
3371

    
3372
    return 0
3373

    
3374

    
3375
class LURemoveMDDRBDComponent(LogicalUnit):
3376
  """Remove a component from a remote_raid1 disk.
3377

3378
  """
3379
  HPATH = "mirror-remove"
3380
  HTYPE = constants.HTYPE_INSTANCE
3381
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3382

    
3383
  def BuildHooksEnv(self):
3384
    """Build hooks env.
3385

3386
    This runs on the master, the primary and all the secondaries.
3387

3388
    """
3389
    env = {
3390
      "DISK_NAME": self.op.disk_name,
3391
      "DISK_ID": self.op.disk_id,
3392
      "OLD_SECONDARY": self.old_secondary,
3393
      }
3394
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3395
    nl = [self.sstore.GetMasterNode(),
3396
          self.instance.primary_node] + list(self.instance.secondary_nodes)
3397
    return env, nl, nl
3398

    
3399
  def CheckPrereq(self):
3400
    """Check prerequisites.
3401

3402
    This checks that the instance is in the cluster.
3403

3404
    """
3405
    instance = self.cfg.GetInstanceInfo(
3406
      self.cfg.ExpandInstanceName(self.op.instance_name))
3407
    if instance is None:
3408
      raise errors.OpPrereqError("Instance '%s' not known" %
3409
                                 self.op.instance_name)
3410
    self.instance = instance
3411

    
3412
    if instance.disk_template != constants.DT_REMOTE_RAID1:
3413
      raise errors.OpPrereqError("Instance's disk layout is not"
3414
                                 " remote_raid1.")
3415
    for disk in instance.disks:
3416
      if disk.iv_name == self.op.disk_name:
3417
        break
3418
    else:
3419
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
3420
                                 " instance." % self.op.disk_name)
3421
    for child in disk.children:
3422
      if (child.dev_type == constants.LD_DRBD7 and
3423
          child.logical_id[2] == self.op.disk_id):
3424
        break
3425
    else:
3426
      raise errors.OpPrereqError("Can't find the device with this port.")
3427

    
3428
    if len(disk.children) < 2:
3429
      raise errors.OpPrereqError("Cannot remove the last component from"
3430
                                 " a mirror.")
3431
    self.disk = disk
3432
    self.child = child
3433
    if self.child.logical_id[0] == instance.primary_node:
3434
      oid = 1
3435
    else:
3436
      oid = 0
3437
    self.old_secondary = self.child.logical_id[oid]
3438

    
3439
  def Exec(self, feedback_fn):
3440
    """Remove the mirror component
3441

3442
    """
3443
    instance = self.instance
3444
    disk = self.disk
3445
    child = self.child
3446
    logger.Info("remove mirror component")
3447
    self.cfg.SetDiskID(disk, instance.primary_node)
3448
    if not rpc.call_blockdev_removechildren(instance.primary_node,
3449
                                            disk, [child]):
3450
      raise errors.OpExecError("Can't remove child from mirror.")
3451

    
3452
    for node in child.logical_id[:2]:
3453
      self.cfg.SetDiskID(child, node)
3454
      if not rpc.call_blockdev_remove(node, child):
3455
        logger.Error("Warning: failed to remove device from node %s,"
3456
                     " continuing operation." % node)
3457

    
3458
    disk.children.remove(child)
3459
    self.cfg.AddInstance(instance)
3460

    
3461

    
3462
class LUReplaceDisks(LogicalUnit):
3463
  """Replace the disks of an instance.
3464

3465
  """
3466
  HPATH = "mirrors-replace"
3467
  HTYPE = constants.HTYPE_INSTANCE
3468
  _OP_REQP = ["instance_name", "mode", "disks"]
3469

    
3470
  def BuildHooksEnv(self):
3471
    """Build hooks env.
3472

3473
    This runs on the master, the primary and all the secondaries.
3474

3475
    """
3476
    env = {
3477
      "MODE": self.op.mode,
3478
      "NEW_SECONDARY": self.op.remote_node,
3479
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3480
      }
3481
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3482
    nl = [
3483
      self.sstore.GetMasterNode(),
3484
      self.instance.primary_node,
3485
      ]
3486
    if self.op.remote_node is not None:
3487
      nl.append(self.op.remote_node)
3488
    return env, nl, nl
3489

    
3490
  def CheckPrereq(self):
3491
    """Check prerequisites.
3492

3493
    This checks that the instance is in the cluster.
3494

3495
    """
3496
    instance = self.cfg.GetInstanceInfo(
3497
      self.cfg.ExpandInstanceName(self.op.instance_name))
3498
    if instance is None:
3499
      raise errors.OpPrereqError("Instance '%s' not known" %
3500
                                 self.op.instance_name)
3501
    self.instance = instance
3502
    self.op.instance_name = instance.name
3503

    
3504
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3505
      raise errors.OpPrereqError("Instance's disk layout is not"
3506
                                 " network mirrored.")
3507

    
3508
    if len(instance.secondary_nodes) != 1:
3509
      raise errors.OpPrereqError("The instance has a strange layout,"
3510
                                 " expected one secondary but found %d" %
3511
                                 len(instance.secondary_nodes))
3512

    
3513
    self.sec_node = instance.secondary_nodes[0]
3514

    
3515
    remote_node = getattr(self.op, "remote_node", None)
3516
    if remote_node is not None:
3517
      remote_node = self.cfg.ExpandNodeName(remote_node)
3518
      if remote_node is None:
3519
        raise errors.OpPrereqError("Node '%s' not known" %
3520
                                   self.op.remote_node)
3521
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3522
    else:
3523
      self.remote_node_info = None
3524
    if remote_node == instance.primary_node:
3525
      raise errors.OpPrereqError("The specified node is the primary node of"
3526
                                 " the instance.")
3527
    elif remote_node == self.sec_node:
3528
      if self.op.mode == constants.REPLACE_DISK_SEC:
3529
        # this is for DRBD8, where we can't execute the same mode of
3530
        # replacement as for drbd7 (no different port allocated)
3531
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3532
                                   " replacement")
3533
      # the user gave the current secondary, switch to
3534
      # 'no-replace-secondary' mode for drbd7
3535
      remote_node = None
3536
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3537
        self.op.mode != constants.REPLACE_DISK_ALL):
3538
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3539
                                 " disks replacement, not individual ones")
3540
    if instance.disk_template == constants.DT_DRBD8:
3541
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3542
          remote_node is not None):
3543
        # switch to replace secondary mode
3544
        self.op.mode = constants.REPLACE_DISK_SEC
3545

    
3546
      if self.op.mode == constants.REPLACE_DISK_ALL:
3547
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3548
                                   " secondary disk replacement, not"
3549
                                   " both at once")
3550
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3551
        if remote_node is not None:
3552
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3553
                                     " the secondary while doing a primary"
3554
                                     " node disk replacement")
3555
        self.tgt_node = instance.primary_node
3556
        self.oth_node = instance.secondary_nodes[0]
3557
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3558
        self.new_node = remote_node # this can be None, in which case
3559
                                    # we don't change the secondary
3560
        self.tgt_node = instance.secondary_nodes[0]
3561
        self.oth_node = instance.primary_node
3562
      else:
3563
        raise errors.ProgrammerError("Unhandled disk replace mode")
3564

    
3565
    for name in self.op.disks:
3566
      if instance.FindDisk(name) is None:
3567
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3568
                                   (name, instance.name))
3569
    self.op.remote_node = remote_node
3570

    
3571
  def _ExecRR1(self, feedback_fn):
3572
    """Replace the disks of an instance.
3573

3574
    """
3575
    instance = self.instance
3576
    iv_names = {}
3577
    # start of work
3578
    if self.op.remote_node is None:
3579
      remote_node = self.sec_node
3580
    else:
3581
      remote_node = self.op.remote_node
3582
    cfg = self.cfg
3583
    for dev in instance.disks:
3584
      size = dev.size
3585
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3586
      names = _GenerateUniqueNames(cfg, lv_names)
3587
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3588
                                       remote_node, size, names)
3589
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3590
      logger.Info("adding new mirror component on secondary for %s" %
3591
                  dev.iv_name)
3592
      #HARDCODE
3593
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3594
                                        new_drbd, False,
3595
                                        _GetInstanceInfoText(instance)):
3596
        raise errors.OpExecError("Failed to create new component on secondary"
3597
                                 " node %s. Full abort, cleanup manually!" %
3598
                                 remote_node)
3599

    
3600
      logger.Info("adding new mirror component on primary")
3601
      #HARDCODE
3602
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3603
                                      instance, new_drbd,
3604
                                      _GetInstanceInfoText(instance)):
3605
        # remove secondary dev
3606
        cfg.SetDiskID(new_drbd, remote_node)
3607
        rpc.call_blockdev_remove(remote_node, new_drbd)
3608
        raise errors.OpExecError("Failed to create volume on primary!"
3609
                                 " Full abort, cleanup manually!!")
3610

    
3611
      # the device exists now
3612
      # call the primary node to add the mirror to md
3613
      logger.Info("adding new mirror component to md")
3614
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3615
                                           [new_drbd]):
3616
        logger.Error("Can't add mirror compoment to md!")
3617
        cfg.SetDiskID(new_drbd, remote_node)
3618
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3619
          logger.Error("Can't rollback on secondary")
3620
        cfg.SetDiskID(new_drbd, instance.primary_node)
3621
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3622
          logger.Error("Can't rollback on primary")
3623
        raise errors.OpExecError("Full abort, cleanup manually!!")
3624

    
3625
      dev.children.append(new_drbd)
3626
      cfg.AddInstance(instance)
3627

    
3628
    # this can fail as the old devices are degraded and _WaitForSync
3629
    # does a combined result over all disks, so we don't check its
3630
    # return value
3631
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3632

    
3633
    # so check manually all the devices
3634
    for name in iv_names:
3635
      dev, child, new_drbd = iv_names[name]
3636
      cfg.SetDiskID(dev, instance.primary_node)
3637
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3638
      if is_degr:
3639
        raise errors.OpExecError("MD device %s is degraded!" % name)
3640
      cfg.SetDiskID(new_drbd, instance.primary_node)
3641
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3642
      if is_degr:
3643
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3644

    
3645
    for name in iv_names:
3646
      dev, child, new_drbd = iv_names[name]
3647
      logger.Info("remove mirror %s component" % name)
3648
      cfg.SetDiskID(dev, instance.primary_node)
3649
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3650
                                              dev, [child]):
3651
        logger.Error("Can't remove child from mirror, aborting"
3652
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3653
        continue
3654

    
3655
      for node in child.logical_id[:2]:
3656
        logger.Info("remove child device on %s" % node)
3657
        cfg.SetDiskID(child, node)
3658
        if not rpc.call_blockdev_remove(node, child):
3659
          logger.Error("Warning: failed to remove device from node %s,"
3660
                       " continuing operation." % node)
3661

    
3662
      dev.children.remove(child)
3663

    
3664
      cfg.AddInstance(instance)
3665

    
3666
  def _ExecD8DiskOnly(self, feedback_fn):
3667
    """Replace a disk on the primary or secondary for dbrd8.
3668

3669
    The algorithm for replace is quite complicated:
3670
      - for each disk to be replaced:
3671
        - create new LVs on the target node with unique names
3672
        - detach old LVs from the drbd device
3673
        - rename old LVs to name_replaced.<time_t>
3674
        - rename new LVs to old LVs
3675
        - attach the new LVs (with the old names now) to the drbd device
3676
      - wait for sync across all devices
3677
      - for each modified disk:
3678
        - remove old LVs (which have the name name_replaces.<time_t>)
3679

3680
    Failures are not very well handled.
3681

3682
    """
3683
    steps_total = 6
3684
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3685
    instance = self.instance
3686
    iv_names = {}
3687
    vgname = self.cfg.GetVGName()
3688
    # start of work
3689
    cfg = self.cfg
3690
    tgt_node = self.tgt_node
3691
    oth_node = self.oth_node
3692

    
3693
    # Step: check device activation
3694
    self.proc.LogStep(1, steps_total, "check device existence")
3695
    info("checking volume groups")
3696
    my_vg = cfg.GetVGName()
3697
    results = rpc.call_vg_list([oth_node, tgt_node])
3698
    if not results:
3699
      raise errors.OpExecError("Can't list volume groups on the nodes")
3700
    for node in oth_node, tgt_node:
3701
      res = results.get(node, False)
3702
      if not res or my_vg not in res:
3703
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3704
                                 (my_vg, node))
3705
    for dev in instance.disks:
3706
      if not dev.iv_name in self.op.disks:
3707
        continue
3708
      for node in tgt_node, oth_node:
3709
        info("checking %s on %s" % (dev.iv_name, node))
3710
        cfg.SetDiskID(dev, node)
3711
        if not rpc.call_blockdev_find(node, dev):
3712
          raise errors.OpExecError("Can't find device %s on node %s" %
3713
                                   (dev.iv_name, node))
3714

    
3715
    # Step: check other node consistency
3716
    self.proc.LogStep(2, steps_total, "check peer consistency")
3717
    for dev in instance.disks:
3718
      if not dev.iv_name in self.op.disks:
3719
        continue
3720
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3721
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3722
                                   oth_node==instance.primary_node):
3723
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3724
                                 " to replace disks on this node (%s)" %
3725
                                 (oth_node, tgt_node))
3726

    
3727
    # Step: create new storage
3728
    self.proc.LogStep(3, steps_total, "allocate new storage")
3729
    for dev in instance.disks:
3730
      if not dev.iv_name in self.op.disks:
3731
        continue
3732
      size = dev.size
3733
      cfg.SetDiskID(dev, tgt_node)
3734
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3735
      names = _GenerateUniqueNames(cfg, lv_names)
3736
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3737
                             logical_id=(vgname, names[0]))
3738
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3739
                             logical_id=(vgname, names[1]))
3740
      new_lvs = [lv_data, lv_meta]
3741
      old_lvs = dev.children
3742
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3743
      info("creating new local storage on %s for %s" %
3744
           (tgt_node, dev.iv_name))
3745
      # since we *always* want to create this LV, we use the
3746
      # _Create...OnPrimary (which forces the creation), even if we
3747
      # are talking about the secondary node
3748
      for new_lv in new_lvs:
3749
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3750
                                        _GetInstanceInfoText(instance)):
3751
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3752
                                   " node '%s'" %
3753
                                   (new_lv.logical_id[1], tgt_node))
3754

    
3755
    # Step: for each lv, detach+rename*2+attach
3756
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3757
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3758
      info("detaching %s drbd from local storage" % dev.iv_name)
3759
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3760
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3761
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3762
      #dev.children = []
3763
      #cfg.Update(instance)
3764

    
3765
      # ok, we created the new LVs, so now we know we have the needed
3766
      # storage; as such, we proceed on the target node to rename
3767
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3768
      # using the assumption that logical_id == physical_id (which in
3769
      # turn is the unique_id on that node)
3770

    
3771
      # FIXME(iustin): use a better name for the replaced LVs
3772
      temp_suffix = int(time.time())
3773
      ren_fn = lambda d, suff: (d.physical_id[0],
3774
                                d.physical_id[1] + "_replaced-%s" % suff)
3775
      # build the rename list based on what LVs exist on the node
3776
      rlist = []
3777
      for to_ren in old_lvs:
3778
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3779
        if find_res is not None: # device exists
3780
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3781

    
3782
      info("renaming the old LVs on the target node")
3783
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3784
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3785
      # now we rename the new LVs to the old LVs
3786
      info("renaming the new LVs on the target node")
3787
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3788
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3789
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3790

    
3791
      for old, new in zip(old_lvs, new_lvs):
3792
        new.logical_id = old.logical_id
3793
        cfg.SetDiskID(new, tgt_node)
3794

    
3795
      for disk in old_lvs:
3796
        disk.logical_id = ren_fn(disk, temp_suffix)
3797
        cfg.SetDiskID(disk, tgt_node)
3798

    
3799
      # now that the new lvs have the old name, we can add them to the device
3800
      info("adding new mirror component on %s" % tgt_node)
3801
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3802
        for new_lv in new_lvs:
3803
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3804
            warning("Can't rollback device %s", hint="manually cleanup unused"
3805
                    " logical volumes")
3806
        raise errors.OpExecError("Can't add local storage to drbd")
3807

    
3808
      dev.children = new_lvs
3809
      cfg.Update(instance)
3810

    
3811
    # Step: wait for sync
3812

    
3813
    # this can fail as the old devices are degraded and _WaitForSync
3814
    # does a combined result over all disks, so we don't check its
3815
    # return value
3816
    self.proc.LogStep(5, steps_total, "sync devices")
3817
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3818

    
3819
    # so check manually all the devices
3820
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3821
      cfg.SetDiskID(dev, instance.primary_node)
3822
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3823
      if is_degr:
3824
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3825

    
3826
    # Step: remove old storage
3827
    self.proc.LogStep(6, steps_total, "removing old storage")
3828
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3829
      info("remove logical volumes for %s" % name)
3830
      for lv in old_lvs:
3831
        cfg.SetDiskID(lv, tgt_node)
3832
        if not rpc.call_blockdev_remove(tgt_node, lv):
3833
          warning("Can't remove old LV", hint="manually remove unused LVs")
3834
          continue
3835

    
3836
  def _ExecD8Secondary(self, feedback_fn):
3837
    """Replace the secondary node for drbd8.
3838

3839
    The algorithm for replace is quite complicated:
3840
      - for all disks of the instance:
3841
        - create new LVs on the new node with same names
3842
        - shutdown the drbd device on the old secondary
3843
        - disconnect the drbd network on the primary
3844
        - create the drbd device on the new secondary
3845
        - network attach the drbd on the primary, using an artifice:
3846
          the drbd code for Attach() will connect to the network if it
3847
          finds a device which is connected to the good local disks but
3848
          not network enabled
3849
      - wait for sync across all devices
3850
      - remove all disks from the old secondary
3851

3852
    Failures are not very well handled.
3853

3854
    """
3855
    steps_total = 6
3856
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3857
    instance = self.instance
3858
    iv_names = {}
3859
    vgname = self.cfg.GetVGName()
3860
    # start of work
3861
    cfg = self.cfg
3862
    old_node = self.tgt_node
3863
    new_node = self.new_node
3864
    pri_node = instance.primary_node
3865

    
3866
    # Step: check device activation
3867
    self.proc.LogStep(1, steps_total, "check device existence")
3868
    info("checking volume groups")
3869
    my_vg = cfg.GetVGName()
3870
    results = rpc.call_vg_list([pri_node, new_node])
3871
    if not results:
3872
      raise errors.OpExecError("Can't list volume groups on the nodes")
3873
    for node in pri_node, new_node:
3874
      res = results.get(node, False)
3875
      if not res or my_vg not in res:
3876
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3877
                                 (my_vg, node))
3878
    for dev in instance.disks:
3879
      if not dev.iv_name in self.op.disks:
3880
        continue
3881
      info("checking %s on %s" % (dev.iv_name, pri_node))
3882
      cfg.SetDiskID(dev, pri_node)
3883
      if not rpc.call_blockdev_find(pri_node, dev):
3884
        raise errors.OpExecError("Can't find device %s on node %s" %
3885
                                 (dev.iv_name, pri_node))
3886

    
3887
    # Step: check other node consistency
3888
    self.proc.LogStep(2, steps_total, "check peer consistency")
3889
    for dev in instance.disks:
3890
      if not dev.iv_name in self.op.disks:
3891
        continue
3892
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3893
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3894
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3895
                                 " unsafe to replace the secondary" %
3896
                                 pri_node)
3897

    
3898
    # Step: create new storage
3899
    self.proc.LogStep(3, steps_total, "allocate new storage")
3900
    for dev in instance.disks:
3901
      size = dev.size
3902
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3903
      # since we *always* want to create this LV, we use the
3904
      # _Create...OnPrimary (which forces the creation), even if we
3905
      # are talking about the secondary node
3906
      for new_lv in dev.children:
3907
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3908
                                        _GetInstanceInfoText(instance)):
3909
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3910
                                   " node '%s'" %
3911
                                   (new_lv.logical_id[1], new_node))
3912

    
3913
      iv_names[dev.iv_name] = (dev, dev.children)
3914

    
3915
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3916
    for dev in instance.disks:
3917
      size = dev.size
3918
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3919
      # create new devices on new_node
3920
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3921
                              logical_id=(pri_node, new_node,
3922
                                          dev.logical_id[2]),
3923
                              children=dev.children)
3924
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3925
                                        new_drbd, False,
3926
                                      _GetInstanceInfoText(instance)):
3927
        raise errors.OpExecError("Failed to create new DRBD on"
3928
                                 " node '%s'" % new_node)
3929

    
3930
    for dev in instance.disks:
3931
      # we have new devices, shutdown the drbd on the old secondary
3932
      info("shutting down drbd for %s on old node" % dev.iv_name)
3933
      cfg.SetDiskID(dev, old_node)
3934
      if not rpc.call_blockdev_shutdown(old_node, dev):
3935
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3936
                hint="Please cleanup this device manually as soon as possible")
3937

    
3938
    info("detaching primary drbds from the network (=> standalone)")
3939
    done = 0
3940
    for dev in instance.disks:
3941
      cfg.SetDiskID(dev, pri_node)
3942
      # set the physical (unique in bdev terms) id to None, meaning
3943
      # detach from network
3944
      dev.physical_id = (None,) * len(dev.physical_id)
3945
      # and 'find' the device, which will 'fix' it to match the
3946
      # standalone state
3947
      if rpc.call_blockdev_find(pri_node, dev):
3948
        done += 1
3949
      else:
3950
        warning("Failed to detach drbd %s from network, unusual case" %
3951
                dev.iv_name)
3952

    
3953
    if not done:
3954
      # no detaches succeeded (very unlikely)
3955
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
3956

    
3957
    # if we managed to detach at least one, we update all the disks of
3958
    # the instance to point to the new secondary
3959
    info("updating instance configuration")
3960
    for dev in instance.disks:
3961
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3962
      cfg.SetDiskID(dev, pri_node)
3963
    cfg.Update(instance)
3964

    
3965
    # and now perform the drbd attach
3966
    info("attaching primary drbds to new secondary (standalone => connected)")
3967
    failures = []
3968
    for dev in instance.disks:
3969
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3970
      # since the attach is smart, it's enough to 'find' the device,
3971
      # it will automatically activate the network, if the physical_id
3972
      # is correct
3973
      cfg.SetDiskID(dev, pri_node)
3974
      if not rpc.call_blockdev_find(pri_node, dev):
3975
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3976
                "please do a gnt-instance info to see the status of disks")
3977

    
3978
    # this can fail as the old devices are degraded and _WaitForSync
3979
    # does a combined result over all disks, so we don't check its
3980
    # return value
3981
    self.proc.LogStep(5, steps_total, "sync devices")
3982
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3983

    
3984
    # so check manually all the devices
3985
    for name, (dev, old_lvs) in iv_names.iteritems():
3986
      cfg.SetDiskID(dev, pri_node)
3987
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3988
      if is_degr:
3989
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3990

    
3991
    self.proc.LogStep(6, steps_total, "removing old storage")
3992
    for name, (dev, old_lvs) in iv_names.iteritems():
3993
      info("remove logical volumes for %s" % name)
3994
      for lv in old_lvs:
3995
        cfg.SetDiskID(lv, old_node)
3996
        if not rpc.call_blockdev_remove(old_node, lv):
3997
          warning("Can't remove LV on old secondary",
3998
                  hint="Cleanup stale volumes by hand")
3999

    
4000
  def Exec(self, feedback_fn):
4001
    """Execute disk replacement.
4002

4003
    This dispatches the disk replacement to the appropriate handler.
4004

4005
    """
4006
    instance = self.instance
4007
    if instance.disk_template == constants.DT_REMOTE_RAID1:
4008
      fn = self._ExecRR1
4009
    elif instance.disk_template == constants.DT_DRBD8:
4010
      if self.op.remote_node is None:
4011
        fn = self._ExecD8DiskOnly
4012
      else:
4013
        fn = self._ExecD8Secondary
4014
    else:
4015
      raise errors.ProgrammerError("Unhandled disk replacement case")
4016
    return fn(feedback_fn)
4017

    
4018

    
4019
class LUQueryInstanceData(NoHooksLU):
4020
  """Query runtime instance data.
4021

4022
  """
4023
  _OP_REQP = ["instances"]
4024

    
4025
  def CheckPrereq(self):
4026
    """Check prerequisites.
4027

4028
    This only checks the optional instance list against the existing names.
4029

4030
    """
4031
    if not isinstance(self.op.instances, list):
4032
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4033
    if self.op.instances:
4034
      self.wanted_instances = []
4035
      names = self.op.instances
4036
      for name in names:
4037
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4038
        if instance is None:
4039
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4040
        self.wanted_instances.append(instance)
4041
    else:
4042
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4043
                               in self.cfg.GetInstanceList()]
4044
    return
4045

    
4046

    
4047
  def _ComputeDiskStatus(self, instance, snode, dev):
4048
    """Compute block device status.
4049

4050
    """
4051
    self.cfg.SetDiskID(dev, instance.primary_node)
4052
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4053
    if dev.dev_type in constants.LDS_DRBD:
4054
      # we change the snode then (otherwise we use the one passed in)
4055
      if dev.logical_id[0] == instance.primary_node:
4056
        snode = dev.logical_id[1]
4057
      else:
4058
        snode = dev.logical_id[0]
4059

    
4060
    if snode:
4061
      self.cfg.SetDiskID(dev, snode)
4062
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4063
    else:
4064
      dev_sstatus = None
4065

    
4066
    if dev.children:
4067
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4068
                      for child in dev.children]
4069
    else:
4070
      dev_children = []
4071

    
4072
    data = {
4073
      "iv_name": dev.iv_name,
4074
      "dev_type": dev.dev_type,
4075
      "logical_id": dev.logical_id,
4076
      "physical_id": dev.physical_id,
4077
      "pstatus": dev_pstatus,
4078
      "sstatus": dev_sstatus,
4079
      "children": dev_children,
4080
      }
4081

    
4082
    return data
4083

    
4084
  def Exec(self, feedback_fn):
4085
    """Gather and return data"""
4086
    result = {}
4087
    for instance in self.wanted_instances:
4088
      remote_info = rpc.call_instance_info(instance.primary_node,
4089
                                                instance.name)
4090
      if remote_info and "state" in remote_info:
4091
        remote_state = "up"
4092
      else:
4093
        remote_state = "down"
4094
      if instance.status == "down":
4095
        config_state = "down"
4096
      else:
4097
        config_state = "up"
4098

    
4099
      disks = [self._ComputeDiskStatus(instance, None, device)
4100
               for device in instance.disks]
4101

    
4102
      idict = {
4103
        "name": instance.name,
4104
        "config_state": config_state,
4105
        "run_state": remote_state,
4106
        "pnode": instance.primary_node,
4107
        "snodes": instance.secondary_nodes,
4108
        "os": instance.os,
4109
        "memory": instance.memory,
4110
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4111
        "disks": disks,
4112
        "network_port": instance.network_port,
4113
        "vcpus": instance.vcpus,
4114
        "kernel_path": instance.kernel_path,
4115
        "initrd_path": instance.initrd_path,
4116
        "hvm_boot_order": instance.hvm_boot_order,
4117
        }
4118

    
4119
      result[instance.name] = idict
4120

    
4121
    return result
4122

    
4123

    
4124
class LUSetInstanceParms(LogicalUnit):
4125
  """Modifies an instances's parameters.
4126

4127
  """
4128
  HPATH = "instance-modify"
4129
  HTYPE = constants.HTYPE_INSTANCE
4130
  _OP_REQP = ["instance_name"]
4131

    
4132
  def BuildHooksEnv(self):
4133
    """Build hooks env.
4134

4135
    This runs on the master, primary and secondaries.
4136

4137
    """
4138
    args = dict()
4139
    if self.mem:
4140
      args['memory'] = self.mem
4141
    if self.vcpus:
4142
      args['vcpus'] = self.vcpus
4143
    if self.do_ip or self.do_bridge or self.mac:
4144
      if self.do_ip:
4145
        ip = self.ip
4146
      else:
4147
        ip = self.instance.nics[0].ip
4148
      if self.bridge:
4149
        bridge = self.bridge
4150
      else:
4151
        bridge = self.instance.nics[0].bridge
4152
      if self.mac:
4153
        mac = self.mac
4154
      else:
4155
        mac = self.instance.nics[0].mac
4156
      args['nics'] = [(ip, bridge, mac)]
4157
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4158
    nl = [self.sstore.GetMasterNode(),
4159
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4160
    return env, nl, nl
4161

    
4162
  def CheckPrereq(self):
4163
    """Check prerequisites.
4164

4165
    This only checks the instance list against the existing names.
4166

4167
    """
4168
    self.mem = getattr(self.op, "mem", None)
4169
    self.vcpus = getattr(self.op, "vcpus", None)
4170
    self.ip = getattr(self.op, "ip", None)
4171
    self.mac = getattr(self.op, "mac", None)
4172
    self.bridge = getattr(self.op, "bridge", None)
4173
    self.kernel_path = getattr(self.op, "kernel_path", None)
4174
    self.initrd_path = getattr(self.op, "initrd_path", None)
4175
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4176
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4177
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
4178
    if all_parms.count(None) == len(all_parms):
4179
      raise errors.OpPrereqError("No changes submitted")
4180
    if self.mem is not None:
4181
      try:
4182
        self.mem = int(self.mem)
4183
      except ValueError, err:
4184
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4185
    if self.vcpus is not None:
4186
      try:
4187
        self.vcpus = int(self.vcpus)
4188
      except ValueError, err:
4189
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4190
    if self.ip is not None:
4191
      self.do_ip = True
4192
      if self.ip.lower() == "none":
4193
        self.ip = None
4194
      else:
4195
        if not utils.IsValidIP(self.ip):
4196
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4197
    else:
4198
      self.do_ip = False
4199
    self.do_bridge = (self.bridge is not None)
4200
    if self.mac is not None:
4201
      if self.cfg.IsMacInUse(self.mac):
4202
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4203
                                   self.mac)
4204
      if not utils.IsValidMac(self.mac):
4205
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4206

    
4207
    if self.kernel_path is not None:
4208
      self.do_kernel_path = True
4209
      if self.kernel_path == constants.VALUE_NONE:
4210
        raise errors.OpPrereqError("Can't set instance to no kernel")
4211

    
4212
      if self.kernel_path != constants.VALUE_DEFAULT:
4213
        if not os.path.isabs(self.kernel_path):
4214
          raise errors.OpPrereqError("The kernel path must be an absolute"
4215
                                    " filename")
4216
    else:
4217
      self.do_kernel_path = False
4218

    
4219
    if self.initrd_path is not None:
4220
      self.do_initrd_path = True
4221
      if self.initrd_path not in (constants.VALUE_NONE,
4222
                                  constants.VALUE_DEFAULT):
4223
        if not os.path.isabs(self.initrd_path):
4224
          raise errors.OpPrereqError("The initrd path must be an absolute"
4225
                                    " filename")
4226
    else:
4227
      self.do_initrd_path = False
4228

    
4229
    # boot order verification
4230
    if self.hvm_boot_order is not None:
4231
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4232
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4233
          raise errors.OpPrereqError("invalid boot order specified,"
4234
                                     " must be one or more of [acdn]"
4235
                                     " or 'default'")
4236

    
4237
    instance = self.cfg.GetInstanceInfo(
4238
      self.cfg.ExpandInstanceName(self.op.instance_name))
4239
    if instance is None:
4240
      raise errors.OpPrereqError("No such instance name '%s'" %
4241
                                 self.op.instance_name)
4242
    self.op.instance_name = instance.name
4243
    self.instance = instance
4244
    return
4245

    
4246
  def Exec(self, feedback_fn):
4247
    """Modifies an instance.
4248

4249
    All parameters take effect only at the next restart of the instance.
4250
    """
4251
    result = []
4252
    instance = self.instance
4253
    if self.mem:
4254
      instance.memory = self.mem
4255
      result.append(("mem", self.mem))
4256
    if self.vcpus:
4257
      instance.vcpus = self.vcpus
4258
      result.append(("vcpus",  self.vcpus))
4259
    if self.do_ip:
4260
      instance.nics[0].ip = self.ip
4261
      result.append(("ip", self.ip))
4262
    if self.bridge:
4263
      instance.nics[0].bridge = self.bridge
4264
      result.append(("bridge", self.bridge))
4265
    if self.mac:
4266
      instance.nics[0].mac = self.mac
4267
      result.append(("mac", self.mac))
4268
    if self.do_kernel_path:
4269
      instance.kernel_path = self.kernel_path
4270
      result.append(("kernel_path", self.kernel_path))
4271
    if self.do_initrd_path:
4272
      instance.initrd_path = self.initrd_path
4273
      result.append(("initrd_path", self.initrd_path))
4274
    if self.hvm_boot_order:
4275
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4276
        instance.hvm_boot_order = None
4277
      else:
4278
        instance.hvm_boot_order = self.hvm_boot_order
4279
      result.append(("hvm_boot_order", self.hvm_boot_order))
4280

    
4281
    self.cfg.AddInstance(instance)
4282

    
4283
    return result
4284

    
4285

    
4286
class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
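    # The multi-node RPC returns a dict keyed by node name, each value being
    # the list of export names found on that node, e.g. (names illustrative):
    #   {"node1.example.com": ["inst1.example.com"], "node2.example.com": []}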
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shut down the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

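    # only the instance's "sda" disk is snapshotted and exported; the
    # try/finally below guarantees the instance is started again (if it was
    # shut down above) even when snapshotting fails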
    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

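    # finalize the export on the destination node; as with the per-disk steps
    # above, failures are only logged and do not abort the operation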
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

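    # remove older exports of this instance from all other nodes, so that only
    # the freshly created backup is kept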
    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
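    # resolve the object the tag operation acts on (cluster, node or
    # instance, selected by self.op.kind) and store it in self.target for
    # the concrete tag LUs to use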
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
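    # every match is reported as a (path, tag) pair, e.g. (illustrative):
    #   [("/cluster", "production"), ("/nodes/node1.example.com", "rack1")]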
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
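    # cfg.Update may fail with ConfigurationError, e.g. when the configuration
    # was modified behind our back; the operation is then aborted and the
    # caller asked to retry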
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tags.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
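      # the multi-node RPC returns a dict mapping each node name to its
      # result; a false value marks a failed call on that node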
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))