1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import simplejson
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46

    
47

    
48
# Check whether the simplejson module supports indentation
49
_JSON_INDENT = 2
50
try:
51
  simplejson.dumps(1, indent=_JSON_INDENT)
52
except TypeError:
53
  _JSON_INDENT = None
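
# Illustrative sketch only (this helper is not part of the original module):
# how the capability detected above would typically be used when emitting
# JSON, falling back to compact output on old simplejson versions.
def _ExampleDumpJson(data):
  """Serialize 'data' to JSON, indenting only if simplejson supports it."""
  if _JSON_INDENT is None:
    return simplejson.dumps(data)
  return simplejson.dumps(data, indent=_JSON_INDENT)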
54

    
55

    
56
class LogicalUnit(object):
57
  """Logical Unit base class.
58

59
  Subclasses must follow these rules:
60
    - implement CheckPrereq which also fills in the opcode instance
61
      with all the fields (even if as None)
62
    - implement Exec
63
    - implement BuildHooksEnv
64
    - redefine HPATH and HTYPE
65
    - optionally redefine their run requirements (REQ_CLUSTER,
66
      REQ_MASTER); note that all commands require root permissions
67

68
  """
69
  HPATH = None
70
  HTYPE = None
71
  _OP_REQP = []
72
  REQ_CLUSTER = True
73
  REQ_MASTER = True
74

    
75
  def __init__(self, processor, op, cfg, sstore):
76
    """Constructor for LogicalUnit.
77

78
    This needs to be overridden in derived classes in order to check op
79
    validity.
80

81
    """
82
    self.proc = processor
83
    self.op = op
84
    self.cfg = cfg
85
    self.sstore = sstore
86
    self.__ssh = None
87

    
88
    for attr_name in self._OP_REQP:
89
      attr_val = getattr(op, attr_name, None)
90
      if attr_val is None:
91
        raise errors.OpPrereqError("Required parameter '%s' missing" %
92
                                   attr_name)
93
    if self.REQ_CLUSTER:
94
      if not cfg.IsCluster():
95
        raise errors.OpPrereqError("Cluster not initialized yet,"
96
                                   " use 'gnt-cluster init' first.")
97
      if self.REQ_MASTER:
98
        master = sstore.GetMasterNode()
99
        if master != utils.HostInfo().name:
100
          raise errors.OpPrereqError("Commands must be run on the master"
101
                                     " node %s" % master)
102

    
103
  def __GetSSH(self):
104
    """Returns the SshRunner object
105

106
    """
107
    if not self.__ssh:
108
      self.__ssh = ssh.SshRunner(self.sstore)
109
    return self.__ssh
110

    
111
  ssh = property(fget=__GetSSH)
112

    
113
  def CheckPrereq(self):
114
    """Check prerequisites for this LU.
115

116
    This method should check that the prerequisites for the execution
117
    of this LU are fulfilled. It can do internode communication, but
118
    it should be idempotent - no cluster or system changes are
119
    allowed.
120

121
    The method should raise errors.OpPrereqError in case something is
122
    not fulfilled. Its return value is ignored.
123

124
    This method should also update all the parameters of the opcode to
125
    their canonical form; e.g. a short node name must be fully
126
    expanded after this method has successfully completed (so that
127
    hooks, logging, etc. work correctly).
128

129
    """
130
    raise NotImplementedError
131

    
132
  def Exec(self, feedback_fn):
133
    """Execute the LU.
134

135
    This method should implement the actual work. It should raise
136
    errors.OpExecError for failures that are somewhat dealt with in
137
    code, or expected.
138

139
    """
140
    raise NotImplementedError
141

    
142
  def BuildHooksEnv(self):
143
    """Build hooks environment for this LU.
144

145
    This method should return a three-element tuple consisting of: a dict
146
    containing the environment that will be used for running the
147
    specific hook for this LU, a list of node names on which the hook
148
    should run before the execution, and a list of node names on which
149
    the hook should run after the execution.
150

151
    The keys of the dict must not have 'GANETI_' prefixed as this will
152
    be handled in the hooks runner. Also note additional keys will be
153
    added by the hooks runner. If the LU doesn't define any
154
    environment, an empty dict (and not None) should be returned.
155

156
    As for the node lists, the master should not be included in
157
    them, as it will be added by the hooks runner in case this LU
158
    requires a cluster to run on (otherwise we don't have a node
159
    list). No nodes should be returned as an empty list (and not
160
    None).
161

162
    Note that if the HPATH for a LU class is None, this function will
163
    not be called.
164

165
    """
166
    raise NotImplementedError
167

    
168

    
169
class NoHooksLU(LogicalUnit):
170
  """Simple LU which runs no hooks.
171

172
  This LU is intended as a parent for other LogicalUnits which will
173
  run no hooks, in order to reduce duplicate code.
174

175
  """
176
  HPATH = None
177
  HTYPE = None
178

    
179
  def BuildHooksEnv(self):
180
    """Build hooks env.
181

182
    This is a no-op, since we don't run hooks.
183

184
    """
185
    return {}, [], []
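
# Illustrative sketch (not a real Ganeti opcode handler): a minimal unit
# following the rules documented in LogicalUnit above. The "message" opcode
# field is hypothetical; NoHooksLU already supplies the no-op hook plumbing.
class _ExampleEchoLU(NoHooksLU):
  """Example LU that only validates and echoes a single parameter."""
  _OP_REQP = ["message"]

  def CheckPrereq(self):
    """Canonicalize the parameter; no cluster state is touched."""
    self.op.message = str(self.op.message)

  def Exec(self, feedback_fn):
    """Report the message back through the feedback function."""
    feedback_fn("echo: %s" % self.op.message)
    return self.op.message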
186

    
187

    
188
def _AddHostToEtcHosts(hostname):
189
  """Wrapper around utils.SetEtcHostsEntry.
190

191
  """
192
  hi = utils.HostInfo(name=hostname)
193
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
194

    
195

    
196
def _RemoveHostFromEtcHosts(hostname):
197
  """Wrapper around utils.RemoveEtcHostsEntry.
198

199
  """
200
  hi = utils.HostInfo(name=hostname)
201
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
202
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
203

    
204

    
205
def _GetWantedNodes(lu, nodes):
206
  """Returns list of checked and expanded node names.
207

208
  Args:
209
    nodes: List of nodes (strings) or None for all
210

211
  """
212
  if not isinstance(nodes, list):
213
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
214

    
215
  if nodes:
216
    wanted = []
217

    
218
    for name in nodes:
219
      node = lu.cfg.ExpandNodeName(name)
220
      if node is None:
221
        raise errors.OpPrereqError("No such node name '%s'" % name)
222
      wanted.append(node)
223

    
224
  else:
225
    wanted = lu.cfg.GetNodeList()
226
  return utils.NiceSort(wanted)
227

    
228

    
229
def _GetWantedInstances(lu, instances):
230
  """Returns list of checked and expanded instance names.
231

232
  Args:
233
    instances: List of instances (strings) or None for all
234

235
  """
236
  if not isinstance(instances, list):
237
    raise errors.OpPrereqError("Invalid argument type 'instances'")
238

    
239
  if instances:
240
    wanted = []
241

    
242
    for name in instances:
243
      instance = lu.cfg.ExpandInstanceName(name)
244
      if instance is None:
245
        raise errors.OpPrereqError("No such instance name '%s'" % name)
246
      wanted.append(instance)
247

    
248
  else:
249
    wanted = lu.cfg.GetInstanceList()
250
  return utils.NiceSort(wanted)
251

    
252

    
253
def _CheckOutputFields(static, dynamic, selected):
254
  """Checks whether all selected fields are valid.
255

256
  Args:
257
    static: Static fields
258
    dynamic: Dynamic fields
259

260
  """
261
  static_fields = frozenset(static)
262
  dynamic_fields = frozenset(dynamic)
263

    
264
  all_fields = static_fields | dynamic_fields
265

    
266
  if not all_fields.issuperset(selected):
267
    raise errors.OpPrereqError("Unknown output fields selected: %s"
268
                               % ",".join(frozenset(selected).
269
                                          difference(all_fields)))
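
# Illustrative sketch (hypothetical field names, never called): the helper
# above raises OpPrereqError for columns that are neither static nor dynamic,
# so a query LU typically calls it like this from its CheckPrereq.
def _ExampleFieldCheck(selected):
  """Accept only the made-up columns 'name' (static) and 'mfree' (dynamic)."""
  _CheckOutputFields(static=["name"], dynamic=["mfree"], selected=selected)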
270

    
271

    
272
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
273
                          memory, vcpus, nics):
274
  """Builds instance related env variables for hooks from single variables.
275

276
  Args:
277
    secondary_nodes: List of secondary nodes as strings
278
  """
279
  env = {
280
    "OP_TARGET": name,
281
    "INSTANCE_NAME": name,
282
    "INSTANCE_PRIMARY": primary_node,
283
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
284
    "INSTANCE_OS_TYPE": os_type,
285
    "INSTANCE_STATUS": status,
286
    "INSTANCE_MEMORY": memory,
287
    "INSTANCE_VCPUS": vcpus,
288
  }
289

    
290
  if nics:
291
    nic_count = len(nics)
292
    for idx, (ip, bridge, mac) in enumerate(nics):
293
      if ip is None:
294
        ip = ""
295
      env["INSTANCE_NIC%d_IP" % idx] = ip
296
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
297
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
298
  else:
299
    nic_count = 0
300

    
301
  env["INSTANCE_NIC_COUNT"] = nic_count
302

    
303
  return env
304

    
305

    
306
def _BuildInstanceHookEnvByObject(instance, override=None):
307
  """Builds instance related env variables for hooks from an object.
308

309
  Args:
310
    instance: objects.Instance object of instance
311
    override: dict of values to override
312
  """
313
  args = {
314
    'name': instance.name,
315
    'primary_node': instance.primary_node,
316
    'secondary_nodes': instance.secondary_nodes,
317
    'os_type': instance.os,
318
    'status': instance.status,
319
    'memory': instance.memory,
320
    'vcpus': instance.vcpus,
321
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
322
  }
323
  if override:
324
    args.update(override)
325
  return _BuildInstanceHookEnv(**args)
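
# Illustrative sketch (made-up instance data, never called by Ganeti): the
# kind of environment the two builders above produce; the returned dict
# contains e.g. INSTANCE_PRIMARY, INSTANCE_NIC_COUNT and INSTANCE_NIC0_BRIDGE.
def _ExampleInstanceHookEnv():
  """Build the hook environment for a hypothetical one-NIC instance."""
  return _BuildInstanceHookEnv(
    name="instance1.example.com", primary_node="node1.example.com",
    secondary_nodes=["node2.example.com"], os_type="debian-etch",
    status="up", memory=512, vcpus=1,
    nics=[("192.0.2.10", "xen-br0", "aa:00:00:11:22:33")])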
326

    
327

    
328
def _HasValidVG(vglist, vgname):
329
  """Checks if the volume group list is valid.
330

331
  A non-None return value means there's an error, and the return value
332
  is the error message.
333

334
  """
335
  vgsize = vglist.get(vgname, None)
336
  if vgsize is None:
337
    return "volume group '%s' missing" % vgname
338
  elif vgsize < 20480:
339
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
340
            (vgname, vgsize))
341
  return None
342

    
343

    
344
def _InitSSHSetup(node):
345
  """Setup the SSH configuration for the cluster.
346

347

348
  This generates a dsa keypair for root, adds the pub key to the
349
  permitted hosts and adds the hostkey to its own known hosts.
350

351
  Args:
352
    node: the name of this host as a fqdn
353

354
  """
355
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
356

    
357
  for name in priv_key, pub_key:
358
    if os.path.exists(name):
359
      utils.CreateBackup(name)
360
    utils.RemoveFile(name)
361

    
362
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
363
                         "-f", priv_key,
364
                         "-q", "-N", ""])
365
  if result.failed:
366
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
367
                             result.output)
368

    
369
  f = open(pub_key, 'r')
370
  try:
371
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
372
  finally:
373
    f.close()
374

    
375

    
376
def _InitGanetiServerSetup(ss):
377
  """Setup the necessary configuration for the initial node daemon.
378

379
  This creates the nodepass file containing the shared password for
380
  the cluster and also generates the SSL certificate.
381

382
  """
383
  # Create pseudo random password
384
  randpass = sha.new(os.urandom(64)).hexdigest()
385
  # and write it into sstore
386
  ss.SetKey(ss.SS_NODED_PASS, randpass)
387

    
388
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
389
                         "-days", str(365*5), "-nodes", "-x509",
390
                         "-keyout", constants.SSL_CERT_FILE,
391
                         "-out", constants.SSL_CERT_FILE, "-batch"])
392
  if result.failed:
393
    raise errors.OpExecError("could not generate server ssl cert, command"
394
                             " %s had exitcode %s and error message %s" %
395
                             (result.cmd, result.exit_code, result.output))
396

    
397
  os.chmod(constants.SSL_CERT_FILE, 0400)
398

    
399
  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
400

    
401
  if result.failed:
402
    raise errors.OpExecError("Could not start the node daemon, command %s"
403
                             " had exitcode %s and error %s" %
404
                             (result.cmd, result.exit_code, result.output))
405

    
406

    
407
def _CheckInstanceBridgesExist(instance):
408
  """Check that the bridges needed by an instance exist.
409

410
  """
411
  # check bridges existence
412
  brlist = [nic.bridge for nic in instance.nics]
413
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
414
    raise errors.OpPrereqError("one or more target bridges %s do not"
415
                               " exist on destination node '%s'" %
416
                               (brlist, instance.primary_node))
417

    
418

    
419
class LUInitCluster(LogicalUnit):
420
  """Initialise the cluster.
421

422
  """
423
  HPATH = "cluster-init"
424
  HTYPE = constants.HTYPE_CLUSTER
425
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
426
              "def_bridge", "master_netdev", "file_storage_dir"]
427
  REQ_CLUSTER = False
428

    
429
  def BuildHooksEnv(self):
430
    """Build hooks env.
431

432
    Notes: Since we don't require a cluster, we must manually add
433
    ourselves in the post-run node list.
434

435
    """
436
    env = {"OP_TARGET": self.op.cluster_name}
437
    return env, [], [self.hostname.name]
438

    
439
  def CheckPrereq(self):
440
    """Verify that the passed name is a valid one.
441

442
    """
443
    if config.ConfigWriter.IsCluster():
444
      raise errors.OpPrereqError("Cluster is already initialised")
445

    
446
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
447
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
448
        raise errors.OpPrereqError("Please prepare the cluster VNC "
449
                                   "password file %s" %
450
                                   constants.VNC_PASSWORD_FILE)
451

    
452
    self.hostname = hostname = utils.HostInfo()
453

    
454
    if hostname.ip.startswith("127."):
455
      raise errors.OpPrereqError("This host's IP resolves to the loopback"
456
                                 " range (%s). Please fix DNS or %s." %
457
                                 (hostname.ip, constants.ETC_HOSTS))
458

    
459
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
460
                         source=constants.LOCALHOST_IP_ADDRESS):
461
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
462
                                 " to %s,\nbut this ip address does not"
463
                                 " belong to this host."
464
                                 " Aborting." % hostname.ip)
465

    
466
    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
467

    
468
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
469
                     timeout=5):
470
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
471

    
472
    secondary_ip = getattr(self.op, "secondary_ip", None)
473
    if secondary_ip and not utils.IsValidIP(secondary_ip):
474
      raise errors.OpPrereqError("Invalid secondary ip given")
475
    if (secondary_ip and
476
        secondary_ip != hostname.ip and
477
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
478
                           source=constants.LOCALHOST_IP_ADDRESS))):
479
      raise errors.OpPrereqError("You gave %s as secondary IP,"
480
                                 " but it does not belong to this host." %
481
                                 secondary_ip)
482
    self.secondary_ip = secondary_ip
483

    
484
    if not hasattr(self.op, "vg_name"):
485
      self.op.vg_name = None
486
    # if vg_name not None, checks if volume group is valid
487
    if self.op.vg_name:
488
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
489
      if vgstatus:
490
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
491
                                   " you are not using lvm" % vgstatus)
492

    
493
    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
494

    
495
    if not os.path.isabs(self.op.file_storage_dir):
496
      raise errors.OpPrereqError("The given file storage directory is"
497
                                 " not an absolute path.")
498

    
499
    if not os.path.exists(self.op.file_storage_dir):
500
      try:
501
        os.makedirs(self.op.file_storage_dir, 0750)
502
      except OSError, err:
503
        raise errors.OpPrereqError("Cannot create file storage directory"
504
                                   " '%s': %s" %
505
                                   (self.op.file_storage_dir, err))
506

    
507
    if not os.path.isdir(self.op.file_storage_dir):
508
      raise errors.OpPrereqError("The file storage directory '%s' is not"
509
                                 " a directory." % self.op.file_storage_dir)
510

    
511
    if not re.match("^[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}$",
512
                    self.op.mac_prefix):
513
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
514
                                 self.op.mac_prefix)
515

    
516
    if self.op.hypervisor_type not in constants.HYPER_TYPES:
517
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
518
                                 self.op.hypervisor_type)
519

    
520
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
521
    if result.failed:
522
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
523
                                 (self.op.master_netdev,
524
                                  result.output.strip()))
525

    
526
    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
527
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
528
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
529
                                 " executable." % constants.NODE_INITD_SCRIPT)
530

    
531
  def Exec(self, feedback_fn):
532
    """Initialize the cluster.
533

534
    """
535
    clustername = self.clustername
536
    hostname = self.hostname
537

    
538
    # set up the simple store
539
    self.sstore = ss = ssconf.SimpleStore()
540
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
541
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
542
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
543
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
544
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
545
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
546

    
547
    # set up the inter-node password and certificate
548
    _InitGanetiServerSetup(ss)
549

    
550
    # start the master ip
551
    rpc.call_node_start_master(hostname.name)
552

    
553
    # set up ssh config and /etc/hosts
554
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
555
    try:
556
      sshline = f.read()
557
    finally:
558
      f.close()
559
    sshkey = sshline.split(" ")[1]
560

    
561
    _AddHostToEtcHosts(hostname.name)
562
    _InitSSHSetup(hostname.name)
563

    
564
    # init of cluster config file
565
    self.cfg = cfgw = config.ConfigWriter()
566
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
567
                    sshkey, self.op.mac_prefix,
568
                    self.op.vg_name, self.op.def_bridge)
569

    
570
    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
571

    
572

    
573
class LUDestroyCluster(NoHooksLU):
574
  """Logical unit for destroying the cluster.
575

576
  """
577
  _OP_REQP = []
578

    
579
  def CheckPrereq(self):
580
    """Check prerequisites.
581

582
    This checks whether the cluster is empty.
583

584
    Any errors are signalled by raising errors.OpPrereqError.
585

586
    """
587
    master = self.sstore.GetMasterNode()
588

    
589
    nodelist = self.cfg.GetNodeList()
590
    if len(nodelist) != 1 or nodelist[0] != master:
591
      raise errors.OpPrereqError("There are still %d node(s) in"
592
                                 " this cluster." % (len(nodelist) - 1))
593
    instancelist = self.cfg.GetInstanceList()
594
    if instancelist:
595
      raise errors.OpPrereqError("There are still %d instance(s) in"
596
                                 " this cluster." % len(instancelist))
597

    
598
  def Exec(self, feedback_fn):
599
    """Destroys the cluster.
600

601
    """
602
    master = self.sstore.GetMasterNode()
603
    if not rpc.call_node_stop_master(master):
604
      raise errors.OpExecError("Could not disable the master role")
605
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
606
    utils.CreateBackup(priv_key)
607
    utils.CreateBackup(pub_key)
608
    rpc.call_node_leave_cluster(master)
609

    
610

    
611
class LUVerifyCluster(NoHooksLU):
612
  """Verifies the cluster status.
613

614
  """
615
  _OP_REQP = ["skip_checks"]
616

    
617
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
618
                  remote_version, feedback_fn):
619
    """Run multiple tests against a node.
620

621
    Test list:
622
      - compares ganeti version
623
      - checks vg existence and size > 20G
624
      - checks config file checksum
625
      - checks ssh to other nodes
626

627
    Args:
628
      node: name of the node to check
629
      file_list: required list of files
630
      local_cksum: dictionary of local files and their checksums
631

632
    """
633
    # compares ganeti version
634
    local_version = constants.PROTOCOL_VERSION
635
    if not remote_version:
636
      feedback_fn("  - ERROR: connection to %s failed" % (node))
637
      return True
638

    
639
    if local_version != remote_version:
640
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
641
                      (local_version, node, remote_version))
642
      return True
643

    
644
    # checks vg existence and size > 20G
645

    
646
    bad = False
647
    if not vglist:
648
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
649
                      (node,))
650
      bad = True
651
    else:
652
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
653
      if vgstatus:
654
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
655
        bad = True
656

    
657
    # checks config file checksum
658
    # checks ssh to any
659

    
660
    if 'filelist' not in node_result:
661
      bad = True
662
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
663
    else:
664
      remote_cksum = node_result['filelist']
665
      for file_name in file_list:
666
        if file_name not in remote_cksum:
667
          bad = True
668
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
669
        elif remote_cksum[file_name] != local_cksum[file_name]:
670
          bad = True
671
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
672

    
673
    if 'nodelist' not in node_result:
674
      bad = True
675
      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
676
    else:
677
      if node_result['nodelist']:
678
        bad = True
679
        for node in node_result['nodelist']:
680
          feedback_fn("  - ERROR: communication with node '%s': %s" %
681
                          (node, node_result['nodelist'][node]))
682
    hyp_result = node_result.get('hypervisor', None)
683
    if hyp_result is not None:
684
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
685
    return bad
686

    
687
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688
                      node_instance, feedback_fn):
689
    """Verify an instance.
690

691
    This function checks to see if the required block devices are
692
    available on the instance's node.
693

694
    """
695
    bad = False
696

    
697
    node_current = instanceconfig.primary_node
698

    
699
    node_vol_should = {}
700
    instanceconfig.MapLVsByNode(node_vol_should)
701

    
702
    for node in node_vol_should:
703
      for volume in node_vol_should[node]:
704
        if node not in node_vol_is or volume not in node_vol_is[node]:
705
          feedback_fn("  - ERROR: volume %s missing on node %s" %
706
                          (volume, node))
707
          bad = True
708

    
709
    if instanceconfig.status != 'down':
710
      if (node_current not in node_instance or
711
          not instance in node_instance[node_current]):
712
        feedback_fn("  - ERROR: instance %s not running on node %s" %
713
                        (instance, node_current))
714
        bad = True
715

    
716
    for node in node_instance:
717
      if node != node_current:
718
        if instance in node_instance[node]:
719
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
720
                          (instance, node))
721
          bad = True
722

    
723
    return bad
724

    
725
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726
    """Verify if there are any unknown volumes in the cluster.
727

728
    The .os, .swap and backup volumes are ignored. All other volumes are
729
    reported as unknown.
730

731
    """
732
    bad = False
733

    
734
    for node in node_vol_is:
735
      for volume in node_vol_is[node]:
736
        if node not in node_vol_should or volume not in node_vol_should[node]:
737
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
738
                      (volume, node))
739
          bad = True
740
    return bad
741

    
742
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743
    """Verify the list of running instances.
744

745
    This checks what instances are running but unknown to the cluster.
746

747
    """
748
    bad = False
749
    for node in node_instance:
750
      for runninginstance in node_instance[node]:
751
        if runninginstance not in instancelist:
752
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
753
                          (runninginstance, node))
754
          bad = True
755
    return bad
756

    
757
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758
    """Verify N+1 Memory Resilience.
759

760
    Check that if one single node dies we can still start all the instances it
761
    was primary for.
762

763
    """
764
    bad = False
765

    
766
    for node, nodeinfo in node_info.iteritems():
767
      # This code checks that every node which is now listed as secondary has
768
      # enough memory to host all instances it is supposed to should a single
769
      # other node in the cluster fail.
770
      # FIXME: not ready for failover to an arbitrary node
771
      # FIXME: does not support file-backed instances
772
      # WARNING: we currently take into account down instances as well as up
773
      # ones, considering that even if they're down someone might want to start
774
      # them even in the event of a node failure.
775
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
776
        needed_mem = 0
777
        for instance in instances:
778
          needed_mem += instance_cfg[instance].memory
779
        if nodeinfo['mfree'] < needed_mem:
780
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
781
                      " failovers should node %s fail" % (node, prinode))
782
          bad = True
783
    return bad
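
  # Illustrative sketch (toy numbers, never called): the N+1 rule above boils
  # down to comparing a secondary node's free memory with the summed memory
  # of the instances it would have to start if one of their primaries died.
  @staticmethod
  def _ExampleNPlusOne():
    """Toy data: 1400 MiB free, backing 1536 MiB of node1's instances."""
    sinst_by_pnode = {"node1": [512, 1024], "node2": [256]}  # MiB each
    mfree = 1400
    return [pnode for pnode, mems in sinst_by_pnode.items()
            if sum(mems) > mfree]  # -> ["node1"]: not N+1 compliant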
784

    
785
  def CheckPrereq(self):
786
    """Check prerequisites.
787

788
    Transform the list of checks we're going to skip into a set and check that
789
    all its members are valid.
790

791
    """
792
    self.skip_set = frozenset(self.op.skip_checks)
793
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
794
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
795

    
796
  def Exec(self, feedback_fn):
797
    """Verify integrity of cluster, performing various tests on nodes.
798

799
    """
800
    bad = False
801
    feedback_fn("* Verifying global settings")
802
    for msg in self.cfg.VerifyConfig():
803
      feedback_fn("  - ERROR: %s" % msg)
804

    
805
    vg_name = self.cfg.GetVGName()
806
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
807
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
808
    i_non_redundant = [] # Non redundant instances
809
    node_volume = {}
810
    node_instance = {}
811
    node_info = {}
812
    instance_cfg = {}
813

    
814
    # FIXME: verify OS list
815
    # do local checksums
816
    file_names = list(self.sstore.GetFileList())
817
    file_names.append(constants.SSL_CERT_FILE)
818
    file_names.append(constants.CLUSTER_CONF_FILE)
819
    local_checksums = utils.FingerprintFiles(file_names)
820

    
821
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823
    all_instanceinfo = rpc.call_instance_list(nodelist)
824
    all_vglist = rpc.call_vg_list(nodelist)
825
    node_verify_param = {
826
      'filelist': file_names,
827
      'nodelist': nodelist,
828
      'hypervisor': None,
829
      }
830
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831
    all_rversion = rpc.call_version(nodelist)
832
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
833

    
834
    for node in nodelist:
835
      feedback_fn("* Verifying node %s" % node)
836
      result = self._VerifyNode(node, file_names, local_checksums,
837
                                all_vglist[node], all_nvinfo[node],
838
                                all_rversion[node], feedback_fn)
839
      bad = bad or result
840

    
841
      # node_volume
842
      volumeinfo = all_volumeinfo[node]
843

    
844
      if isinstance(volumeinfo, basestring):
845
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
846
                    (node, volumeinfo[-400:].encode('string_escape')))
847
        bad = True
848
        node_volume[node] = {}
849
      elif not isinstance(volumeinfo, dict):
850
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
851
        bad = True
852
        continue
853
      else:
854
        node_volume[node] = volumeinfo
855

    
856
      # node_instance
857
      nodeinstance = all_instanceinfo[node]
858
      if type(nodeinstance) != list:
859
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
860
        bad = True
861
        continue
862

    
863
      node_instance[node] = nodeinstance
864

    
865
      # node_info
866
      nodeinfo = all_ninfo[node]
867
      if not isinstance(nodeinfo, dict):
868
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
869
        bad = True
870
        continue
871

    
872
      try:
873
        node_info[node] = {
874
          "mfree": int(nodeinfo['memory_free']),
875
          "dfree": int(nodeinfo['vg_free']),
876
          "pinst": [],
877
          "sinst": [],
878
          # dictionary holding all instances this node is secondary for,
879
          # grouped by their primary node. Each key is a cluster node, and each
880
          # value is a list of instances which have the key as primary and the
881
          # current node as secondary.  this is handy to calculate N+1 memory
882
          # availability if you can only failover from a primary to its
883
          # secondary.
884
          "sinst-by-pnode": {},
885
        }
886
      except ValueError:
887
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
888
        bad = True
889
        continue
890

    
891
    node_vol_should = {}
892

    
893
    for instance in instancelist:
894
      feedback_fn("* Verifying instance %s" % instance)
895
      inst_config = self.cfg.GetInstanceInfo(instance)
896
      result =  self._VerifyInstance(instance, inst_config, node_volume,
897
                                     node_instance, feedback_fn)
898
      bad = bad or result
899

    
900
      inst_config.MapLVsByNode(node_vol_should)
901

    
902
      instance_cfg[instance] = inst_config
903

    
904
      pnode = inst_config.primary_node
905
      if pnode in node_info:
906
        node_info[pnode]['pinst'].append(instance)
907
      else:
908
        feedback_fn("  - ERROR: instance %s, connection to primary node"
909
                    " %s failed" % (instance, pnode))
910
        bad = True
911

    
912
      # If the instance is non-redundant we cannot survive losing its primary
913
      # node, so we are not N+1 compliant. On the other hand we have no disk
914
      # templates with more than one secondary so that situation is not well
915
      # supported either.
916
      # FIXME: does not support file-backed instances
917
      if len(inst_config.secondary_nodes) == 0:
918
        i_non_redundant.append(instance)
919
      elif len(inst_config.secondary_nodes) > 1:
920
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
921
                    % instance)
922

    
923
      for snode in inst_config.secondary_nodes:
924
        if snode in node_info:
925
          node_info[snode]['sinst'].append(instance)
926
          if pnode not in node_info[snode]['sinst-by-pnode']:
927
            node_info[snode]['sinst-by-pnode'][pnode] = []
928
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
929
        else:
930
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
931
                      " %s failed" % (instance, snode))
932

    
933
    feedback_fn("* Verifying orphan volumes")
934
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
935
                                       feedback_fn)
936
    bad = bad or result
937

    
938
    feedback_fn("* Verifying remaining instances")
939
    result = self._VerifyOrphanInstances(instancelist, node_instance,
940
                                         feedback_fn)
941
    bad = bad or result
942

    
943
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
944
      feedback_fn("* Verifying N+1 Memory redundancy")
945
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
946
      bad = bad or result
947

    
948
    feedback_fn("* Other Notes")
949
    if i_non_redundant:
950
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
951
                  % len(i_non_redundant))
952

    
953
    return int(bad)
954

    
955

    
956
class LUVerifyDisks(NoHooksLU):
957
  """Verifies the cluster disks status.
958

959
  """
960
  _OP_REQP = []
961

    
962
  def CheckPrereq(self):
963
    """Check prerequisites.
964

965
    This has no prerequisites.
966

967
    """
968
    pass
969

    
970
  def Exec(self, feedback_fn):
971
    """Verify integrity of cluster disks.
972

973
    """
974
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
975

    
976
    vg_name = self.cfg.GetVGName()
977
    nodes = utils.NiceSort(self.cfg.GetNodeList())
978
    instances = [self.cfg.GetInstanceInfo(name)
979
                 for name in self.cfg.GetInstanceList()]
980

    
981
    nv_dict = {}
982
    for inst in instances:
983
      inst_lvs = {}
984
      if (inst.status != "up" or
985
          inst.disk_template not in constants.DTS_NET_MIRROR):
986
        continue
987
      inst.MapLVsByNode(inst_lvs)
988
      # transform { iname: {node: [vol,],},} to {(node, vol): inst}
989
      for node, vol_list in inst_lvs.iteritems():
990
        for vol in vol_list:
991
          nv_dict[(node, vol)] = inst
992

    
993
    if not nv_dict:
994
      return result
995

    
996
    node_lvs = rpc.call_volume_list(nodes, vg_name)
997

    
998
    to_act = set()
999
    for node in nodes:
1000
      # node_volume
1001
      lvs = node_lvs[node]
1002

    
1003
      if isinstance(lvs, basestring):
1004
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1005
        res_nlvm[node] = lvs
1006
      elif not isinstance(lvs, dict):
1007
        logger.Info("connection to node %s failed or invalid data returned" %
1008
                    (node,))
1009
        res_nodes.append(node)
1010
        continue
1011

    
1012
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1013
        inst = nv_dict.pop((node, lv_name), None)
1014
        if (not lv_online and inst is not None
1015
            and inst.name not in res_instances):
1016
          res_instances.append(inst.name)
1017

    
1018
    # any leftover items in nv_dict are missing LVs, let's arrange the
1019
    # data better
1020
    for key, inst in nv_dict.iteritems():
1021
      if inst.name not in res_missing:
1022
        res_missing[inst.name] = []
1023
      res_missing[inst.name].append(key)
1024

    
1025
    return result
1026

    
1027

    
1028
class LURenameCluster(LogicalUnit):
1029
  """Rename the cluster.
1030

1031
  """
1032
  HPATH = "cluster-rename"
1033
  HTYPE = constants.HTYPE_CLUSTER
1034
  _OP_REQP = ["name"]
1035

    
1036
  def BuildHooksEnv(self):
1037
    """Build hooks env.
1038

1039
    """
1040
    env = {
1041
      "OP_TARGET": self.sstore.GetClusterName(),
1042
      "NEW_NAME": self.op.name,
1043
      }
1044
    mn = self.sstore.GetMasterNode()
1045
    return env, [mn], [mn]
1046

    
1047
  def CheckPrereq(self):
1048
    """Verify that the passed name is a valid one.
1049

1050
    """
1051
    hostname = utils.HostInfo(self.op.name)
1052

    
1053
    new_name = hostname.name
1054
    self.ip = new_ip = hostname.ip
1055
    old_name = self.sstore.GetClusterName()
1056
    old_ip = self.sstore.GetMasterIP()
1057
    if new_name == old_name and new_ip == old_ip:
1058
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1059
                                 " cluster has changed")
1060
    if new_ip != old_ip:
1061
      result = utils.RunCmd(["fping", "-q", new_ip])
1062
      if not result.failed:
1063
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1064
                                   " reachable on the network. Aborting." %
1065
                                   new_ip)
1066

    
1067
    self.op.name = new_name
1068

    
1069
  def Exec(self, feedback_fn):
1070
    """Rename the cluster.
1071

1072
    """
1073
    clustername = self.op.name
1074
    ip = self.ip
1075
    ss = self.sstore
1076

    
1077
    # shutdown the master IP
1078
    master = ss.GetMasterNode()
1079
    if not rpc.call_node_stop_master(master):
1080
      raise errors.OpExecError("Could not disable the master role")
1081

    
1082
    try:
1083
      # modify the sstore
1084
      ss.SetKey(ss.SS_MASTER_IP, ip)
1085
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1086

    
1087
      # Distribute updated ss config to all nodes
1088
      myself = self.cfg.GetNodeInfo(master)
1089
      dist_nodes = self.cfg.GetNodeList()
1090
      if myself.name in dist_nodes:
1091
        dist_nodes.remove(myself.name)
1092

    
1093
      logger.Debug("Copying updated ssconf data to all nodes")
1094
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1095
        fname = ss.KeyToFilename(keyname)
1096
        result = rpc.call_upload_file(dist_nodes, fname)
1097
        for to_node in dist_nodes:
1098
          if not result[to_node]:
1099
            logger.Error("copy of file %s to node %s failed" %
1100
                         (fname, to_node))
1101
    finally:
1102
      if not rpc.call_node_start_master(master):
1103
        logger.Error("Could not re-enable the master role on the master,"
1104
                     " please restart manually.")
1105

    
1106

    
1107
def _RecursiveCheckIfLVMBased(disk):
1108
  """Check if the given disk or its children are lvm-based.
1109

1110
  Args:
1111
    disk: ganeti.objects.Disk object
1112

1113
  Returns:
1114
    boolean indicating whether a LD_LV dev_type was found or not
1115

1116
  """
1117
  if disk.children:
1118
    for chdisk in disk.children:
1119
      if _RecursiveCheckIfLVMBased(chdisk):
1120
        return True
1121
  return disk.dev_type == constants.LD_LV
1122

    
1123

    
1124
class LUSetClusterParams(LogicalUnit):
1125
  """Change the parameters of the cluster.
1126

1127
  """
1128
  HPATH = "cluster-modify"
1129
  HTYPE = constants.HTYPE_CLUSTER
1130
  _OP_REQP = []
1131

    
1132
  def BuildHooksEnv(self):
1133
    """Build hooks env.
1134

1135
    """
1136
    env = {
1137
      "OP_TARGET": self.sstore.GetClusterName(),
1138
      "NEW_VG_NAME": self.op.vg_name,
1139
      }
1140
    mn = self.sstore.GetMasterNode()
1141
    return env, [mn], [mn]
1142

    
1143
  def CheckPrereq(self):
1144
    """Check prerequisites.
1145

1146
    This checks whether the given params don't conflict and
1147
    if the given volume group is valid.
1148

1149
    """
1150
    if not self.op.vg_name:
1151
      instances = [self.cfg.GetInstanceInfo(name)
1152
                   for name in self.cfg.GetInstanceList()]
1153
      for inst in instances:
1154
        for disk in inst.disks:
1155
          if _RecursiveCheckIfLVMBased(disk):
1156
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1157
                                       " lvm-based instances exist")
1158

    
1159
    # if vg_name not None, checks given volume group on all nodes
1160
    if self.op.vg_name:
1161
      node_list = self.cfg.GetNodeList()
1162
      vglist = rpc.call_vg_list(node_list)
1163
      for node in node_list:
1164
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1165
        if vgstatus:
1166
          raise errors.OpPrereqError("Error on node '%s': %s" %
1167
                                     (node, vgstatus))
1168

    
1169
  def Exec(self, feedback_fn):
1170
    """Change the parameters of the cluster.
1171

1172
    """
1173
    if self.op.vg_name != self.cfg.GetVGName():
1174
      self.cfg.SetVGName(self.op.vg_name)
1175
    else:
1176
      feedback_fn("Cluster LVM configuration already in desired"
1177
                  " state, not changing")
1178

    
1179

    
1180
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1181
  """Sleep and poll for an instance's disk to sync.
1182

1183
  """
1184
  if not instance.disks:
1185
    return True
1186

    
1187
  if not oneshot:
1188
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1189

    
1190
  node = instance.primary_node
1191

    
1192
  for dev in instance.disks:
1193
    cfgw.SetDiskID(dev, node)
1194

    
1195
  retries = 0
1196
  while True:
1197
    max_time = 0
1198
    done = True
1199
    cumul_degraded = False
1200
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1201
    if not rstats:
1202
      proc.LogWarning("Can't get any data from node %s" % node)
1203
      retries += 1
1204
      if retries >= 10:
1205
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1206
                                 " aborting." % node)
1207
      time.sleep(6)
1208
      continue
1209
    retries = 0
1210
    for i in range(len(rstats)):
1211
      mstat = rstats[i]
1212
      if mstat is None:
1213
        proc.LogWarning("Can't compute data for node %s/%s" %
1214
                        (node, instance.disks[i].iv_name))
1215
        continue
1216
      # we ignore the ldisk parameter
1217
      perc_done, est_time, is_degraded, _ = mstat
1218
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1219
      if perc_done is not None:
1220
        done = False
1221
        if est_time is not None:
1222
          rem_time = "%d estimated seconds remaining" % est_time
1223
          max_time = est_time
1224
        else:
1225
          rem_time = "no time estimate"
1226
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1227
                     (instance.disks[i].iv_name, perc_done, rem_time))
1228
    if done or oneshot:
1229
      break
1230

    
1231
    if unlock:
1232
      utils.Unlock('cmd')
1233
    try:
1234
      time.sleep(min(60, max_time))
1235
    finally:
1236
      if unlock:
1237
        utils.Lock('cmd')
1238

    
1239
  if done:
1240
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1241
  return not cumul_degraded
1242

    
1243

    
1244
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1245
  """Check that mirrors are not degraded.
1246

1247
  The ldisk parameter, if True, will change the test from the
1248
  is_degraded attribute (which represents overall non-ok status for
1249
  the device(s)) to the ldisk (representing the local storage status).
1250

1251
  """
1252
  cfgw.SetDiskID(dev, node)
1253
  if ldisk:
1254
    idx = 6
1255
  else:
1256
    idx = 5
1257

    
1258
  result = True
1259
  if on_primary or dev.AssembleOnSecondary():
1260
    rstats = rpc.call_blockdev_find(node, dev)
1261
    if not rstats:
1262
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1263
      result = False
1264
    else:
1265
      result = result and (not rstats[idx])
1266
  if dev.children:
1267
    for child in dev.children:
1268
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1269

    
1270
  return result
1271

    
1272

    
1273
class LUDiagnoseOS(NoHooksLU):
1274
  """Logical unit for OS diagnose/query.
1275

1276
  """
1277
  _OP_REQP = ["output_fields", "names"]
1278

    
1279
  def CheckPrereq(self):
1280
    """Check prerequisites.
1281

1282
    This always succeeds, since this is a pure query LU.
1283

1284
    """
1285
    if self.op.names:
1286
      raise errors.OpPrereqError("Selective OS query not supported")
1287

    
1288
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1289
    _CheckOutputFields(static=[],
1290
                       dynamic=self.dynamic_fields,
1291
                       selected=self.op.output_fields)
1292

    
1293
  @staticmethod
1294
  def _DiagnoseByOS(node_list, rlist):
1295
    """Remaps a per-node return list into a per-os per-node dictionary
1296

1297
      Args:
1298
        node_list: a list with the names of all nodes
1299
        rlist: a map with node names as keys and OS objects as values
1300

1301
      Returns:
1302
        map: a map with osnames as keys and as value another map, with
1303
             nodes as
1304
             keys and list of OS objects as values
1305
             e.g. {"debian-etch": {"node1": [<object>,...],
1306
                                   "node2": [<object>,]}
1307
                  }
1308

1309
    """
1310
    all_os = {}
1311
    for node_name, nr in rlist.iteritems():
1312
      if not nr:
1313
        continue
1314
      for os in nr:
1315
        if os.name not in all_os:
1316
          # build a list of nodes for this os containing empty lists
1317
          # for each node in node_list
1318
          all_os[os.name] = {}
1319
          for nname in node_list:
1320
            all_os[os.name][nname] = []
1321
        all_os[os.name][node_name].append(os)
1322
    return all_os
1323

    
1324
  def Exec(self, feedback_fn):
1325
    """Compute the list of OSes.
1326

1327
    """
1328
    node_list = self.cfg.GetNodeList()
1329
    node_data = rpc.call_os_diagnose(node_list)
1330
    if node_data == False:
1331
      raise errors.OpExecError("Can't gather the list of OSes")
1332
    pol = self._DiagnoseByOS(node_list, node_data)
1333
    output = []
1334
    for os_name, os_data in pol.iteritems():
1335
      row = []
1336
      for field in self.op.output_fields:
1337
        if field == "name":
1338
          val = os_name
1339
        elif field == "valid":
1340
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1341
        elif field == "node_status":
1342
          val = {}
1343
          for node_name, nos_list in os_data.iteritems():
1344
            val[node_name] = [(v.status, v.path) for v in nos_list]
1345
        else:
1346
          raise errors.ParameterError(field)
1347
        row.append(val)
1348
      output.append(row)
1349

    
1350
    return output
1351

    
1352

    
1353
class LURemoveNode(LogicalUnit):
1354
  """Logical unit for removing a node.
1355

1356
  """
1357
  HPATH = "node-remove"
1358
  HTYPE = constants.HTYPE_NODE
1359
  _OP_REQP = ["node_name"]
1360

    
1361
  def BuildHooksEnv(self):
1362
    """Build hooks env.
1363

1364
    This doesn't run on the target node in the pre phase as a failed
1365
    node would not allow itself to run.
1366

1367
    """
1368
    env = {
1369
      "OP_TARGET": self.op.node_name,
1370
      "NODE_NAME": self.op.node_name,
1371
      }
1372
    all_nodes = self.cfg.GetNodeList()
1373
    all_nodes.remove(self.op.node_name)
1374
    return env, all_nodes, all_nodes
1375

    
1376
  def CheckPrereq(self):
1377
    """Check prerequisites.
1378

1379
    This checks:
1380
     - the node exists in the configuration
1381
     - it does not have primary or secondary instances
1382
     - it's not the master
1383

1384
    Any errors are signalled by raising errors.OpPrereqError.
1385

1386
    """
1387
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1388
    if node is None:
1389
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1390

    
1391
    instance_list = self.cfg.GetInstanceList()
1392

    
1393
    masternode = self.sstore.GetMasterNode()
1394
    if node.name == masternode:
1395
      raise errors.OpPrereqError("Node is the master node,"
1396
                                 " you need to failover first.")
1397

    
1398
    for instance_name in instance_list:
1399
      instance = self.cfg.GetInstanceInfo(instance_name)
1400
      if node.name == instance.primary_node:
1401
        raise errors.OpPrereqError("Instance %s still running on the node,"
1402
                                   " please remove first." % instance_name)
1403
      if node.name in instance.secondary_nodes:
1404
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1405
                                   " please remove first." % instance_name)
1406
    self.op.node_name = node.name
1407
    self.node = node
1408

    
1409
  def Exec(self, feedback_fn):
1410
    """Removes the node from the cluster.
1411

1412
    """
1413
    node = self.node
1414
    logger.Info("stopping the node daemon and removing configs from node %s" %
1415
                node.name)
1416

    
1417
    rpc.call_node_leave_cluster(node.name)
1418

    
1419
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1420

    
1421
    logger.Info("Removing node %s from config" % node.name)
1422

    
1423
    self.cfg.RemoveNode(node.name)
1424

    
1425
    _RemoveHostFromEtcHosts(node.name)
1426

    
1427

    
1428
class LUQueryNodes(NoHooksLU):
1429
  """Logical unit for querying nodes.
1430

1431
  """
1432
  _OP_REQP = ["output_fields", "names"]
1433

    
1434
  def CheckPrereq(self):
1435
    """Check prerequisites.
1436

1437
    This checks that the fields required are valid output fields.
1438

1439
    """
1440
    self.dynamic_fields = frozenset(["dtotal", "dfree",
1441
                                     "mtotal", "mnode", "mfree",
1442
                                     "bootid"])
1443

    
1444
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1445
                               "pinst_list", "sinst_list",
1446
                               "pip", "sip"],
1447
                       dynamic=self.dynamic_fields,
1448
                       selected=self.op.output_fields)
1449

    
1450
    self.wanted = _GetWantedNodes(self, self.op.names)
1451

    
1452
  def Exec(self, feedback_fn):
1453
    """Computes the list of nodes and their attributes.
1454

1455
    """
1456
    nodenames = self.wanted
1457
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1458

    
1459
    # begin data gathering
1460

    
1461
    if self.dynamic_fields.intersection(self.op.output_fields):
1462
      live_data = {}
1463
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1464
      for name in nodenames:
1465
        nodeinfo = node_data.get(name, None)
1466
        if nodeinfo:
1467
          live_data[name] = {
1468
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1469
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1470
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1471
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1472
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1473
            "bootid": nodeinfo['bootid'],
1474
            }
1475
        else:
1476
          live_data[name] = {}
1477
    else:
1478
      live_data = dict.fromkeys(nodenames, {})
1479

    
1480
    node_to_primary = dict([(name, set()) for name in nodenames])
1481
    node_to_secondary = dict([(name, set()) for name in nodenames])
1482

    
1483
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1484
                             "sinst_cnt", "sinst_list"))
1485
    if inst_fields & frozenset(self.op.output_fields):
1486
      instancelist = self.cfg.GetInstanceList()
1487

    
1488
      for instance_name in instancelist:
1489
        inst = self.cfg.GetInstanceInfo(instance_name)
1490
        if inst.primary_node in node_to_primary:
1491
          node_to_primary[inst.primary_node].add(inst.name)
1492
        for secnode in inst.secondary_nodes:
1493
          if secnode in node_to_secondary:
1494
            node_to_secondary[secnode].add(inst.name)
1495

    
1496
    # end data gathering
1497

    
1498
    output = []
1499
    for node in nodelist:
1500
      node_output = []
1501
      for field in self.op.output_fields:
1502
        if field == "name":
1503
          val = node.name
1504
        elif field == "pinst_list":
1505
          val = list(node_to_primary[node.name])
1506
        elif field == "sinst_list":
1507
          val = list(node_to_secondary[node.name])
1508
        elif field == "pinst_cnt":
1509
          val = len(node_to_primary[node.name])
1510
        elif field == "sinst_cnt":
1511
          val = len(node_to_secondary[node.name])
1512
        elif field == "pip":
1513
          val = node.primary_ip
1514
        elif field == "sip":
1515
          val = node.secondary_ip
1516
        elif field in self.dynamic_fields:
1517
          val = live_data[node.name].get(field, None)
1518
        else:
1519
          raise errors.ParameterError(field)
1520
        node_output.append(val)
1521
      output.append(node_output)
1522

    
1523
    return output
1524

    
1525

    
1526
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)


  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


2913
def _GenerateUniqueNames(cfg, exts):
2914
  """Generate a suitable LV name.
2915

2916
  This will generate a logical volume name for the given instance.
2917

2918
  """
2919
  results = []
2920
  for val in exts:
2921
    new_id = cfg.GenerateUniqueID()
2922
    results.append("%s%s" % (new_id, val))
2923
  return results
2924

    
2925

    
2926
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
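  # For example (hypothetical instance name): an instance called
  # "instance1.example.com" yields "originstname+instance1.example.com".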
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
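  # Note (added comment): for file-based instances the storage directory is
  # created first; block devices are then created on the secondary node(s)
  # before the primary, and a False return leaves any devices created so far
  # in place for the caller to clean up (see the _RemoveDisks calls in
  # LUCreateInstance.Exec).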
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
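  # Worked example (illustrative numbers): for DT_DRBD8 with a 10240 MB disk
  # and a 4096 MB swap, the requirement is 10240 + 4096 + 256 = 14592 MB of
  # free space in the volume group on each node.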
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


class LUCreateInstance(LogicalUnit):
3124
  """Create an instance.
3125

3126
  """
3127
  HPATH = "instance-add"
3128
  HTYPE = constants.HTYPE_INSTANCE
3129
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
3130
              "disk_template", "swap_size", "mode", "start", "vcpus",
3131
              "wait_for_sync", "ip_check", "mac"]
3132

    
3133
  def BuildHooksEnv(self):
3134
    """Build hooks env.
3135

3136
    This runs on master, primary and secondary nodes of the instance.
3137

3138
    """
3139
    env = {
3140
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3141
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3142
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3143
      "INSTANCE_ADD_MODE": self.op.mode,
3144
      }
3145
    if self.op.mode == constants.INSTANCE_IMPORT:
3146
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3147
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3148
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3149

    
3150
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3151
      primary_node=self.op.pnode,
3152
      secondary_nodes=self.secondaries,
3153
      status=self.instance_status,
3154
      os_type=self.op.os_type,
3155
      memory=self.op.mem_size,
3156
      vcpus=self.op.vcpus,
3157
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3158
    ))
3159

    
3160
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3161
          self.secondaries)
3162
    return env, nl, nl
3163

    
3164

    
3165
  def CheckPrereq(self):
3166
    """Check prerequisites.
3167

3168
    """
3169
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
3170
      if not hasattr(self.op, attr):
3171
        setattr(self.op, attr, None)
3172

    
3173
    if self.op.mode not in (constants.INSTANCE_CREATE,
3174
                            constants.INSTANCE_IMPORT):
3175
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3176
                                 self.op.mode)
3177

    
3178
    if (not self.cfg.GetVGName() and
3179
        self.op.disk_template not in constants.DTS_NOT_LVM):
3180
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3181
                                 " instances")
3182

    
3183
    if self.op.mode == constants.INSTANCE_IMPORT:
3184
      src_node = getattr(self.op, "src_node", None)
3185
      src_path = getattr(self.op, "src_path", None)
3186
      if src_node is None or src_path is None:
3187
        raise errors.OpPrereqError("Importing an instance requires source"
3188
                                   " node and path options")
3189
      src_node_full = self.cfg.ExpandNodeName(src_node)
3190
      if src_node_full is None:
3191
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3192
      self.op.src_node = src_node = src_node_full
3193

    
3194
      if not os.path.isabs(src_path):
3195
        raise errors.OpPrereqError("The source path must be absolute")
3196

    
3197
      export_info = rpc.call_export_info(src_node, src_path)
3198

    
3199
      if not export_info:
3200
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3201

    
3202
      if not export_info.has_section(constants.INISECT_EXP):
3203
        raise errors.ProgrammerError("Corrupted export config")
3204

    
3205
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3206
      if (int(ei_version) != constants.EXPORT_VERSION):
3207
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3208
                                   (ei_version, constants.EXPORT_VERSION))
3209

    
3210
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3211
        raise errors.OpPrereqError("Can't import instance with more than"
3212
                                   " one data disk")
3213

    
3214
      # FIXME: are the old os-es, disk sizes, etc. useful?
3215
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3216
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3217
                                                         'disk0_dump'))
3218
      self.src_image = diskimage
3219
    else: # INSTANCE_CREATE
3220
      if getattr(self.op, "os_type", None) is None:
3221
        raise errors.OpPrereqError("No guest OS specified")
3222

    
3223
    #### instance parameters check
3224

    
3225
    # disk template and mirror node verification
3226
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3227
      raise errors.OpPrereqError("Invalid disk template name")
3228

    
3229
    # instance name verification
3230
    hostname1 = utils.HostInfo(self.op.instance_name)
3231

    
3232
    self.op.instance_name = instance_name = hostname1.name
3233
    instance_list = self.cfg.GetInstanceList()
3234
    if instance_name in instance_list:
3235
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3236
                                 instance_name)
3237

    
3238
    # ip validity checks
3239
    ip = getattr(self.op, "ip", None)
3240
    if ip is None or ip.lower() == "none":
3241
      inst_ip = None
3242
    elif ip.lower() == "auto":
3243
      inst_ip = hostname1.ip
3244
    else:
3245
      if not utils.IsValidIP(ip):
3246
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3247
                                   " like a valid IP" % ip)
3248
      inst_ip = ip
3249
    self.inst_ip = self.op.ip = inst_ip
3250

    
3251
    if self.op.start and not self.op.ip_check:
3252
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3253
                                 " adding an instance in start mode")
3254

    
3255
    if self.op.ip_check:
3256
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3257
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3258
                                   (hostname1.ip, instance_name))
3259

    
3260
    # MAC address verification
3261
    if self.op.mac != "auto":
3262
      if not utils.IsValidMac(self.op.mac.lower()):
3263
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3264
                                   self.op.mac)
3265

    
3266
    # bridge verification
3267
    bridge = getattr(self.op, "bridge", None)
3268
    if bridge is None:
3269
      self.op.bridge = self.cfg.GetDefBridge()
3270
    else:
3271
      self.op.bridge = bridge
3272

    
3273
    # boot order verification
3274
    if self.op.hvm_boot_order is not None:
3275
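      # explanatory comment (added): strip("acdn") leaves an empty string only
      # when every character is one of 'a', 'c', 'd' or 'n', so any remainder
      # means an invalid boot device was specified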
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3276
        raise errors.OpPrereqError("invalid boot order specified,"
3277
                                   " must be one or more of [acdn]")
3278
    # file storage checks
3279
    if (self.op.file_driver and
3280
        not self.op.file_driver in constants.FILE_DRIVER):
3281
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3282
                                 self.op.file_driver)
3283

    
3284
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3285
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
3287

    
3288
    #### node related checks
3289

    
3290
    # check primary node
3291
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3292
    if pnode is None:
3293
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3294
                                 self.op.pnode)
3295
    self.op.pnode = pnode.name
3296
    self.pnode = pnode
3297
    self.secondaries = []
3298

    
3299
    # mirror node verification
3300
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3301
      if getattr(self.op, "snode", None) is None:
3302
        raise errors.OpPrereqError("The networked disk templates need"
3303
                                   " a mirror node")
3304

    
3305
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3306
      if snode_name is None:
3307
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3308
                                   self.op.snode)
3309
      elif snode_name == pnode.name:
3310
        raise errors.OpPrereqError("The secondary node cannot be"
3311
                                   " the primary node.")
3312
      self.secondaries.append(snode_name)
3313

    
3314
    req_size = _ComputeDiskSize(self.op.disk_template,
3315
                                self.op.disk_size, self.op.swap_size)
3316

    
3317
    # Check lv size requirements
3318
    if req_size is not None:
3319
      nodenames = [pnode.name] + self.secondaries
3320
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3321
      for node in nodenames:
3322
        info = nodeinfo.get(node, None)
3323
        if not info:
3324
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
3326
        vg_free = info.get('vg_free', None)
3327
        if not isinstance(vg_free, int):
3328
          raise errors.OpPrereqError("Can't compute free disk space on"
3329
                                     " node %s" % node)
3330
        if req_size > info['vg_free']:
3331
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3332
                                     " %d MB available, %d MB required" %
3333
                                     (node, info['vg_free'], req_size))
3334

    
3335
    # os verification
3336
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3337
    if not os_obj:
3338
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)
3340

    
3341
    if self.op.kernel_path == constants.VALUE_NONE:
3342
      raise errors.OpPrereqError("Can't set instance kernel to none")
3343

    
3344

    
3345
    # bridge check on primary node
3346
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3347
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3348
                                 " destination node '%s'" %
3349
                                 (self.op.bridge, pnode.name))
3350

    
3351
    if self.op.start:
3352
      self.instance_status = 'up'
3353
    else:
3354
      self.instance_status = 'down'
3355

    
3356
  def Exec(self, feedback_fn):
3357
    """Create and add the instance to the cluster.
3358

3359
    """
3360
    instance = self.op.instance_name
3361
    pnode_name = self.pnode.name
3362

    
3363
    if self.op.mac == "auto":
3364
      mac_address = self.cfg.GenerateMAC()
3365
    else:
3366
      mac_address = self.op.mac
3367

    
3368
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3369
    if self.inst_ip is not None:
3370
      nic.ip = self.inst_ip
3371

    
3372
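    # added note: hypervisor types listed in constants.HTS_REQ_PORT get a
    # cluster-wide port allocated for the new instance here; for all other
    # hypervisor types network_port simply stays None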
    ht_kind = self.sstore.GetHypervisorType()
3373
    if ht_kind in constants.HTS_REQ_PORT:
3374
      network_port = self.cfg.AllocatePort()
3375
    else:
3376
      network_port = None
3377

    
3378
    # this is needed because os.path.join does not accept None arguments
3379
    if self.op.file_storage_dir is None:
3380
      string_file_storage_dir = ""
3381
    else:
3382
      string_file_storage_dir = self.op.file_storage_dir
3383

    
3384
    # build the full file storage dir path
3385
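    # illustrative example (hypothetical values): with a cluster file storage
    # dir of "/srv/ganeti/file-storage", an empty per-instance subdirectory
    # and an instance named "instance1", this yields
    # "/srv/ganeti/file-storage/instance1"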
    file_storage_dir = os.path.normpath(os.path.join(
3386
                                        self.sstore.GetFileStorageDir(),
3387
                                        string_file_storage_dir, instance))
3388

    
3389

    
3390
    disks = _GenerateDiskTemplate(self.cfg,
3391
                                  self.op.disk_template,
3392
                                  instance, pnode_name,
3393
                                  self.secondaries, self.op.disk_size,
3394
                                  self.op.swap_size,
3395
                                  file_storage_dir,
3396
                                  self.op.file_driver)
3397

    
3398
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3399
                            primary_node=pnode_name,
3400
                            memory=self.op.mem_size,
3401
                            vcpus=self.op.vcpus,
3402
                            nics=[nic], disks=disks,
3403
                            disk_template=self.op.disk_template,
3404
                            status=self.instance_status,
3405
                            network_port=network_port,
3406
                            kernel_path=self.op.kernel_path,
3407
                            initrd_path=self.op.initrd_path,
3408
                            hvm_boot_order=self.op.hvm_boot_order,
3409
                            )
3410

    
3411
    feedback_fn("* creating instance disks...")
3412
    if not _CreateDisks(self.cfg, iobj):
3413
      _RemoveDisks(iobj, self.cfg)
3414
      raise errors.OpExecError("Device creation failed, reverting...")
3415

    
3416
    feedback_fn("adding instance %s to cluster config" % instance)
3417

    
3418
    self.cfg.AddInstance(iobj)
3419

    
3420
    if self.op.wait_for_sync:
3421
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3422
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3423
      # make sure the disks are not degraded (still sync-ing is ok)
3424
      time.sleep(15)
3425
      feedback_fn("* checking mirrors status")
3426
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3427
    else:
3428
      disk_abort = False
3429

    
3430
    if disk_abort:
3431
      _RemoveDisks(iobj, self.cfg)
3432
      self.cfg.RemoveInstance(iobj.name)
3433
      raise errors.OpExecError("There are some degraded disks for"
3434
                               " this instance")
3435

    
3436
    feedback_fn("creating os for instance %s on node %s" %
3437
                (instance, pnode_name))
3438

    
3439
    if iobj.disk_template != constants.DT_DISKLESS:
3440
      if self.op.mode == constants.INSTANCE_CREATE:
3441
        feedback_fn("* running the instance OS create scripts...")
3442
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3443
          raise errors.OpExecError("could not add os for instance %s"
3444
                                   " on node %s" %
3445
                                   (instance, pnode_name))
3446

    
3447
      elif self.op.mode == constants.INSTANCE_IMPORT:
3448
        feedback_fn("* running the instance OS import scripts...")
3449
        src_node = self.op.src_node
3450
        src_image = self.src_image
3451
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3452
                                                src_node, src_image):
3453
          raise errors.OpExecError("Could not import os for instance"
3454
                                   " %s on node %s" %
3455
                                   (instance, pnode_name))
3456
      else:
3457
        # also checked in the prereq part
3458
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3459
                                     % self.op.mode)
3460

    
3461
    if self.op.start:
3462
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3463
      feedback_fn("* starting instance...")
3464
      if not rpc.call_instance_start(pnode_name, iobj, None):
3465
        raise errors.OpExecError("Could not start instance")
3466

    
3467

    
3468
class LUConnectConsole(NoHooksLU):
3469
  """Connect to an instance's console.
3470

3471
  This is somewhat special in that it returns the command line that
3472
  you need to run on the master node in order to connect to the
3473
  console.
3474

3475
  """
3476
  _OP_REQP = ["instance_name"]
3477

    
3478
  def CheckPrereq(self):
3479
    """Check prerequisites.
3480

3481
    This checks that the instance is in the cluster.
3482

3483
    """
3484
    instance = self.cfg.GetInstanceInfo(
3485
      self.cfg.ExpandInstanceName(self.op.instance_name))
3486
    if instance is None:
3487
      raise errors.OpPrereqError("Instance '%s' not known" %
3488
                                 self.op.instance_name)
3489
    self.instance = instance
3490

    
3491
  def Exec(self, feedback_fn):
3492
    """Connect to the console of an instance
3493

3494
    """
3495
    instance = self.instance
3496
    node = instance.primary_node
3497

    
3498
    node_insts = rpc.call_instance_list([node])[node]
3499
    if node_insts is False:
3500
      raise errors.OpExecError("Can't connect to node %s." % node)
3501

    
3502
    if instance.name not in node_insts:
3503
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3504

    
3505
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3506

    
3507
    hyper = hypervisor.GetHypervisor()
3508
    console_cmd = hyper.GetShellCommandForConsole(instance)
3509

    
3510
    # build ssh cmdline
3511
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3512

    
3513

    
3514
class LUReplaceDisks(LogicalUnit):
3515
  """Replace the disks of an instance.
3516

3517
  """
3518
  HPATH = "mirrors-replace"
3519
  HTYPE = constants.HTYPE_INSTANCE
3520
  _OP_REQP = ["instance_name", "mode", "disks"]
3521

    
3522
  def BuildHooksEnv(self):
3523
    """Build hooks env.
3524

3525
    This runs on the master, the primary and all the secondaries.
3526

3527
    """
3528
    env = {
3529
      "MODE": self.op.mode,
3530
      "NEW_SECONDARY": self.op.remote_node,
3531
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3532
      }
3533
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3534
    nl = [
3535
      self.sstore.GetMasterNode(),
3536
      self.instance.primary_node,
3537
      ]
3538
    if self.op.remote_node is not None:
3539
      nl.append(self.op.remote_node)
3540
    return env, nl, nl
3541

    
3542
  def CheckPrereq(self):
3543
    """Check prerequisites.
3544

3545
    This checks that the instance is in the cluster.
3546

3547
    """
3548
    instance = self.cfg.GetInstanceInfo(
3549
      self.cfg.ExpandInstanceName(self.op.instance_name))
3550
    if instance is None:
3551
      raise errors.OpPrereqError("Instance '%s' not known" %
3552
                                 self.op.instance_name)
3553
    self.instance = instance
3554
    self.op.instance_name = instance.name
3555

    
3556
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3557
      raise errors.OpPrereqError("Instance's disk layout is not"
3558
                                 " network mirrored.")
3559

    
3560
    if len(instance.secondary_nodes) != 1:
3561
      raise errors.OpPrereqError("The instance has a strange layout,"
3562
                                 " expected one secondary but found %d" %
3563
                                 len(instance.secondary_nodes))
3564

    
3565
    self.sec_node = instance.secondary_nodes[0]
3566

    
3567
    remote_node = getattr(self.op, "remote_node", None)
3568
    if remote_node is not None:
3569
      remote_node = self.cfg.ExpandNodeName(remote_node)
3570
      if remote_node is None:
3571
        raise errors.OpPrereqError("Node '%s' not known" %
3572
                                   self.op.remote_node)
3573
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3574
    else:
3575
      self.remote_node_info = None
3576
    if remote_node == instance.primary_node:
3577
      raise errors.OpPrereqError("The specified node is the primary node of"
3578
                                 " the instance.")
3579
    elif remote_node == self.sec_node:
3580
      if self.op.mode == constants.REPLACE_DISK_SEC:
3581
        # this is for DRBD8, where we can't execute the same mode of
3582
        # replacement as for drbd7 (no different port allocated)
3583
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3584
                                   " replacement")
3585
      # the user gave the current secondary, switch to
3586
      # 'no-replace-secondary' mode for drbd7
3587
      remote_node = None
3588
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3589
        self.op.mode != constants.REPLACE_DISK_ALL):
3590
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3591
                                 " disks replacement, not individual ones")
3592
    if instance.disk_template == constants.DT_DRBD8:
3593
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3594
          remote_node is not None):
3595
        # switch to replace secondary mode
3596
        self.op.mode = constants.REPLACE_DISK_SEC
3597

    
3598
      if self.op.mode == constants.REPLACE_DISK_ALL:
3599
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3600
                                   " secondary disk replacement, not"
3601
                                   " both at once")
3602
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3603
        if remote_node is not None:
3604
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3605
                                     " the secondary while doing a primary"
3606
                                     " node disk replacement")
3607
        self.tgt_node = instance.primary_node
3608
        self.oth_node = instance.secondary_nodes[0]
3609
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3610
        self.new_node = remote_node # this can be None, in which case
3611
                                    # we don't change the secondary
3612
        self.tgt_node = instance.secondary_nodes[0]
3613
        self.oth_node = instance.primary_node
3614
      else:
3615
        raise errors.ProgrammerError("Unhandled disk replace mode")
3616

    
3617
    for name in self.op.disks:
3618
      if instance.FindDisk(name) is None:
3619
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3620
                                   (name, instance.name))
3621
    self.op.remote_node = remote_node
3622

    
3623
  def _ExecRR1(self, feedback_fn):
3624
    """Replace the disks of an instance.
3625

3626
    """
3627
    instance = self.instance
3628
    iv_names = {}
3629
    # start of work
3630
    if self.op.remote_node is None:
3631
      remote_node = self.sec_node
3632
    else:
3633
      remote_node = self.op.remote_node
3634
    cfg = self.cfg
3635
    for dev in instance.disks:
3636
      size = dev.size
3637
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3638
      names = _GenerateUniqueNames(cfg, lv_names)
3639
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3640
                                       remote_node, size, names)
3641
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3642
      logger.Info("adding new mirror component on secondary for %s" %
3643
                  dev.iv_name)
3644
      #HARDCODE
3645
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3646
                                        new_drbd, False,
3647
                                        _GetInstanceInfoText(instance)):
3648
        raise errors.OpExecError("Failed to create new component on secondary"
3649
                                 " node %s. Full abort, cleanup manually!" %
3650
                                 remote_node)
3651

    
3652
      logger.Info("adding new mirror component on primary")
3653
      #HARDCODE
3654
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3655
                                      instance, new_drbd,
3656
                                      _GetInstanceInfoText(instance)):
3657
        # remove secondary dev
3658
        cfg.SetDiskID(new_drbd, remote_node)
3659
        rpc.call_blockdev_remove(remote_node, new_drbd)
3660
        raise errors.OpExecError("Failed to create volume on primary!"
3661
                                 " Full abort, cleanup manually!!")
3662

    
3663
      # the device exists now
3664
      # call the primary node to add the mirror to md
3665
      logger.Info("adding new mirror component to md")
3666
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3667
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
3669
        cfg.SetDiskID(new_drbd, remote_node)
3670
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
3671
          logger.Error("Can't rollback on secondary")
3672
        cfg.SetDiskID(new_drbd, instance.primary_node)
3673
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3674
          logger.Error("Can't rollback on primary")
3675
        raise errors.OpExecError("Full abort, cleanup manually!!")
3676

    
3677
      dev.children.append(new_drbd)
3678
      cfg.AddInstance(instance)
3679

    
3680
    # this can fail as the old devices are degraded and _WaitForSync
3681
    # does a combined result over all disks, so we don't check its
3682
    # return value
3683
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3684

    
3685
    # so check manually all the devices
3686
    for name in iv_names:
3687
      dev, child, new_drbd = iv_names[name]
3688
      cfg.SetDiskID(dev, instance.primary_node)
3689
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3690
      if is_degr:
3691
        raise errors.OpExecError("MD device %s is degraded!" % name)
3692
      cfg.SetDiskID(new_drbd, instance.primary_node)
3693
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3694
      if is_degr:
3695
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3696

    
3697
    for name in iv_names:
3698
      dev, child, new_drbd = iv_names[name]
3699
      logger.Info("remove mirror %s component" % name)
3700
      cfg.SetDiskID(dev, instance.primary_node)
3701
      if not rpc.call_blockdev_removechildren(instance.primary_node,
3702
                                              dev, [child]):
3703
        logger.Error("Can't remove child from mirror, aborting"
3704
                     " *this device cleanup*.\nYou need to cleanup manually!!")
3705
        continue
3706

    
3707
      for node in child.logical_id[:2]:
3708
        logger.Info("remove child device on %s" % node)
3709
        cfg.SetDiskID(child, node)
3710
        if not rpc.call_blockdev_remove(node, child):
3711
          logger.Error("Warning: failed to remove device from node %s,"
3712
                       " continuing operation." % node)
3713

    
3714
      dev.children.remove(child)
3715

    
3716
      cfg.AddInstance(instance)
3717

    
3718
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.
3720

3721
    The algorithm for replace is quite complicated:
3722
      - for each disk to be replaced:
3723
        - create new LVs on the target node with unique names
3724
        - detach old LVs from the drbd device
3725
        - rename old LVs to name_replaced.<time_t>
3726
        - rename new LVs to old LVs
3727
        - attach the new LVs (with the old names now) to the drbd device
3728
      - wait for sync across all devices
3729
      - for each modified disk:
3730
        - remove old LVs (which have the name name_replaced.<time_t>)
3731

3732
    Failures are not very well handled.
3733

3734
    """
3735
    steps_total = 6
3736
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3737
    instance = self.instance
3738
    iv_names = {}
3739
    vgname = self.cfg.GetVGName()
3740
    # start of work
3741
    cfg = self.cfg
3742
    tgt_node = self.tgt_node
3743
    oth_node = self.oth_node
3744

    
3745
    # Step: check device activation
3746
    self.proc.LogStep(1, steps_total, "check device existence")
3747
    info("checking volume groups")
3748
    my_vg = cfg.GetVGName()
3749
    results = rpc.call_vg_list([oth_node, tgt_node])
3750
    if not results:
3751
      raise errors.OpExecError("Can't list volume groups on the nodes")
3752
    for node in oth_node, tgt_node:
3753
      res = results.get(node, False)
3754
      if not res or my_vg not in res:
3755
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3756
                                 (my_vg, node))
3757
    for dev in instance.disks:
3758
      if not dev.iv_name in self.op.disks:
3759
        continue
3760
      for node in tgt_node, oth_node:
3761
        info("checking %s on %s" % (dev.iv_name, node))
3762
        cfg.SetDiskID(dev, node)
3763
        if not rpc.call_blockdev_find(node, dev):
3764
          raise errors.OpExecError("Can't find device %s on node %s" %
3765
                                   (dev.iv_name, node))
3766

    
3767
    # Step: check other node consistency
3768
    self.proc.LogStep(2, steps_total, "check peer consistency")
3769
    for dev in instance.disks:
3770
      if not dev.iv_name in self.op.disks:
3771
        continue
3772
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3773
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3774
                                   oth_node==instance.primary_node):
3775
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3776
                                 " to replace disks on this node (%s)" %
3777
                                 (oth_node, tgt_node))
3778

    
3779
    # Step: create new storage
3780
    self.proc.LogStep(3, steps_total, "allocate new storage")
3781
    for dev in instance.disks:
3782
      if not dev.iv_name in self.op.disks:
3783
        continue
3784
      size = dev.size
3785
      cfg.SetDiskID(dev, tgt_node)
3786
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3787
      names = _GenerateUniqueNames(cfg, lv_names)
3788
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3789
                             logical_id=(vgname, names[0]))
3790
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3791
                             logical_id=(vgname, names[1]))
3792
      new_lvs = [lv_data, lv_meta]
3793
      old_lvs = dev.children
3794
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3795
      info("creating new local storage on %s for %s" %
3796
           (tgt_node, dev.iv_name))
3797
      # since we *always* want to create this LV, we use the
3798
      # _Create...OnPrimary (which forces the creation), even if we
3799
      # are talking about the secondary node
3800
      for new_lv in new_lvs:
3801
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3802
                                        _GetInstanceInfoText(instance)):
3803
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3804
                                   " node '%s'" %
3805
                                   (new_lv.logical_id[1], tgt_node))
3806

    
3807
    # Step: for each lv, detach+rename*2+attach
3808
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3809
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3810
      info("detaching %s drbd from local storage" % dev.iv_name)
3811
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3812
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3813
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3814
      #dev.children = []
3815
      #cfg.Update(instance)
3816

    
3817
      # ok, we created the new LVs, so now we know we have the needed
3818
      # storage; as such, we proceed on the target node to rename
3819
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3820
      # using the assumption that logical_id == physical_id (which in
3821
      # turn is the unique_id on that node)
3822

    
3823
      # FIXME(iustin): use a better name for the replaced LVs
3824
      temp_suffix = int(time.time())
3825
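      # illustrative example (made-up volume group and id): an old LV with
      # physical_id ('xenvg', '<uuid>.sda_data') gets renamed to
      # ('xenvg', '<uuid>.sda_data_replaced-<temp_suffix>')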
      ren_fn = lambda d, suff: (d.physical_id[0],
3826
                                d.physical_id[1] + "_replaced-%s" % suff)
3827
      # build the rename list based on what LVs exist on the node
3828
      rlist = []
3829
      for to_ren in old_lvs:
3830
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3831
        if find_res is not None: # device exists
3832
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3833

    
3834
      info("renaming the old LVs on the target node")
3835
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3836
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3837
      # now we rename the new LVs to the old LVs
3838
      info("renaming the new LVs on the target node")
3839
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3840
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3841
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3842

    
3843
      for old, new in zip(old_lvs, new_lvs):
3844
        new.logical_id = old.logical_id
3845
        cfg.SetDiskID(new, tgt_node)
3846

    
3847
      for disk in old_lvs:
3848
        disk.logical_id = ren_fn(disk, temp_suffix)
3849
        cfg.SetDiskID(disk, tgt_node)
3850

    
3851
      # now that the new lvs have the old name, we can add them to the device
3852
      info("adding new mirror component on %s" % tgt_node)
3853
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3854
        for new_lv in new_lvs:
3855
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3856
            warning("Can't rollback device %s", hint="manually cleanup unused"
3857
                    " logical volumes")
3858
        raise errors.OpExecError("Can't add local storage to drbd")
3859

    
3860
      dev.children = new_lvs
3861
      cfg.Update(instance)
3862

    
3863
    # Step: wait for sync
3864

    
3865
    # this can fail as the old devices are degraded and _WaitForSync
3866
    # does a combined result over all disks, so we don't check its
3867
    # return value
3868
    self.proc.LogStep(5, steps_total, "sync devices")
3869
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3870

    
3871
    # so check manually all the devices
3872
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3873
      cfg.SetDiskID(dev, instance.primary_node)
3874
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3875
      if is_degr:
3876
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3877

    
3878
    # Step: remove old storage
3879
    self.proc.LogStep(6, steps_total, "removing old storage")
3880
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3881
      info("remove logical volumes for %s" % name)
3882
      for lv in old_lvs:
3883
        cfg.SetDiskID(lv, tgt_node)
3884
        if not rpc.call_blockdev_remove(tgt_node, lv):
3885
          warning("Can't remove old LV", hint="manually remove unused LVs")
3886
          continue
3887

    
3888
  def _ExecD8Secondary(self, feedback_fn):
3889
    """Replace the secondary node for drbd8.
3890

3891
    The algorithm for replace is quite complicated:
3892
      - for all disks of the instance:
3893
        - create new LVs on the new node with same names
3894
        - shutdown the drbd device on the old secondary
3895
        - disconnect the drbd network on the primary
3896
        - create the drbd device on the new secondary
3897
        - network attach the drbd on the primary, using an artifice:
3898
          the drbd code for Attach() will connect to the network if it
3899
          finds a device which is connected to the good local disks but
3900
          not network enabled
3901
      - wait for sync across all devices
3902
      - remove all disks from the old secondary
3903

3904
    Failures are not very well handled.
3905

3906
    """
3907
    steps_total = 6
3908
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3909
    instance = self.instance
3910
    iv_names = {}
3911
    vgname = self.cfg.GetVGName()
3912
    # start of work
3913
    cfg = self.cfg
3914
    old_node = self.tgt_node
3915
    new_node = self.new_node
3916
    pri_node = instance.primary_node
3917

    
3918
    # Step: check device activation
3919
    self.proc.LogStep(1, steps_total, "check device existence")
3920
    info("checking volume groups")
3921
    my_vg = cfg.GetVGName()
3922
    results = rpc.call_vg_list([pri_node, new_node])
3923
    if not results:
3924
      raise errors.OpExecError("Can't list volume groups on the nodes")
3925
    for node in pri_node, new_node:
3926
      res = results.get(node, False)
3927
      if not res or my_vg not in res:
3928
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3929
                                 (my_vg, node))
3930
    for dev in instance.disks:
3931
      if not dev.iv_name in self.op.disks:
3932
        continue
3933
      info("checking %s on %s" % (dev.iv_name, pri_node))
3934
      cfg.SetDiskID(dev, pri_node)
3935
      if not rpc.call_blockdev_find(pri_node, dev):
3936
        raise errors.OpExecError("Can't find device %s on node %s" %
3937
                                 (dev.iv_name, pri_node))
3938

    
3939
    # Step: check other node consistency
3940
    self.proc.LogStep(2, steps_total, "check peer consistency")
3941
    for dev in instance.disks:
3942
      if not dev.iv_name in self.op.disks:
3943
        continue
3944
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3945
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3946
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3947
                                 " unsafe to replace the secondary" %
3948
                                 pri_node)
3949

    
3950
    # Step: create new storage
3951
    self.proc.LogStep(3, steps_total, "allocate new storage")
3952
    for dev in instance.disks:
3953
      size = dev.size
3954
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3955
      # since we *always* want to create this LV, we use the
3956
      # _Create...OnPrimary (which forces the creation), even if we
3957
      # are talking about the secondary node
3958
      for new_lv in dev.children:
3959
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3960
                                        _GetInstanceInfoText(instance)):
3961
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3962
                                   " node '%s'" %
3963
                                   (new_lv.logical_id[1], new_node))
3964

    
3965
      iv_names[dev.iv_name] = (dev, dev.children)
3966

    
3967
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
3968
    for dev in instance.disks:
3969
      size = dev.size
3970
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3971
      # create new devices on new_node
3972
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3973
                              logical_id=(pri_node, new_node,
3974
                                          dev.logical_id[2]),
3975
                              children=dev.children)
3976
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3977
                                        new_drbd, False,
3978
                                      _GetInstanceInfoText(instance)):
3979
        raise errors.OpExecError("Failed to create new DRBD on"
3980
                                 " node '%s'" % new_node)
3981

    
3982
    for dev in instance.disks:
3983
      # we have new devices, shutdown the drbd on the old secondary
3984
      info("shutting down drbd for %s on old node" % dev.iv_name)
3985
      cfg.SetDiskID(dev, old_node)
3986
      if not rpc.call_blockdev_shutdown(old_node, dev):
3987
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3988
                hint="Please cleanup this device manually as soon as possible")
3989

    
3990
    info("detaching primary drbds from the network (=> standalone)")
3991
    done = 0
3992
    for dev in instance.disks:
3993
      cfg.SetDiskID(dev, pri_node)
3994
      # set the physical (unique in bdev terms) id to None, meaning
3995
      # detach from network
3996
      dev.physical_id = (None,) * len(dev.physical_id)
3997
      # and 'find' the device, which will 'fix' it to match the
3998
      # standalone state
3999
      if rpc.call_blockdev_find(pri_node, dev):
4000
        done += 1
4001
      else:
4002
        warning("Failed to detach drbd %s from network, unusual case" %
4003
                dev.iv_name)
4004

    
4005
    if not done:
4006
      # no detaches succeeded (very unlikely)
4007
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4008

    
4009
    # if we managed to detach at least one, we update all the disks of
4010
    # the instance to point to the new secondary
4011
    info("updating instance configuration")
4012
    for dev in instance.disks:
4013
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4014
      cfg.SetDiskID(dev, pri_node)
4015
    cfg.Update(instance)
4016

    
4017
    # and now perform the drbd attach
4018
    info("attaching primary drbds to new secondary (standalone => connected)")
4019
    failures = []
4020
    for dev in instance.disks:
4021
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4022
      # since the attach is smart, it's enough to 'find' the device,
4023
      # it will automatically activate the network, if the physical_id
4024
      # is correct
4025
      cfg.SetDiskID(dev, pri_node)
4026
      if not rpc.call_blockdev_find(pri_node, dev):
4027
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4028
                "please do a gnt-instance info to see the status of disks")
4029

    
4030
    # this can fail as the old devices are degraded and _WaitForSync
4031
    # does a combined result over all disks, so we don't check its
4032
    # return value
4033
    self.proc.LogStep(5, steps_total, "sync devices")
4034
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4035

    
4036
    # so check manually all the devices
4037
    for name, (dev, old_lvs) in iv_names.iteritems():
4038
      cfg.SetDiskID(dev, pri_node)
4039
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4040
      if is_degr:
4041
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4042

    
4043
    self.proc.LogStep(6, steps_total, "removing old storage")
4044
    for name, (dev, old_lvs) in iv_names.iteritems():
4045
      info("remove logical volumes for %s" % name)
4046
      for lv in old_lvs:
4047
        cfg.SetDiskID(lv, old_node)
4048
        if not rpc.call_blockdev_remove(old_node, lv):
4049
          warning("Can't remove LV on old secondary",
4050
                  hint="Cleanup stale volumes by hand")
4051

    
4052
  def Exec(self, feedback_fn):
4053
    """Execute disk replacement.
4054

4055
    This dispatches the disk replacement to the appropriate handler.
4056

4057
    """
4058
    instance = self.instance
4059
    if instance.disk_template == constants.DT_REMOTE_RAID1:
4060
      fn = self._ExecRR1
4061
    elif instance.disk_template == constants.DT_DRBD8:
4062
      if self.op.remote_node is None:
4063
        fn = self._ExecD8DiskOnly
4064
      else:
4065
        fn = self._ExecD8Secondary
4066
    else:
4067
      raise errors.ProgrammerError("Unhandled disk replacement case")
4068
    return fn(feedback_fn)
4069

    
4070

    
4071
class LUQueryInstanceData(NoHooksLU):
4072
  """Query runtime instance data.
4073

4074
  """
4075
  _OP_REQP = ["instances"]
4076

    
4077
  def CheckPrereq(self):
4078
    """Check prerequisites.
4079

4080
    This only checks the optional instance list against the existing names.
4081

4082
    """
4083
    if not isinstance(self.op.instances, list):
4084
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4085
    if self.op.instances:
4086
      self.wanted_instances = []
4087
      names = self.op.instances
4088
      for name in names:
4089
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4090
        if instance is None:
4091
          raise errors.OpPrereqError("No such instance name '%s'" % name)
4092
        self.wanted_instances.append(instance)
4093
    else:
4094
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4095
                               in self.cfg.GetInstanceList()]
4096
    return
4097

    
4098

    
4099
  def _ComputeDiskStatus(self, instance, snode, dev):
4100
    """Compute block device status.
4101

4102
    """
4103
    self.cfg.SetDiskID(dev, instance.primary_node)
4104
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4105
    if dev.dev_type in constants.LDS_DRBD:
4106
      # we change the snode then (otherwise we use the one passed in)
4107
      if dev.logical_id[0] == instance.primary_node:
4108
        snode = dev.logical_id[1]
4109
      else:
4110
        snode = dev.logical_id[0]
4111

    
4112
    if snode:
4113
      self.cfg.SetDiskID(dev, snode)
4114
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4115
    else:
4116
      dev_sstatus = None
4117

    
4118
    if dev.children:
4119
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4120
                      for child in dev.children]
4121
    else:
4122
      dev_children = []
4123

    
4124
    data = {
4125
      "iv_name": dev.iv_name,
4126
      "dev_type": dev.dev_type,
4127
      "logical_id": dev.logical_id,
4128
      "physical_id": dev.physical_id,
4129
      "pstatus": dev_pstatus,
4130
      "sstatus": dev_sstatus,
4131
      "children": dev_children,
4132
      }
4133

    
4134
    return data
4135

    
4136
  def Exec(self, feedback_fn):
4137
    """Gather and return data"""
4138
    result = {}
4139
    for instance in self.wanted_instances:
4140
      remote_info = rpc.call_instance_info(instance.primary_node,
4141
                                                instance.name)
4142
      if remote_info and "state" in remote_info:
4143
        remote_state = "up"
4144
      else:
4145
        remote_state = "down"
4146
      if instance.status == "down":
4147
        config_state = "down"
4148
      else:
4149
        config_state = "up"
4150

    
4151
      disks = [self._ComputeDiskStatus(instance, None, device)
4152
               for device in instance.disks]
4153

    
4154
      idict = {
4155
        "name": instance.name,
4156
        "config_state": config_state,
4157
        "run_state": remote_state,
4158
        "pnode": instance.primary_node,
4159
        "snodes": instance.secondary_nodes,
4160
        "os": instance.os,
4161
        "memory": instance.memory,
4162
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4163
        "disks": disks,
4164
        "network_port": instance.network_port,
4165
        "vcpus": instance.vcpus,
4166
        "kernel_path": instance.kernel_path,
4167
        "initrd_path": instance.initrd_path,
4168
        "hvm_boot_order": instance.hvm_boot_order,
4169
        }
4170

    
4171
      result[instance.name] = idict
4172

    
4173
    return result
4174

    
4175

    
4176
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.
4178

4179
  """
4180
  HPATH = "instance-modify"
4181
  HTYPE = constants.HTYPE_INSTANCE
4182
  _OP_REQP = ["instance_name"]
4183

    
4184
  def BuildHooksEnv(self):
4185
    """Build hooks env.
4186

4187
    This runs on the master, primary and secondaries.
4188

4189
    """
4190
    args = dict()
4191
    if self.mem:
4192
      args['memory'] = self.mem
4193
    if self.vcpus:
4194
      args['vcpus'] = self.vcpus
4195
    if self.do_ip or self.do_bridge or self.mac:
4196
      if self.do_ip:
4197
        ip = self.ip
4198
      else:
4199
        ip = self.instance.nics[0].ip
4200
      if self.bridge:
4201
        bridge = self.bridge
4202
      else:
4203
        bridge = self.instance.nics[0].bridge
4204
      if self.mac:
4205
        mac = self.mac
4206
      else:
4207
        mac = self.instance.nics[0].mac
4208
      args['nics'] = [(ip, bridge, mac)]
4209
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4210
    nl = [self.sstore.GetMasterNode(),
4211
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4212
    return env, nl, nl
4213

    
4214
  def CheckPrereq(self):
4215
    """Check prerequisites.
4216

4217
    This only checks the instance list against the existing names.
4218

4219
    """
4220
    self.mem = getattr(self.op, "mem", None)
4221
    self.vcpus = getattr(self.op, "vcpus", None)
4222
    self.ip = getattr(self.op, "ip", None)
4223
    self.mac = getattr(self.op, "mac", None)
4224
    self.bridge = getattr(self.op, "bridge", None)
4225
    self.kernel_path = getattr(self.op, "kernel_path", None)
4226
    self.initrd_path = getattr(self.op, "initrd_path", None)
4227
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4228
    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4229
                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4230
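    # added comment: if every parameter is still None the user requested no
    # change at all, which is rejected below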
    if all_params.count(None) == len(all_params):
4231
      raise errors.OpPrereqError("No changes submitted")
4232
    if self.mem is not None:
4233
      try:
4234
        self.mem = int(self.mem)
4235
      except ValueError, err:
4236
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4237
    if self.vcpus is not None:
4238
      try:
4239
        self.vcpus = int(self.vcpus)
4240
      except ValueError, err:
4241
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4242
    if self.ip is not None:
4243
      self.do_ip = True
4244
      if self.ip.lower() == "none":
4245
        self.ip = None
4246
      else:
4247
        if not utils.IsValidIP(self.ip):
4248
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4249
    else:
4250
      self.do_ip = False
4251
    self.do_bridge = (self.bridge is not None)
4252
    if self.mac is not None:
4253
      if self.cfg.IsMacInUse(self.mac):
4254
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4255
                                   self.mac)
4256
      if not utils.IsValidMac(self.mac):
4257
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4258

    
4259
    if self.kernel_path is not None:
4260
      self.do_kernel_path = True
4261
      if self.kernel_path == constants.VALUE_NONE:
4262
        raise errors.OpPrereqError("Can't set instance to no kernel")
4263

    
4264
      if self.kernel_path != constants.VALUE_DEFAULT:
4265
        if not os.path.isabs(self.kernel_path):
4266
          raise errors.OpPrereqError("The kernel path must be an absolute"
4267
                                    " filename")
4268
    else:
4269
      self.do_kernel_path = False
4270

    
4271
    if self.initrd_path is not None:
4272
      self.do_initrd_path = True
4273
      if self.initrd_path not in (constants.VALUE_NONE,
4274
                                  constants.VALUE_DEFAULT):
4275
        if not os.path.isabs(self.initrd_path):
4276
          raise errors.OpPrereqError("The initrd path must be an absolute"
4277
                                    " filename")
4278
    else:
4279
      self.do_initrd_path = False
4280

    
4281
    # boot order verification
4282
    if self.hvm_boot_order is not None:
4283
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4284
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4285
          raise errors.OpPrereqError("invalid boot order specified,"
4286
                                     " must be one or more of [acdn]"
4287
                                     " or 'default'")
4288

    
4289
    instance = self.cfg.GetInstanceInfo(
4290
      self.cfg.ExpandInstanceName(self.op.instance_name))
4291
    if instance is None:
4292
      raise errors.OpPrereqError("No such instance name '%s'" %
4293
                                 self.op.instance_name)
4294
    self.op.instance_name = instance.name
4295
    self.instance = instance
4296
    return
4297

    
4298
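  # Note: Exec below applies the validated changes in memory and returns a
  # list of (parameter, new value) pairs, e.g. something roughly like
  # [("mem", 512), ("bridge", "br0")] (illustrative values only), so the
  # caller can report exactly what was modified.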
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


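# Rough outline of LUExportInstance.Exec below: optionally shut the instance
# down, snapshot its "sda" disk as an LVM volume, copy the snapshot to the
# target node, drop the snapshot, finalize the export and finally prune any
# older exports of the same instance from the other nodes.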
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


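# TagsLU and its subclasses below share a common CheckPrereq that resolves
# self.target to the cluster, node or instance object named in the opcode,
# depending on self.op.kind.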
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


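# LUSearchTags returns a list of (path, tag) pairs where path identifies the
# tagged object: "/cluster", "/nodes/<node name>" or
# "/instances/<instance name>".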
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the (path, tag) pairs matching the pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


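# LUTestDelay is a debugging/testing aid: it simply sleeps for the requested
# duration on the master and/or on the given nodes (via the test_delay RPC).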
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


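# The _IAllocator* helpers below build the JSON document handed to an
# external instance allocator.  Very roughly (illustrative sketch, values
# elided), the resulting dictionary looks like:
#   {"version": 1,
#    "cluster_name": ..., "cluster_tags": [...],
#    "nodes": {<node name>: {"total_memory": ..., "free_disk": ..., ...}},
#    "instances": {<instance name>: {"memory": ..., "disks": [...], ...}},
#    "request": {"type": "allocate" or "replace_secondary", ...}}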
def _IAllocatorGetClusterData(cfg, sstore):
  """Compute the generic allocator input data.

  This is the data that is independent of the actual operation.

  """
  # cluster data
  data = {
    "version": 1,
    "cluster_name": sstore.GetClusterName(),
    "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
    # we don't have job IDs
    }

  # node data
  node_results = {}
  node_list = cfg.GetNodeList()
  node_data = rpc.call_node_info(node_list, cfg.GetVGName())
  for nname in node_list:
    ninfo = cfg.GetNodeInfo(nname)
    if nname not in node_data or not isinstance(node_data[nname], dict):
      raise errors.OpExecError("Can't get data for node %s" % nname)
    remote_info = node_data[nname]
    for attr in ['memory_total', 'memory_free',
                 'vg_size', 'vg_free']:
      if attr not in remote_info:
        raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                 (nname, attr))
      try:
        int(remote_info[attr])
      except ValueError, err:
        raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                 " %s" % (nname, attr, str(err)))
    pnr = {
      "tags": list(ninfo.GetTags()),
      "total_memory": utils.TryConvert(int, remote_info['memory_total']),
      "free_memory": utils.TryConvert(int, remote_info['memory_free']),
      "total_disk": utils.TryConvert(int, remote_info['vg_size']),
      "free_disk": utils.TryConvert(int, remote_info['vg_free']),
      "primary_ip": ninfo.primary_ip,
      "secondary_ip": ninfo.secondary_ip,
      }
    node_results[nname] = pnr
  data["nodes"] = node_results

  # instance data
  instance_data = {}
  i_list = cfg.GetInstanceList()
  for iname in i_list:
    iinfo = cfg.GetInstanceInfo(iname)
    nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                for n in iinfo.nics]
    pir = {
      "tags": list(iinfo.GetTags()),
      "should_run": iinfo.status == "up",
      "vcpus": iinfo.vcpus,
      "memory": iinfo.memory,
      "os": iinfo.os,
      "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
      "nics": nic_data,
      "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
      "disk_template": iinfo.disk_template,
      }
    instance_data[iname] = pir

  data["instances"] = instance_data

  return data


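# The two helpers below fill in data["request"]: "allocate" requests carry the
# full specification of the new instance, while "replace_secondary" relocation
# requests only need the instance name.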
def _IAllocatorAddNewInstance(data, op):
  """Add new instance data to allocator structure.

  This in combination with _IAllocatorGetClusterData will create the
  correct structure needed as input for the allocator.

  The checks for the completeness of the opcode must have already been
  done.

  """
  if len(op.disks) != 2:
    raise errors.OpExecError("Only two-disk configurations supported")

  disk_space = _ComputeDiskSize(op.disk_template,
                                op.disks[0]["size"], op.disks[1]["size"])

  request = {
    "type": "allocate",
    "name": op.name,
    "disk_template": op.disk_template,
    "tags": op.tags,
    "os": op.os,
    "vcpus": op.vcpus,
    "memory": op.mem_size,
    "disks": op.disks,
    "disk_space_total": disk_space,
    "nics": op.nics,
    }
  data["request"] = request


def _IAllocatorAddRelocateInstance(data, op):
  """Add relocate instance data to allocator structure.

  This in combination with _IAllocatorGetClusterData will create the
  correct structure needed as input for the allocator.

  The checks for the completeness of the opcode must have already been
  done.

  """
  request = {
    "type": "replace_secondary",
    "name": op.name,
    }
  data["request"] = request


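# _IAllocatorRun invokes the allocator as an external program: the input
# document is written to a temporary file whose name is passed as the only
# argument of the script, and whatever the script prints on stdout is
# returned to the caller unchanged.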
def _IAllocatorRun(name, data):
  """Run an instance allocator and return the results.

  """
  alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                os.path.isfile)
  if alloc_script is None:
    raise errors.OpExecError("Can't find allocator")

  fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
  try:
    os.write(fd, data)
    os.close(fd)
    result = utils.RunCmd([alloc_script, fin_name])
    if result.failed:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (result.fail_reason, result.stdout))
  finally:
    os.unlink(fin_name)
  return result.stdout


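# In LUTestAllocator, direction IALLOCATOR_DIR_IN only builds and returns the
# input text that would be sent to the allocator, while IALLOCATOR_DIR_OUT
# actually runs the named allocator and returns its output.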
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    data = _IAllocatorGetClusterData(self.cfg, self.sstore)
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      _IAllocatorAddNewInstance(data, self.op)
    else:
      _IAllocatorAddRelocateInstance(data, self.op)

    if _JSON_INDENT is None:
      text = simplejson.dumps(data)
    else:
      text = simplejson.dumps(data, indent=_JSON_INDENT)
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = text
    else:
      result = _IAllocatorRun(self.op.allocator, text)
    return result