Add option for the number of VCPUs in instance listing
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275   else:
276     nic_count = 0
277
278   env["INSTANCE_NIC_COUNT"] = nic_count
279
280   return env
281
282
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284   """Builds instance related env variables for hooks from an object.
285
286   Args:
287     instance: objects.Instance object of instance
288     override: dict of values to override
289   """
290   args = {
291     'name': instance.name,
292     'primary_node': instance.primary_node,
293     'secondary_nodes': instance.secondary_nodes,
294     'os_type': instance.os,
295     'status': instance.os,
296     'memory': instance.memory,
297     'vcpus': instance.vcpus,
298     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299   }
300   if override:
301     args.update(override)
302   return _BuildInstanceHookEnv(**args)
303
304
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306   """Ensure a node has a correct known_hosts entry.
307
308   Args:
309     fullnode - Fully qualified domain name of host. (str)
310     ip       - IPv4 address of host (str)
311     pubkey   - the public key of the cluster
312
313   """
314   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316   else:
317     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318
319   inthere = False
320
321   save_lines = []
322   add_lines = []
323   removed = False
324
325   for rawline in f:
326     logger.Debug('read %s' % (repr(rawline),))
327
328     parts = rawline.rstrip('\r\n').split()
329
330     # Ignore unwanted lines
331     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332       fields = parts[0].split(',')
333       key = parts[2]
334
335       haveall = True
336       havesome = False
337       for spec in [ ip, fullnode ]:
338         if spec not in fields:
339           haveall = False
340         if spec in fields:
341           havesome = True
342
343       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344       if haveall and key == pubkey:
345         inthere = True
346         save_lines.append(rawline)
347         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348         continue
349
350       if havesome and (not haveall or key != pubkey):
351         removed = True
352         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353         continue
354
355     save_lines.append(rawline)
356
357   if not inthere:
358     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360
361   if removed:
362     save_lines = save_lines + add_lines
363
364     # Write a new file and replace old.
365     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366                                    constants.DATA_DIR)
367     newfile = os.fdopen(fd, 'w')
368     try:
369       newfile.write(''.join(save_lines))
370     finally:
371       newfile.close()
372     logger.Debug("Wrote new known_hosts.")
373     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374
375   elif add_lines:
376     # Simply appending a new line will do the trick.
377     f.seek(0, 2)
378     for add in add_lines:
379       f.write(add)
380
381   f.close()
382
383
384 def _HasValidVG(vglist, vgname):
385   """Checks if the volume group list is valid.
386
387   A non-None return value means there's an error, and the return value
388   is the error message.
389
390   """
391   vgsize = vglist.get(vgname, None)
392   if vgsize is None:
393     return "volume group '%s' missing" % vgname
394   elif vgsize < 20480:
395     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
396             (vgname, vgsize))
397   return None
398
399
400 def _InitSSHSetup(node):
401   """Setup the SSH configuration for the cluster.
402
403
404   This generates a dsa keypair for root, adds the pub key to the
405   permitted hosts and adds the hostkey to its own known hosts.
406
407   Args:
408     node: the name of this host as a fqdn
409
410   """
411   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412
413   for name in priv_key, pub_key:
414     if os.path.exists(name):
415       utils.CreateBackup(name)
416     utils.RemoveFile(name)
417
418   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419                          "-f", priv_key,
420                          "-q", "-N", ""])
421   if result.failed:
422     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423                              result.output)
424
425   f = open(pub_key, 'r')
426   try:
427     utils.AddAuthorizedKey(auth_keys, f.read(8192))
428   finally:
429     f.close()
430
431
432 def _InitGanetiServerSetup(ss):
433   """Setup the necessary configuration for the initial node daemon.
434
435   This creates the nodepass file containing the shared password for
436   the cluster and also generates the SSL certificate.
437
438   """
439   # Create pseudo random password
440   randpass = sha.new(os.urandom(64)).hexdigest()
441   # and write it into sstore
442   ss.SetKey(ss.SS_NODED_PASS, randpass)
443
444   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445                          "-days", str(365*5), "-nodes", "-x509",
446                          "-keyout", constants.SSL_CERT_FILE,
447                          "-out", constants.SSL_CERT_FILE, "-batch"])
448   if result.failed:
449     raise errors.OpExecError("could not generate server ssl cert, command"
450                              " %s had exitcode %s and error message %s" %
451                              (result.cmd, result.exit_code, result.output))
452
453   os.chmod(constants.SSL_CERT_FILE, 0400)
454
455   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456
457   if result.failed:
458     raise errors.OpExecError("Could not start the node daemon, command %s"
459                              " had exitcode %s and error %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462
463 def _CheckInstanceBridgesExist(instance):
464   """Check that the brigdes needed by an instance exist.
465
466   """
467   # check bridges existance
468   brlist = [nic.bridge for nic in instance.nics]
469   if not rpc.call_bridges_exist(instance.primary_node, brlist):
470     raise errors.OpPrereqError("one or more target bridges %s does not"
471                                " exist on destination node '%s'" %
472                                (brlist, instance.primary_node))
473
474
475 class LUInitCluster(LogicalUnit):
476   """Initialise the cluster.
477
478   """
479   HPATH = "cluster-init"
480   HTYPE = constants.HTYPE_CLUSTER
481   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482               "def_bridge", "master_netdev"]
483   REQ_CLUSTER = False
484
485   def BuildHooksEnv(self):
486     """Build hooks env.
487
488     Notes: Since we don't require a cluster, we must manually add
489     ourselves in the post-run node list.
490
491     """
492     env = {"OP_TARGET": self.op.cluster_name}
493     return env, [], [self.hostname.name]
494
495   def CheckPrereq(self):
496     """Verify that the passed name is a valid one.
497
498     """
499     if config.ConfigWriter.IsCluster():
500       raise errors.OpPrereqError("Cluster is already initialised")
501
502     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
503       if not os.path.exists(constants.VNC_PASSWORD_FILE):
504         raise errors.OpPrereqError("Please prepare the cluster VNC"
505                                    "password file %s" %
506                                    constants.VNC_PASSWORD_FILE)
507
508     self.hostname = hostname = utils.HostInfo()
509
510     if hostname.ip.startswith("127."):
511       raise errors.OpPrereqError("This host's IP resolves to the private"
512                                  " range (%s). Please fix DNS or /etc/hosts." %
513                                  (hostname.ip,))
514
515     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
516
517     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
518                          constants.DEFAULT_NODED_PORT):
519       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
520                                  " to %s,\nbut this ip address does not"
521                                  " belong to this host."
522                                  " Aborting." % hostname.ip)
523
524     secondary_ip = getattr(self.op, "secondary_ip", None)
525     if secondary_ip and not utils.IsValidIP(secondary_ip):
526       raise errors.OpPrereqError("Invalid secondary ip given")
527     if (secondary_ip and
528         secondary_ip != hostname.ip and
529         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
530                            constants.DEFAULT_NODED_PORT))):
531       raise errors.OpPrereqError("You gave %s as secondary IP,"
532                                  " but it does not belong to this host." %
533                                  secondary_ip)
534     self.secondary_ip = secondary_ip
535
536     # checks presence of the volume group given
537     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
538
539     if vgstatus:
540       raise errors.OpPrereqError("Error: %s" % vgstatus)
541
542     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
543                     self.op.mac_prefix):
544       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
545                                  self.op.mac_prefix)
546
547     if self.op.hypervisor_type not in constants.HYPER_TYPES:
548       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
549                                  self.op.hypervisor_type)
550
551     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
552     if result.failed:
553       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
554                                  (self.op.master_netdev,
555                                   result.output.strip()))
556
557     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
558             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
559       raise errors.OpPrereqError("Init.d script '%s' missing or not"
560                                  " executable." % constants.NODE_INITD_SCRIPT)
561
562   def Exec(self, feedback_fn):
563     """Initialize the cluster.
564
565     """
566     clustername = self.clustername
567     hostname = self.hostname
568
569     # set up the simple store
570     self.sstore = ss = ssconf.SimpleStore()
571     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
572     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
573     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
574     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
575     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
576
577     # set up the inter-node password and certificate
578     _InitGanetiServerSetup(ss)
579
580     # start the master ip
581     rpc.call_node_start_master(hostname.name)
582
583     # set up ssh config and /etc/hosts
584     f = open(constants.SSH_HOST_RSA_PUB, 'r')
585     try:
586       sshline = f.read()
587     finally:
588       f.close()
589     sshkey = sshline.split(" ")[1]
590
591     _AddHostToEtcHosts(hostname.name)
592
593     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
594
595     _InitSSHSetup(hostname.name)
596
597     # init of cluster config file
598     self.cfg = cfgw = config.ConfigWriter()
599     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
600                     sshkey, self.op.mac_prefix,
601                     self.op.vg_name, self.op.def_bridge)
602
603
604 class LUDestroyCluster(NoHooksLU):
605   """Logical unit for destroying the cluster.
606
607   """
608   _OP_REQP = []
609
610   def CheckPrereq(self):
611     """Check prerequisites.
612
613     This checks whether the cluster is empty.
614
615     Any errors are signalled by raising errors.OpPrereqError.
616
617     """
618     master = self.sstore.GetMasterNode()
619
620     nodelist = self.cfg.GetNodeList()
621     if len(nodelist) != 1 or nodelist[0] != master:
622       raise errors.OpPrereqError("There are still %d node(s) in"
623                                  " this cluster." % (len(nodelist) - 1))
624     instancelist = self.cfg.GetInstanceList()
625     if instancelist:
626       raise errors.OpPrereqError("There are still %d instance(s) in"
627                                  " this cluster." % len(instancelist))
628
629   def Exec(self, feedback_fn):
630     """Destroys the cluster.
631
632     """
633     master = self.sstore.GetMasterNode()
634     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
635     utils.CreateBackup(priv_key)
636     utils.CreateBackup(pub_key)
637     rpc.call_node_leave_cluster(master)
638
639
640 class LUVerifyCluster(NoHooksLU):
641   """Verifies the cluster status.
642
643   """
644   _OP_REQP = []
645
646   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
647                   remote_version, feedback_fn):
648     """Run multiple tests against a node.
649
650     Test list:
651       - compares ganeti version
652       - checks vg existance and size > 20G
653       - checks config file checksum
654       - checks ssh to other nodes
655
656     Args:
657       node: name of the node to check
658       file_list: required list of files
659       local_cksum: dictionary of local files and their checksums
660
661     """
662     # compares ganeti version
663     local_version = constants.PROTOCOL_VERSION
664     if not remote_version:
665       feedback_fn(" - ERROR: connection to %s failed" % (node))
666       return True
667
668     if local_version != remote_version:
669       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
670                       (local_version, node, remote_version))
671       return True
672
673     # checks vg existance and size > 20G
674
675     bad = False
676     if not vglist:
677       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
678                       (node,))
679       bad = True
680     else:
681       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
682       if vgstatus:
683         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
684         bad = True
685
686     # checks config file checksum
687     # checks ssh to any
688
689     if 'filelist' not in node_result:
690       bad = True
691       feedback_fn("  - ERROR: node hasn't returned file checksum data")
692     else:
693       remote_cksum = node_result['filelist']
694       for file_name in file_list:
695         if file_name not in remote_cksum:
696           bad = True
697           feedback_fn("  - ERROR: file '%s' missing" % file_name)
698         elif remote_cksum[file_name] != local_cksum[file_name]:
699           bad = True
700           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
701
702     if 'nodelist' not in node_result:
703       bad = True
704       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
705     else:
706       if node_result['nodelist']:
707         bad = True
708         for node in node_result['nodelist']:
709           feedback_fn("  - ERROR: communication with node '%s': %s" %
710                           (node, node_result['nodelist'][node]))
711     hyp_result = node_result.get('hypervisor', None)
712     if hyp_result is not None:
713       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
714     return bad
715
716   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
717     """Verify an instance.
718
719     This function checks to see if the required block devices are
720     available on the instance's node.
721
722     """
723     bad = False
724
725     instancelist = self.cfg.GetInstanceList()
726     if not instance in instancelist:
727       feedback_fn("  - ERROR: instance %s not in instance list %s" %
728                       (instance, instancelist))
729       bad = True
730
731     instanceconfig = self.cfg.GetInstanceInfo(instance)
732     node_current = instanceconfig.primary_node
733
734     node_vol_should = {}
735     instanceconfig.MapLVsByNode(node_vol_should)
736
737     for node in node_vol_should:
738       for volume in node_vol_should[node]:
739         if node not in node_vol_is or volume not in node_vol_is[node]:
740           feedback_fn("  - ERROR: volume %s missing on node %s" %
741                           (volume, node))
742           bad = True
743
744     if not instanceconfig.status == 'down':
745       if not instance in node_instance[node_current]:
746         feedback_fn("  - ERROR: instance %s not running on node %s" %
747                         (instance, node_current))
748         bad = True
749
750     for node in node_instance:
751       if (not node == node_current):
752         if instance in node_instance[node]:
753           feedback_fn("  - ERROR: instance %s should not run on node %s" %
754                           (instance, node))
755           bad = True
756
757     return bad
758
759   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
760     """Verify if there are any unknown volumes in the cluster.
761
762     The .os, .swap and backup volumes are ignored. All other volumes are
763     reported as unknown.
764
765     """
766     bad = False
767
768     for node in node_vol_is:
769       for volume in node_vol_is[node]:
770         if node not in node_vol_should or volume not in node_vol_should[node]:
771           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
772                       (volume, node))
773           bad = True
774     return bad
775
776   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
777     """Verify the list of running instances.
778
779     This checks what instances are running but unknown to the cluster.
780
781     """
782     bad = False
783     for node in node_instance:
784       for runninginstance in node_instance[node]:
785         if runninginstance not in instancelist:
786           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
787                           (runninginstance, node))
788           bad = True
789     return bad
790
791   def CheckPrereq(self):
792     """Check prerequisites.
793
794     This has no prerequisites.
795
796     """
797     pass
798
799   def Exec(self, feedback_fn):
800     """Verify integrity of cluster, performing various test on nodes.
801
802     """
803     bad = False
804     feedback_fn("* Verifying global settings")
805     for msg in self.cfg.VerifyConfig():
806       feedback_fn("  - ERROR: %s" % msg)
807
808     vg_name = self.cfg.GetVGName()
809     nodelist = utils.NiceSort(self.cfg.GetNodeList())
810     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
811     node_volume = {}
812     node_instance = {}
813
814     # FIXME: verify OS list
815     # do local checksums
816     file_names = list(self.sstore.GetFileList())
817     file_names.append(constants.SSL_CERT_FILE)
818     file_names.append(constants.CLUSTER_CONF_FILE)
819     local_checksums = utils.FingerprintFiles(file_names)
820
821     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823     all_instanceinfo = rpc.call_instance_list(nodelist)
824     all_vglist = rpc.call_vg_list(nodelist)
825     node_verify_param = {
826       'filelist': file_names,
827       'nodelist': nodelist,
828       'hypervisor': None,
829       }
830     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831     all_rversion = rpc.call_version(nodelist)
832
833     for node in nodelist:
834       feedback_fn("* Verifying node %s" % node)
835       result = self._VerifyNode(node, file_names, local_checksums,
836                                 all_vglist[node], all_nvinfo[node],
837                                 all_rversion[node], feedback_fn)
838       bad = bad or result
839
840       # node_volume
841       volumeinfo = all_volumeinfo[node]
842
843       if isinstance(volumeinfo, basestring):
844         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
845                     (node, volumeinfo[-400:].encode('string_escape')))
846         bad = True
847         node_volume[node] = {}
848       elif not isinstance(volumeinfo, dict):
849         feedback_fn("  - ERROR: connection to %s failed" % (node,))
850         bad = True
851         continue
852       else:
853         node_volume[node] = volumeinfo
854
855       # node_instance
856       nodeinstance = all_instanceinfo[node]
857       if type(nodeinstance) != list:
858         feedback_fn("  - ERROR: connection to %s failed" % (node,))
859         bad = True
860         continue
861
862       node_instance[node] = nodeinstance
863
864     node_vol_should = {}
865
866     for instance in instancelist:
867       feedback_fn("* Verifying instance %s" % instance)
868       result =  self._VerifyInstance(instance, node_volume, node_instance,
869                                      feedback_fn)
870       bad = bad or result
871
872       inst_config = self.cfg.GetInstanceInfo(instance)
873
874       inst_config.MapLVsByNode(node_vol_should)
875
876     feedback_fn("* Verifying orphan volumes")
877     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
878                                        feedback_fn)
879     bad = bad or result
880
881     feedback_fn("* Verifying remaining instances")
882     result = self._VerifyOrphanInstances(instancelist, node_instance,
883                                          feedback_fn)
884     bad = bad or result
885
886     return int(bad)
887
888
889 class LUVerifyDisks(NoHooksLU):
890   """Verifies the cluster disks status.
891
892   """
893   _OP_REQP = []
894
895   def CheckPrereq(self):
896     """Check prerequisites.
897
898     This has no prerequisites.
899
900     """
901     pass
902
903   def Exec(self, feedback_fn):
904     """Verify integrity of cluster disks.
905
906     """
907     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
908
909     vg_name = self.cfg.GetVGName()
910     nodes = utils.NiceSort(self.cfg.GetNodeList())
911     instances = [self.cfg.GetInstanceInfo(name)
912                  for name in self.cfg.GetInstanceList()]
913
914     nv_dict = {}
915     for inst in instances:
916       inst_lvs = {}
917       if (inst.status != "up" or
918           inst.disk_template not in constants.DTS_NET_MIRROR):
919         continue
920       inst.MapLVsByNode(inst_lvs)
921       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
922       for node, vol_list in inst_lvs.iteritems():
923         for vol in vol_list:
924           nv_dict[(node, vol)] = inst
925
926     if not nv_dict:
927       return result
928
929     node_lvs = rpc.call_volume_list(nodes, vg_name)
930
931     to_act = set()
932     for node in nodes:
933       # node_volume
934       lvs = node_lvs[node]
935
936       if isinstance(lvs, basestring):
937         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
938         res_nlvm[node] = lvs
939       elif not isinstance(lvs, dict):
940         logger.Info("connection to node %s failed or invalid data returned" %
941                     (node,))
942         res_nodes.append(node)
943         continue
944
945       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
946         inst = nv_dict.pop((node, lv_name), None)
947         if (not lv_online and inst is not None
948             and inst.name not in res_instances):
949             res_instances.append(inst.name)
950
951     # any leftover items in nv_dict are missing LVs, let's arrange the
952     # data better
953     for key, inst in nv_dict.iteritems():
954       if inst.name not in res_missing:
955         res_missing[inst.name] = []
956       res_missing[inst.name].append(key)
957
958     return result
959
960
961 class LURenameCluster(LogicalUnit):
962   """Rename the cluster.
963
964   """
965   HPATH = "cluster-rename"
966   HTYPE = constants.HTYPE_CLUSTER
967   _OP_REQP = ["name"]
968
969   def BuildHooksEnv(self):
970     """Build hooks env.
971
972     """
973     env = {
974       "OP_TARGET": self.op.sstore.GetClusterName(),
975       "NEW_NAME": self.op.name,
976       }
977     mn = self.sstore.GetMasterNode()
978     return env, [mn], [mn]
979
980   def CheckPrereq(self):
981     """Verify that the passed name is a valid one.
982
983     """
984     hostname = utils.HostInfo(self.op.name)
985
986     new_name = hostname.name
987     self.ip = new_ip = hostname.ip
988     old_name = self.sstore.GetClusterName()
989     old_ip = self.sstore.GetMasterIP()
990     if new_name == old_name and new_ip == old_ip:
991       raise errors.OpPrereqError("Neither the name nor the IP address of the"
992                                  " cluster has changed")
993     if new_ip != old_ip:
994       result = utils.RunCmd(["fping", "-q", new_ip])
995       if not result.failed:
996         raise errors.OpPrereqError("The given cluster IP address (%s) is"
997                                    " reachable on the network. Aborting." %
998                                    new_ip)
999
1000     self.op.name = new_name
1001
1002   def Exec(self, feedback_fn):
1003     """Rename the cluster.
1004
1005     """
1006     clustername = self.op.name
1007     ip = self.ip
1008     ss = self.sstore
1009
1010     # shutdown the master IP
1011     master = ss.GetMasterNode()
1012     if not rpc.call_node_stop_master(master):
1013       raise errors.OpExecError("Could not disable the master role")
1014
1015     try:
1016       # modify the sstore
1017       ss.SetKey(ss.SS_MASTER_IP, ip)
1018       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1019
1020       # Distribute updated ss config to all nodes
1021       myself = self.cfg.GetNodeInfo(master)
1022       dist_nodes = self.cfg.GetNodeList()
1023       if myself.name in dist_nodes:
1024         dist_nodes.remove(myself.name)
1025
1026       logger.Debug("Copying updated ssconf data to all nodes")
1027       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1028         fname = ss.KeyToFilename(keyname)
1029         result = rpc.call_upload_file(dist_nodes, fname)
1030         for to_node in dist_nodes:
1031           if not result[to_node]:
1032             logger.Error("copy of file %s to node %s failed" %
1033                          (fname, to_node))
1034     finally:
1035       if not rpc.call_node_start_master(master):
1036         logger.Error("Could not re-enable the master role on the master,"
1037                      " please restart manually.")
1038
1039
1040 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1041   """Sleep and poll for an instance's disk to sync.
1042
1043   """
1044   if not instance.disks:
1045     return True
1046
1047   if not oneshot:
1048     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1049
1050   node = instance.primary_node
1051
1052   for dev in instance.disks:
1053     cfgw.SetDiskID(dev, node)
1054
1055   retries = 0
1056   while True:
1057     max_time = 0
1058     done = True
1059     cumul_degraded = False
1060     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1061     if not rstats:
1062       proc.LogWarning("Can't get any data from node %s" % node)
1063       retries += 1
1064       if retries >= 10:
1065         raise errors.RemoteError("Can't contact node %s for mirror data,"
1066                                  " aborting." % node)
1067       time.sleep(6)
1068       continue
1069     retries = 0
1070     for i in range(len(rstats)):
1071       mstat = rstats[i]
1072       if mstat is None:
1073         proc.LogWarning("Can't compute data for node %s/%s" %
1074                         (node, instance.disks[i].iv_name))
1075         continue
1076       # we ignore the ldisk parameter
1077       perc_done, est_time, is_degraded, _ = mstat
1078       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1079       if perc_done is not None:
1080         done = False
1081         if est_time is not None:
1082           rem_time = "%d estimated seconds remaining" % est_time
1083           max_time = est_time
1084         else:
1085           rem_time = "no time estimate"
1086         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1087                      (instance.disks[i].iv_name, perc_done, rem_time))
1088     if done or oneshot:
1089       break
1090
1091     if unlock:
1092       utils.Unlock('cmd')
1093     try:
1094       time.sleep(min(60, max_time))
1095     finally:
1096       if unlock:
1097         utils.Lock('cmd')
1098
1099   if done:
1100     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1101   return not cumul_degraded
1102
1103
1104 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1105   """Check that mirrors are not degraded.
1106
1107   The ldisk parameter, if True, will change the test from the
1108   is_degraded attribute (which represents overall non-ok status for
1109   the device(s)) to the ldisk (representing the local storage status).
1110
1111   """
1112   cfgw.SetDiskID(dev, node)
1113   if ldisk:
1114     idx = 6
1115   else:
1116     idx = 5
1117
1118   result = True
1119   if on_primary or dev.AssembleOnSecondary():
1120     rstats = rpc.call_blockdev_find(node, dev)
1121     if not rstats:
1122       logger.ToStderr("Can't get any data from node %s" % node)
1123       result = False
1124     else:
1125       result = result and (not rstats[idx])
1126   if dev.children:
1127     for child in dev.children:
1128       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1129
1130   return result
1131
1132
1133 class LUDiagnoseOS(NoHooksLU):
1134   """Logical unit for OS diagnose/query.
1135
1136   """
1137   _OP_REQP = []
1138
1139   def CheckPrereq(self):
1140     """Check prerequisites.
1141
1142     This always succeeds, since this is a pure query LU.
1143
1144     """
1145     return
1146
1147   def Exec(self, feedback_fn):
1148     """Compute the list of OSes.
1149
1150     """
1151     node_list = self.cfg.GetNodeList()
1152     node_data = rpc.call_os_diagnose(node_list)
1153     if node_data == False:
1154       raise errors.OpExecError("Can't gather the list of OSes")
1155     return node_data
1156
1157
1158 class LURemoveNode(LogicalUnit):
1159   """Logical unit for removing a node.
1160
1161   """
1162   HPATH = "node-remove"
1163   HTYPE = constants.HTYPE_NODE
1164   _OP_REQP = ["node_name"]
1165
1166   def BuildHooksEnv(self):
1167     """Build hooks env.
1168
1169     This doesn't run on the target node in the pre phase as a failed
1170     node would not allows itself to run.
1171
1172     """
1173     env = {
1174       "OP_TARGET": self.op.node_name,
1175       "NODE_NAME": self.op.node_name,
1176       }
1177     all_nodes = self.cfg.GetNodeList()
1178     all_nodes.remove(self.op.node_name)
1179     return env, all_nodes, all_nodes
1180
1181   def CheckPrereq(self):
1182     """Check prerequisites.
1183
1184     This checks:
1185      - the node exists in the configuration
1186      - it does not have primary or secondary instances
1187      - it's not the master
1188
1189     Any errors are signalled by raising errors.OpPrereqError.
1190
1191     """
1192     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1193     if node is None:
1194       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1195
1196     instance_list = self.cfg.GetInstanceList()
1197
1198     masternode = self.sstore.GetMasterNode()
1199     if node.name == masternode:
1200       raise errors.OpPrereqError("Node is the master node,"
1201                                  " you need to failover first.")
1202
1203     for instance_name in instance_list:
1204       instance = self.cfg.GetInstanceInfo(instance_name)
1205       if node.name == instance.primary_node:
1206         raise errors.OpPrereqError("Instance %s still running on the node,"
1207                                    " please remove first." % instance_name)
1208       if node.name in instance.secondary_nodes:
1209         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1210                                    " please remove first." % instance_name)
1211     self.op.node_name = node.name
1212     self.node = node
1213
1214   def Exec(self, feedback_fn):
1215     """Removes the node from the cluster.
1216
1217     """
1218     node = self.node
1219     logger.Info("stopping the node daemon and removing configs from node %s" %
1220                 node.name)
1221
1222     rpc.call_node_leave_cluster(node.name)
1223
1224     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1225
1226     logger.Info("Removing node %s from config" % node.name)
1227
1228     self.cfg.RemoveNode(node.name)
1229
1230     _RemoveHostFromEtcHosts(node.name)
1231
1232
1233 class LUQueryNodes(NoHooksLU):
1234   """Logical unit for querying nodes.
1235
1236   """
1237   _OP_REQP = ["output_fields", "names"]
1238
1239   def CheckPrereq(self):
1240     """Check prerequisites.
1241
1242     This checks that the fields required are valid output fields.
1243
1244     """
1245     self.dynamic_fields = frozenset(["dtotal", "dfree",
1246                                      "mtotal", "mnode", "mfree",
1247                                      "bootid"])
1248
1249     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1250                                "pinst_list", "sinst_list",
1251                                "pip", "sip"],
1252                        dynamic=self.dynamic_fields,
1253                        selected=self.op.output_fields)
1254
1255     self.wanted = _GetWantedNodes(self, self.op.names)
1256
1257   def Exec(self, feedback_fn):
1258     """Computes the list of nodes and their attributes.
1259
1260     """
1261     nodenames = self.wanted
1262     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1263
1264     # begin data gathering
1265
1266     if self.dynamic_fields.intersection(self.op.output_fields):
1267       live_data = {}
1268       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1269       for name in nodenames:
1270         nodeinfo = node_data.get(name, None)
1271         if nodeinfo:
1272           live_data[name] = {
1273             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1274             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1275             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1276             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1277             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1278             "bootid": nodeinfo['bootid'],
1279             }
1280         else:
1281           live_data[name] = {}
1282     else:
1283       live_data = dict.fromkeys(nodenames, {})
1284
1285     node_to_primary = dict([(name, set()) for name in nodenames])
1286     node_to_secondary = dict([(name, set()) for name in nodenames])
1287
1288     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1289                              "sinst_cnt", "sinst_list"))
1290     if inst_fields & frozenset(self.op.output_fields):
1291       instancelist = self.cfg.GetInstanceList()
1292
1293       for instance_name in instancelist:
1294         inst = self.cfg.GetInstanceInfo(instance_name)
1295         if inst.primary_node in node_to_primary:
1296           node_to_primary[inst.primary_node].add(inst.name)
1297         for secnode in inst.secondary_nodes:
1298           if secnode in node_to_secondary:
1299             node_to_secondary[secnode].add(inst.name)
1300
1301     # end data gathering
1302
1303     output = []
1304     for node in nodelist:
1305       node_output = []
1306       for field in self.op.output_fields:
1307         if field == "name":
1308           val = node.name
1309         elif field == "pinst_list":
1310           val = list(node_to_primary[node.name])
1311         elif field == "sinst_list":
1312           val = list(node_to_secondary[node.name])
1313         elif field == "pinst_cnt":
1314           val = len(node_to_primary[node.name])
1315         elif field == "sinst_cnt":
1316           val = len(node_to_secondary[node.name])
1317         elif field == "pip":
1318           val = node.primary_ip
1319         elif field == "sip":
1320           val = node.secondary_ip
1321         elif field in self.dynamic_fields:
1322           val = live_data[node.name].get(field, None)
1323         else:
1324           raise errors.ParameterError(field)
1325         node_output.append(val)
1326       output.append(node_output)
1327
1328     return output
1329
1330
1331 class LUQueryNodeVolumes(NoHooksLU):
1332   """Logical unit for getting volumes on node(s).
1333
1334   """
1335   _OP_REQP = ["nodes", "output_fields"]
1336
1337   def CheckPrereq(self):
1338     """Check prerequisites.
1339
1340     This checks that the fields required are valid output fields.
1341
1342     """
1343     self.nodes = _GetWantedNodes(self, self.op.nodes)
1344
1345     _CheckOutputFields(static=["node"],
1346                        dynamic=["phys", "vg", "name", "size", "instance"],
1347                        selected=self.op.output_fields)
1348
1349
1350   def Exec(self, feedback_fn):
1351     """Computes the list of nodes and their attributes.
1352
1353     """
1354     nodenames = self.nodes
1355     volumes = rpc.call_node_volumes(nodenames)
1356
1357     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1358              in self.cfg.GetInstanceList()]
1359
1360     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1361
1362     output = []
1363     for node in nodenames:
1364       if node not in volumes or not volumes[node]:
1365         continue
1366
1367       node_vols = volumes[node][:]
1368       node_vols.sort(key=lambda vol: vol['dev'])
1369
1370       for vol in node_vols:
1371         node_output = []
1372         for field in self.op.output_fields:
1373           if field == "node":
1374             val = node
1375           elif field == "phys":
1376             val = vol['dev']
1377           elif field == "vg":
1378             val = vol['vg']
1379           elif field == "name":
1380             val = vol['name']
1381           elif field == "size":
1382             val = int(float(vol['size']))
1383           elif field == "instance":
1384             for inst in ilist:
1385               if node not in lv_by_node[inst]:
1386                 continue
1387               if vol['name'] in lv_by_node[inst][node]:
1388                 val = inst.name
1389                 break
1390             else:
1391               val = '-'
1392           else:
1393             raise errors.ParameterError(field)
1394           node_output.append(str(val))
1395
1396         output.append(node_output)
1397
1398     return output
1399
1400
1401 class LUAddNode(LogicalUnit):
1402   """Logical unit for adding node to the cluster.
1403
1404   """
1405   HPATH = "node-add"
1406   HTYPE = constants.HTYPE_NODE
1407   _OP_REQP = ["node_name"]
1408
1409   def BuildHooksEnv(self):
1410     """Build hooks env.
1411
1412     This will run on all nodes before, and on all nodes + the new node after.
1413
1414     """
1415     env = {
1416       "OP_TARGET": self.op.node_name,
1417       "NODE_NAME": self.op.node_name,
1418       "NODE_PIP": self.op.primary_ip,
1419       "NODE_SIP": self.op.secondary_ip,
1420       }
1421     nodes_0 = self.cfg.GetNodeList()
1422     nodes_1 = nodes_0 + [self.op.node_name, ]
1423     return env, nodes_0, nodes_1
1424
1425   def CheckPrereq(self):
1426     """Check prerequisites.
1427
1428     This checks:
1429      - the new node is not already in the config
1430      - it is resolvable
1431      - its parameters (single/dual homed) matches the cluster
1432
1433     Any errors are signalled by raising errors.OpPrereqError.
1434
1435     """
1436     node_name = self.op.node_name
1437     cfg = self.cfg
1438
1439     dns_data = utils.HostInfo(node_name)
1440
1441     node = dns_data.name
1442     primary_ip = self.op.primary_ip = dns_data.ip
1443     secondary_ip = getattr(self.op, "secondary_ip", None)
1444     if secondary_ip is None:
1445       secondary_ip = primary_ip
1446     if not utils.IsValidIP(secondary_ip):
1447       raise errors.OpPrereqError("Invalid secondary IP given")
1448     self.op.secondary_ip = secondary_ip
1449     node_list = cfg.GetNodeList()
1450     if node in node_list:
1451       raise errors.OpPrereqError("Node %s is already in the configuration"
1452                                  % node)
1453
1454     for existing_node_name in node_list:
1455       existing_node = cfg.GetNodeInfo(existing_node_name)
1456       if (existing_node.primary_ip == primary_ip or
1457           existing_node.secondary_ip == primary_ip or
1458           existing_node.primary_ip == secondary_ip or
1459           existing_node.secondary_ip == secondary_ip):
1460         raise errors.OpPrereqError("New node ip address(es) conflict with"
1461                                    " existing node %s" % existing_node.name)
1462
1463     # check that the type of the node (single versus dual homed) is the
1464     # same as for the master
1465     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1466     master_singlehomed = myself.secondary_ip == myself.primary_ip
1467     newbie_singlehomed = secondary_ip == primary_ip
1468     if master_singlehomed != newbie_singlehomed:
1469       if master_singlehomed:
1470         raise errors.OpPrereqError("The master has no private ip but the"
1471                                    " new node has one")
1472       else:
1473         raise errors.OpPrereqError("The master has a private ip but the"
1474                                    " new node doesn't have one")
1475
1476     # checks reachablity
1477     if not utils.TcpPing(utils.HostInfo().name,
1478                          primary_ip,
1479                          constants.DEFAULT_NODED_PORT):
1480       raise errors.OpPrereqError("Node not reachable by ping")
1481
1482     if not newbie_singlehomed:
1483       # check reachability from my secondary ip to newbie's secondary ip
1484       if not utils.TcpPing(myself.secondary_ip,
1485                            secondary_ip,
1486                            constants.DEFAULT_NODED_PORT):
1487         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1488                                    " based ping to noded port")
1489
1490     self.new_node = objects.Node(name=node,
1491                                  primary_ip=primary_ip,
1492                                  secondary_ip=secondary_ip)
1493
1494     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1495       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1496         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1497                                    constants.VNC_PASSWORD_FILE)
1498
1499   def Exec(self, feedback_fn):
1500     """Adds the new node to the cluster.
1501
1502     """
1503     new_node = self.new_node
1504     node = new_node.name
1505
1506     # set up inter-node password and certificate and restarts the node daemon
1507     gntpass = self.sstore.GetNodeDaemonPassword()
1508     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1509       raise errors.OpExecError("ganeti password corruption detected")
1510     f = open(constants.SSL_CERT_FILE)
1511     try:
1512       gntpem = f.read(8192)
1513     finally:
1514       f.close()
1515     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1516     # so we use this to detect an invalid certificate; as long as the
1517     # cert doesn't contain this, the here-document will be correctly
1518     # parsed by the shell sequence below
1519     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1520       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1521     if not gntpem.endswith("\n"):
1522       raise errors.OpExecError("PEM must end with newline")
1523     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1524
1525     # and then connect with ssh to set password and start ganeti-noded
1526     # note that all the below variables are sanitized at this point,
1527     # either by being constants or by the checks above
1528     ss = self.sstore
1529     mycommand = ("umask 077 && "
1530                  "echo '%s' > '%s' && "
1531                  "cat > '%s' << '!EOF.' && \n"
1532                  "%s!EOF.\n%s restart" %
1533                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1534                   constants.SSL_CERT_FILE, gntpem,
1535                   constants.NODE_INITD_SCRIPT))
1536
1537     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1538     if result.failed:
1539       raise errors.OpExecError("Remote command on node %s, error: %s,"
1540                                " output: %s" %
1541                                (node, result.fail_reason, result.output))
1542
1543     # check connectivity
1544     time.sleep(4)
1545
1546     result = rpc.call_version([node])[node]
1547     if result:
1548       if constants.PROTOCOL_VERSION == result:
1549         logger.Info("communication to node %s fine, sw version %s match" %
1550                     (node, result))
1551       else:
1552         raise errors.OpExecError("Version mismatch master version %s,"
1553                                  " node version %s" %
1554                                  (constants.PROTOCOL_VERSION, result))
1555     else:
1556       raise errors.OpExecError("Cannot get version from the new node")
1557
1558     # setup ssh on node
1559     logger.Info("copy ssh key to node %s" % node)
1560     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1561     keyarray = []
1562     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1563                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1564                 priv_key, pub_key]
1565
1566     for i in keyfiles:
1567       f = open(i, 'r')
1568       try:
1569         keyarray.append(f.read())
1570       finally:
1571         f.close()
1572
1573     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1574                                keyarray[3], keyarray[4], keyarray[5])
1575
1576     if not result:
1577       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1578
1579     # Add node to our /etc/hosts, and add key to known_hosts
1580     _AddHostToEtcHosts(new_node.name)
1581
1582     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1583                       self.cfg.GetHostKey())
1584
1585     if new_node.secondary_ip != new_node.primary_ip:
1586       if not rpc.call_node_tcp_ping(new_node.name,
1587                                     constants.LOCALHOST_IP_ADDRESS,
1588                                     new_node.secondary_ip,
1589                                     constants.DEFAULT_NODED_PORT,
1590                                     10, False):
1591         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1592                                  " you gave (%s). Please fix and re-run this"
1593                                  " command." % new_node.secondary_ip)
1594
1595     success, msg = ssh.VerifyNodeHostname(node)
1596     if not success:
1597       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1598                                " than the one the resolver gives: %s."
1599                                " Please fix and re-run this command." %
1600                                (node, msg))
1601
1602     # Distribute updated /etc/hosts and known_hosts to all nodes,
1603     # including the node just added
1604     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1605     dist_nodes = self.cfg.GetNodeList() + [node]
1606     if myself.name in dist_nodes:
1607       dist_nodes.remove(myself.name)
1608
1609     logger.Debug("Copying hosts and known_hosts to all nodes")
1610     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1611       result = rpc.call_upload_file(dist_nodes, fname)
1612       for to_node in dist_nodes:
1613         if not result[to_node]:
1614           logger.Error("copy of file %s to node %s failed" %
1615                        (fname, to_node))
1616
1617     to_copy = ss.GetFileList()
1618     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1619       to_copy.append(constants.VNC_PASSWORD_FILE)
1620     for fname in to_copy:
1621       if not ssh.CopyFileToNode(node, fname):
1622         logger.Error("could not copy file %s to node %s" % (fname, node))
1623
1624     logger.Info("adding node %s to cluster.conf" % node)
1625     self.cfg.AddNode(new_node)
1626
1627
1628 class LUMasterFailover(LogicalUnit):
1629   """Failover the master node to the current node.
1630
1631   This is a special LU in that it must run on a non-master node.
1632
1633   """
1634   HPATH = "master-failover"
1635   HTYPE = constants.HTYPE_CLUSTER
1636   REQ_MASTER = False
1637   _OP_REQP = []
1638
1639   def BuildHooksEnv(self):
1640     """Build hooks env.
1641
1642     This will run on the new master only in the pre phase, and on all
1643     the nodes in the post phase.
1644
1645     """
1646     env = {
1647       "OP_TARGET": self.new_master,
1648       "NEW_MASTER": self.new_master,
1649       "OLD_MASTER": self.old_master,
1650       }
1651     return env, [self.new_master], self.cfg.GetNodeList()
1652
1653   def CheckPrereq(self):
1654     """Check prerequisites.
1655
1656     This checks that we are not already the master.
1657
1658     """
1659     self.new_master = utils.HostInfo().name
1660     self.old_master = self.sstore.GetMasterNode()
1661
1662     if self.old_master == self.new_master:
1663       raise errors.OpPrereqError("This commands must be run on the node"
1664                                  " where you want the new master to be."
1665                                  " %s is already the master" %
1666                                  self.old_master)
1667
1668   def Exec(self, feedback_fn):
1669     """Failover the master node.
1670
1671     This command, when run on a non-master node, will cause the current
1672     master to cease being master, and the non-master to become new
1673     master.
1674
1675     """
1676     #TODO: do not rely on gethostname returning the FQDN
1677     logger.Info("setting master to %s, old master: %s" %
1678                 (self.new_master, self.old_master))
1679
1680     if not rpc.call_node_stop_master(self.old_master):
1681       logger.Error("could disable the master role on the old master"
1682                    " %s, please disable manually" % self.old_master)
1683
1684     ss = self.sstore
1685     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1686     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1687                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1688       logger.Error("could not distribute the new simple store master file"
1689                    " to the other nodes, please check.")
1690
1691     if not rpc.call_node_start_master(self.new_master):
1692       logger.Error("could not start the master role on the new master"
1693                    " %s, please check" % self.new_master)
1694       feedback_fn("Error in activating the master IP on the new master,"
1695                   " please fix manually.")
1696
1697
1698
1699 class LUQueryClusterInfo(NoHooksLU):
1700   """Query cluster configuration.
1701
1702   """
1703   _OP_REQP = []
1704   REQ_MASTER = False
1705
1706   def CheckPrereq(self):
1707     """No prerequsites needed for this LU.
1708
1709     """
1710     pass
1711
1712   def Exec(self, feedback_fn):
1713     """Return cluster config.
1714
1715     """
1716     result = {
1717       "name": self.sstore.GetClusterName(),
1718       "software_version": constants.RELEASE_VERSION,
1719       "protocol_version": constants.PROTOCOL_VERSION,
1720       "config_version": constants.CONFIG_VERSION,
1721       "os_api_version": constants.OS_API_VERSION,
1722       "export_version": constants.EXPORT_VERSION,
1723       "master": self.sstore.GetMasterNode(),
1724       "architecture": (platform.architecture()[0], platform.machine()),
1725       }
1726
1727     return result
1728
1729
1730 class LUClusterCopyFile(NoHooksLU):
1731   """Copy file to cluster.
1732
1733   """
1734   _OP_REQP = ["nodes", "filename"]
1735
1736   def CheckPrereq(self):
1737     """Check prerequisites.
1738
1739     It should check that the named file exists and that the given list
1740     of nodes is valid.
1741
1742     """
1743     if not os.path.exists(self.op.filename):
1744       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1745
1746     self.nodes = _GetWantedNodes(self, self.op.nodes)
1747
1748   def Exec(self, feedback_fn):
1749     """Copy a file from master to some nodes.
1750
1751     Args:
1752       opts - class with options as members
1753       args - list containing a single element, the file name
1754     Opts used:
1755       nodes - list containing the name of target nodes; if empty, all nodes
1756
1757     """
1758     filename = self.op.filename
1759
1760     myname = utils.HostInfo().name
1761
1762     for node in self.nodes:
1763       if node == myname:
1764         continue
1765       if not ssh.CopyFileToNode(node, filename):
1766         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1767
1768
1769 class LUDumpClusterConfig(NoHooksLU):
1770   """Return a text-representation of the cluster-config.
1771
1772   """
1773   _OP_REQP = []
1774
1775   def CheckPrereq(self):
1776     """No prerequisites.
1777
1778     """
1779     pass
1780
1781   def Exec(self, feedback_fn):
1782     """Dump a representation of the cluster config to the standard output.
1783
1784     """
1785     return self.cfg.DumpConfig()
1786
1787
1788 class LURunClusterCommand(NoHooksLU):
1789   """Run a command on some nodes.
1790
1791   """
1792   _OP_REQP = ["command", "nodes"]
1793
1794   def CheckPrereq(self):
1795     """Check prerequisites.
1796
1797     It checks that the given list of nodes is valid.
1798
1799     """
1800     self.nodes = _GetWantedNodes(self, self.op.nodes)
1801
1802   def Exec(self, feedback_fn):
1803     """Run a command on some nodes.
1804
1805     """
1806     data = []
1807     for node in self.nodes:
1808       result = ssh.SSHCall(node, "root", self.op.command)
1809       data.append((node, result.output, result.exit_code))
1810
1811     return data
1812
1813
1814 class LUActivateInstanceDisks(NoHooksLU):
1815   """Bring up an instance's disks.
1816
1817   """
1818   _OP_REQP = ["instance_name"]
1819
1820   def CheckPrereq(self):
1821     """Check prerequisites.
1822
1823     This checks that the instance is in the cluster.
1824
1825     """
1826     instance = self.cfg.GetInstanceInfo(
1827       self.cfg.ExpandInstanceName(self.op.instance_name))
1828     if instance is None:
1829       raise errors.OpPrereqError("Instance '%s' not known" %
1830                                  self.op.instance_name)
1831     self.instance = instance
1832
1833
1834   def Exec(self, feedback_fn):
1835     """Activate the disks.
1836
1837     """
1838     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1839     if not disks_ok:
1840       raise errors.OpExecError("Cannot activate block devices")
1841
1842     return disks_info
1843
1844
1845 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1846   """Prepare the block devices for an instance.
1847
1848   This sets up the block devices on all nodes.
1849
1850   Args:
1851     instance: a ganeti.objects.Instance object
1852     ignore_secondaries: if true, errors on secondary nodes won't result
1853                         in an error return from the function
1854
1855   Returns:
1856     false if the operation failed
1857     list of (host, instance_visible_name, node_visible_name) if the operation
1858          suceeded with the mapping from node devices to instance devices
1859   """
1860   device_info = []
1861   disks_ok = True
1862   for inst_disk in instance.disks:
1863     master_result = None
1864     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1865       cfg.SetDiskID(node_disk, node)
1866       is_primary = node == instance.primary_node
1867       result = rpc.call_blockdev_assemble(node, node_disk,
1868                                           instance.name, is_primary)
1869       if not result:
1870         logger.Error("could not prepare block device %s on node %s"
1871                      " (is_primary=%s)" %
1872                      (inst_disk.iv_name, node, is_primary))
1873         if is_primary or not ignore_secondaries:
1874           disks_ok = False
1875       if is_primary:
1876         master_result = result
1877     device_info.append((instance.primary_node, inst_disk.iv_name,
1878                         master_result))
1879
1880   # leave the disks configured for the primary node
1881   # this is a workaround that would be fixed better by
1882   # improving the logical/physical id handling
1883   for disk in instance.disks:
1884     cfg.SetDiskID(disk, instance.primary_node)
1885
1886   return disks_ok, device_info
1887
1888
1889 def _StartInstanceDisks(cfg, instance, force):
1890   """Start the disks of an instance.
1891
1892   """
1893   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1894                                            ignore_secondaries=force)
1895   if not disks_ok:
1896     _ShutdownInstanceDisks(instance, cfg)
1897     if force is not None and not force:
1898       logger.Error("If the message above refers to a secondary node,"
1899                    " you can retry the operation using '--force'.")
1900     raise errors.OpExecError("Disk consistency error")
1901
1902
1903 class LUDeactivateInstanceDisks(NoHooksLU):
1904   """Shutdown an instance's disks.
1905
1906   """
1907   _OP_REQP = ["instance_name"]
1908
1909   def CheckPrereq(self):
1910     """Check prerequisites.
1911
1912     This checks that the instance is in the cluster.
1913
1914     """
1915     instance = self.cfg.GetInstanceInfo(
1916       self.cfg.ExpandInstanceName(self.op.instance_name))
1917     if instance is None:
1918       raise errors.OpPrereqError("Instance '%s' not known" %
1919                                  self.op.instance_name)
1920     self.instance = instance
1921
1922   def Exec(self, feedback_fn):
1923     """Deactivate the disks
1924
1925     """
1926     instance = self.instance
1927     ins_l = rpc.call_instance_list([instance.primary_node])
1928     ins_l = ins_l[instance.primary_node]
1929     if not type(ins_l) is list:
1930       raise errors.OpExecError("Can't contact node '%s'" %
1931                                instance.primary_node)
1932
1933     if self.instance.name in ins_l:
1934       raise errors.OpExecError("Instance is running, can't shutdown"
1935                                " block devices.")
1936
1937     _ShutdownInstanceDisks(instance, self.cfg)
1938
1939
1940 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1941   """Shutdown block devices of an instance.
1942
1943   This does the shutdown on all nodes of the instance.
1944
1945   If the ignore_primary is false, errors on the primary node are
1946   ignored.
1947
1948   """
1949   result = True
1950   for disk in instance.disks:
1951     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1952       cfg.SetDiskID(top_disk, node)
1953       if not rpc.call_blockdev_shutdown(node, top_disk):
1954         logger.Error("could not shutdown block device %s on node %s" %
1955                      (disk.iv_name, node))
1956         if not ignore_primary or node != instance.primary_node:
1957           result = False
1958   return result
1959
1960
1961 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1962   """Checks if a node has enough free memory.
1963
1964   This function check if a given node has the needed amount of free
1965   memory. In case the node has less memory or we cannot get the
1966   information from the node, this function raise an OpPrereqError
1967   exception.
1968
1969   Args:
1970     - cfg: a ConfigWriter instance
1971     - node: the node name
1972     - reason: string to use in the error message
1973     - requested: the amount of memory in MiB
1974
1975   """
1976   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1977   if not nodeinfo or not isinstance(nodeinfo, dict):
1978     raise errors.OpPrereqError("Could not contact node %s for resource"
1979                              " information" % (node,))
1980
1981   free_mem = nodeinfo[node].get('memory_free')
1982   if not isinstance(free_mem, int):
1983     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1984                              " was '%s'" % (node, free_mem))
1985   if requested > free_mem:
1986     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1987                              " needed %s MiB, available %s MiB" %
1988                              (node, reason, requested, free_mem))
1989
1990
1991 class LUStartupInstance(LogicalUnit):
1992   """Starts an instance.
1993
1994   """
1995   HPATH = "instance-start"
1996   HTYPE = constants.HTYPE_INSTANCE
1997   _OP_REQP = ["instance_name", "force"]
1998
1999   def BuildHooksEnv(self):
2000     """Build hooks env.
2001
2002     This runs on master, primary and secondary nodes of the instance.
2003
2004     """
2005     env = {
2006       "FORCE": self.op.force,
2007       }
2008     env.update(_BuildInstanceHookEnvByObject(self.instance))
2009     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2010           list(self.instance.secondary_nodes))
2011     return env, nl, nl
2012
2013   def CheckPrereq(self):
2014     """Check prerequisites.
2015
2016     This checks that the instance is in the cluster.
2017
2018     """
2019     instance = self.cfg.GetInstanceInfo(
2020       self.cfg.ExpandInstanceName(self.op.instance_name))
2021     if instance is None:
2022       raise errors.OpPrereqError("Instance '%s' not known" %
2023                                  self.op.instance_name)
2024
2025     # check bridges existance
2026     _CheckInstanceBridgesExist(instance)
2027
2028     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2029                          "starting instance %s" % instance.name,
2030                          instance.memory)
2031
2032     self.instance = instance
2033     self.op.instance_name = instance.name
2034
2035   def Exec(self, feedback_fn):
2036     """Start the instance.
2037
2038     """
2039     instance = self.instance
2040     force = self.op.force
2041     extra_args = getattr(self.op, "extra_args", "")
2042
2043     node_current = instance.primary_node
2044
2045     _StartInstanceDisks(self.cfg, instance, force)
2046
2047     if not rpc.call_instance_start(node_current, instance, extra_args):
2048       _ShutdownInstanceDisks(instance, self.cfg)
2049       raise errors.OpExecError("Could not start instance")
2050
2051     self.cfg.MarkInstanceUp(instance.name)
2052
2053
2054 class LURebootInstance(LogicalUnit):
2055   """Reboot an instance.
2056
2057   """
2058   HPATH = "instance-reboot"
2059   HTYPE = constants.HTYPE_INSTANCE
2060   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2061
2062   def BuildHooksEnv(self):
2063     """Build hooks env.
2064
2065     This runs on master, primary and secondary nodes of the instance.
2066
2067     """
2068     env = {
2069       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2070       }
2071     env.update(_BuildInstanceHookEnvByObject(self.instance))
2072     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2073           list(self.instance.secondary_nodes))
2074     return env, nl, nl
2075
2076   def CheckPrereq(self):
2077     """Check prerequisites.
2078
2079     This checks that the instance is in the cluster.
2080
2081     """
2082     instance = self.cfg.GetInstanceInfo(
2083       self.cfg.ExpandInstanceName(self.op.instance_name))
2084     if instance is None:
2085       raise errors.OpPrereqError("Instance '%s' not known" %
2086                                  self.op.instance_name)
2087
2088     # check bridges existance
2089     _CheckInstanceBridgesExist(instance)
2090
2091     self.instance = instance
2092     self.op.instance_name = instance.name
2093
2094   def Exec(self, feedback_fn):
2095     """Reboot the instance.
2096
2097     """
2098     instance = self.instance
2099     ignore_secondaries = self.op.ignore_secondaries
2100     reboot_type = self.op.reboot_type
2101     extra_args = getattr(self.op, "extra_args", "")
2102
2103     node_current = instance.primary_node
2104
2105     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2106                            constants.INSTANCE_REBOOT_HARD,
2107                            constants.INSTANCE_REBOOT_FULL]:
2108       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2109                                   (constants.INSTANCE_REBOOT_SOFT,
2110                                    constants.INSTANCE_REBOOT_HARD,
2111                                    constants.INSTANCE_REBOOT_FULL))
2112
2113     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2114                        constants.INSTANCE_REBOOT_HARD]:
2115       if not rpc.call_instance_reboot(node_current, instance,
2116                                       reboot_type, extra_args):
2117         raise errors.OpExecError("Could not reboot instance")
2118     else:
2119       if not rpc.call_instance_shutdown(node_current, instance):
2120         raise errors.OpExecError("could not shutdown instance for full reboot")
2121       _ShutdownInstanceDisks(instance, self.cfg)
2122       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2123       if not rpc.call_instance_start(node_current, instance, extra_args):
2124         _ShutdownInstanceDisks(instance, self.cfg)
2125         raise errors.OpExecError("Could not start instance for full reboot")
2126
2127     self.cfg.MarkInstanceUp(instance.name)
2128
2129
2130 class LUShutdownInstance(LogicalUnit):
2131   """Shutdown an instance.
2132
2133   """
2134   HPATH = "instance-stop"
2135   HTYPE = constants.HTYPE_INSTANCE
2136   _OP_REQP = ["instance_name"]
2137
2138   def BuildHooksEnv(self):
2139     """Build hooks env.
2140
2141     This runs on master, primary and secondary nodes of the instance.
2142
2143     """
2144     env = _BuildInstanceHookEnvByObject(self.instance)
2145     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2146           list(self.instance.secondary_nodes))
2147     return env, nl, nl
2148
2149   def CheckPrereq(self):
2150     """Check prerequisites.
2151
2152     This checks that the instance is in the cluster.
2153
2154     """
2155     instance = self.cfg.GetInstanceInfo(
2156       self.cfg.ExpandInstanceName(self.op.instance_name))
2157     if instance is None:
2158       raise errors.OpPrereqError("Instance '%s' not known" %
2159                                  self.op.instance_name)
2160     self.instance = instance
2161
2162   def Exec(self, feedback_fn):
2163     """Shutdown the instance.
2164
2165     """
2166     instance = self.instance
2167     node_current = instance.primary_node
2168     if not rpc.call_instance_shutdown(node_current, instance):
2169       logger.Error("could not shutdown instance")
2170
2171     self.cfg.MarkInstanceDown(instance.name)
2172     _ShutdownInstanceDisks(instance, self.cfg)
2173
2174
2175 class LUReinstallInstance(LogicalUnit):
2176   """Reinstall an instance.
2177
2178   """
2179   HPATH = "instance-reinstall"
2180   HTYPE = constants.HTYPE_INSTANCE
2181   _OP_REQP = ["instance_name"]
2182
2183   def BuildHooksEnv(self):
2184     """Build hooks env.
2185
2186     This runs on master, primary and secondary nodes of the instance.
2187
2188     """
2189     env = _BuildInstanceHookEnvByObject(self.instance)
2190     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2191           list(self.instance.secondary_nodes))
2192     return env, nl, nl
2193
2194   def CheckPrereq(self):
2195     """Check prerequisites.
2196
2197     This checks that the instance is in the cluster and is not running.
2198
2199     """
2200     instance = self.cfg.GetInstanceInfo(
2201       self.cfg.ExpandInstanceName(self.op.instance_name))
2202     if instance is None:
2203       raise errors.OpPrereqError("Instance '%s' not known" %
2204                                  self.op.instance_name)
2205     if instance.disk_template == constants.DT_DISKLESS:
2206       raise errors.OpPrereqError("Instance '%s' has no disks" %
2207                                  self.op.instance_name)
2208     if instance.status != "down":
2209       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2210                                  self.op.instance_name)
2211     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2212     if remote_info:
2213       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2214                                  (self.op.instance_name,
2215                                   instance.primary_node))
2216
2217     self.op.os_type = getattr(self.op, "os_type", None)
2218     if self.op.os_type is not None:
2219       # OS verification
2220       pnode = self.cfg.GetNodeInfo(
2221         self.cfg.ExpandNodeName(instance.primary_node))
2222       if pnode is None:
2223         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2224                                    self.op.pnode)
2225       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2226       if not os_obj:
2227         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2228                                    " primary node"  % self.op.os_type)
2229
2230     self.instance = instance
2231
2232   def Exec(self, feedback_fn):
2233     """Reinstall the instance.
2234
2235     """
2236     inst = self.instance
2237
2238     if self.op.os_type is not None:
2239       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2240       inst.os = self.op.os_type
2241       self.cfg.AddInstance(inst)
2242
2243     _StartInstanceDisks(self.cfg, inst, None)
2244     try:
2245       feedback_fn("Running the instance OS create scripts...")
2246       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2247         raise errors.OpExecError("Could not install OS for instance %s"
2248                                  " on node %s" %
2249                                  (inst.name, inst.primary_node))
2250     finally:
2251       _ShutdownInstanceDisks(inst, self.cfg)
2252
2253
2254 class LURenameInstance(LogicalUnit):
2255   """Rename an instance.
2256
2257   """
2258   HPATH = "instance-rename"
2259   HTYPE = constants.HTYPE_INSTANCE
2260   _OP_REQP = ["instance_name", "new_name"]
2261
2262   def BuildHooksEnv(self):
2263     """Build hooks env.
2264
2265     This runs on master, primary and secondary nodes of the instance.
2266
2267     """
2268     env = _BuildInstanceHookEnvByObject(self.instance)
2269     env["INSTANCE_NEW_NAME"] = self.op.new_name
2270     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2271           list(self.instance.secondary_nodes))
2272     return env, nl, nl
2273
2274   def CheckPrereq(self):
2275     """Check prerequisites.
2276
2277     This checks that the instance is in the cluster and is not running.
2278
2279     """
2280     instance = self.cfg.GetInstanceInfo(
2281       self.cfg.ExpandInstanceName(self.op.instance_name))
2282     if instance is None:
2283       raise errors.OpPrereqError("Instance '%s' not known" %
2284                                  self.op.instance_name)
2285     if instance.status != "down":
2286       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2287                                  self.op.instance_name)
2288     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2289     if remote_info:
2290       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2291                                  (self.op.instance_name,
2292                                   instance.primary_node))
2293     self.instance = instance
2294
2295     # new name verification
2296     name_info = utils.HostInfo(self.op.new_name)
2297
2298     self.op.new_name = new_name = name_info.name
2299     if not getattr(self.op, "ignore_ip", False):
2300       command = ["fping", "-q", name_info.ip]
2301       result = utils.RunCmd(command)
2302       if not result.failed:
2303         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2304                                    (name_info.ip, new_name))
2305
2306
2307   def Exec(self, feedback_fn):
2308     """Reinstall the instance.
2309
2310     """
2311     inst = self.instance
2312     old_name = inst.name
2313
2314     self.cfg.RenameInstance(inst.name, self.op.new_name)
2315
2316     # re-read the instance from the configuration after rename
2317     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2318
2319     _StartInstanceDisks(self.cfg, inst, None)
2320     try:
2321       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2322                                           "sda", "sdb"):
2323         msg = ("Could run OS rename script for instance %s on node %s (but the"
2324                " instance has been renamed in Ganeti)" %
2325                (inst.name, inst.primary_node))
2326         logger.Error(msg)
2327     finally:
2328       _ShutdownInstanceDisks(inst, self.cfg)
2329
2330
2331 class LURemoveInstance(LogicalUnit):
2332   """Remove an instance.
2333
2334   """
2335   HPATH = "instance-remove"
2336   HTYPE = constants.HTYPE_INSTANCE
2337   _OP_REQP = ["instance_name"]
2338
2339   def BuildHooksEnv(self):
2340     """Build hooks env.
2341
2342     This runs on master, primary and secondary nodes of the instance.
2343
2344     """
2345     env = _BuildInstanceHookEnvByObject(self.instance)
2346     nl = [self.sstore.GetMasterNode()]
2347     return env, nl, nl
2348
2349   def CheckPrereq(self):
2350     """Check prerequisites.
2351
2352     This checks that the instance is in the cluster.
2353
2354     """
2355     instance = self.cfg.GetInstanceInfo(
2356       self.cfg.ExpandInstanceName(self.op.instance_name))
2357     if instance is None:
2358       raise errors.OpPrereqError("Instance '%s' not known" %
2359                                  self.op.instance_name)
2360     self.instance = instance
2361
2362   def Exec(self, feedback_fn):
2363     """Remove the instance.
2364
2365     """
2366     instance = self.instance
2367     logger.Info("shutting down instance %s on node %s" %
2368                 (instance.name, instance.primary_node))
2369
2370     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2371       if self.op.ignore_failures:
2372         feedback_fn("Warning: can't shutdown instance")
2373       else:
2374         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2375                                  (instance.name, instance.primary_node))
2376
2377     logger.Info("removing block devices for instance %s" % instance.name)
2378
2379     if not _RemoveDisks(instance, self.cfg):
2380       if self.op.ignore_failures:
2381         feedback_fn("Warning: can't remove instance's disks")
2382       else:
2383         raise errors.OpExecError("Can't remove instance's disks")
2384
2385     logger.Info("removing instance %s out of cluster config" % instance.name)
2386
2387     self.cfg.RemoveInstance(instance.name)
2388
2389
2390 class LUQueryInstances(NoHooksLU):
2391   """Logical unit for querying instances.
2392
2393   """
2394   _OP_REQP = ["output_fields", "names"]
2395
2396   def CheckPrereq(self):
2397     """Check prerequisites.
2398
2399     This checks that the fields required are valid output fields.
2400
2401     """
2402     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2403     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2404                                "admin_state", "admin_ram",
2405                                "disk_template", "ip", "mac", "bridge",
2406                                "sda_size", "sdb_size", "vcpus"],
2407                        dynamic=self.dynamic_fields,
2408                        selected=self.op.output_fields)
2409
2410     self.wanted = _GetWantedInstances(self, self.op.names)
2411
2412   def Exec(self, feedback_fn):
2413     """Computes the list of nodes and their attributes.
2414
2415     """
2416     instance_names = self.wanted
2417     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2418                      in instance_names]
2419
2420     # begin data gathering
2421
2422     nodes = frozenset([inst.primary_node for inst in instance_list])
2423
2424     bad_nodes = []
2425     if self.dynamic_fields.intersection(self.op.output_fields):
2426       live_data = {}
2427       node_data = rpc.call_all_instances_info(nodes)
2428       for name in nodes:
2429         result = node_data[name]
2430         if result:
2431           live_data.update(result)
2432         elif result == False:
2433           bad_nodes.append(name)
2434         # else no instance is alive
2435     else:
2436       live_data = dict([(name, {}) for name in instance_names])
2437
2438     # end data gathering
2439
2440     output = []
2441     for instance in instance_list:
2442       iout = []
2443       for field in self.op.output_fields:
2444         if field == "name":
2445           val = instance.name
2446         elif field == "os":
2447           val = instance.os
2448         elif field == "pnode":
2449           val = instance.primary_node
2450         elif field == "snodes":
2451           val = list(instance.secondary_nodes)
2452         elif field == "admin_state":
2453           val = (instance.status != "down")
2454         elif field == "oper_state":
2455           if instance.primary_node in bad_nodes:
2456             val = None
2457           else:
2458             val = bool(live_data.get(instance.name))
2459         elif field == "admin_ram":
2460           val = instance.memory
2461         elif field == "oper_ram":
2462           if instance.primary_node in bad_nodes:
2463             val = None
2464           elif instance.name in live_data:
2465             val = live_data[instance.name].get("memory", "?")
2466           else:
2467             val = "-"
2468         elif field == "disk_template":
2469           val = instance.disk_template
2470         elif field == "ip":
2471           val = instance.nics[0].ip
2472         elif field == "bridge":
2473           val = instance.nics[0].bridge
2474         elif field == "mac":
2475           val = instance.nics[0].mac
2476         elif field == "sda_size" or field == "sdb_size":
2477           disk = instance.FindDisk(field[:3])
2478           if disk is None:
2479             val = None
2480           else:
2481             val = disk.size
2482         elif field == "vcpus":
2483           val = instance.vcpus
2484         else:
2485           raise errors.ParameterError(field)
2486         iout.append(val)
2487       output.append(iout)
2488
2489     return output
2490
2491
2492 class LUFailoverInstance(LogicalUnit):
2493   """Failover an instance.
2494
2495   """
2496   HPATH = "instance-failover"
2497   HTYPE = constants.HTYPE_INSTANCE
2498   _OP_REQP = ["instance_name", "ignore_consistency"]
2499
2500   def BuildHooksEnv(self):
2501     """Build hooks env.
2502
2503     This runs on master, primary and secondary nodes of the instance.
2504
2505     """
2506     env = {
2507       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2508       }
2509     env.update(_BuildInstanceHookEnvByObject(self.instance))
2510     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2511     return env, nl, nl
2512
2513   def CheckPrereq(self):
2514     """Check prerequisites.
2515
2516     This checks that the instance is in the cluster.
2517
2518     """
2519     instance = self.cfg.GetInstanceInfo(
2520       self.cfg.ExpandInstanceName(self.op.instance_name))
2521     if instance is None:
2522       raise errors.OpPrereqError("Instance '%s' not known" %
2523                                  self.op.instance_name)
2524
2525     if instance.disk_template not in constants.DTS_NET_MIRROR:
2526       raise errors.OpPrereqError("Instance's disk layout is not"
2527                                  " network mirrored, cannot failover.")
2528
2529     secondary_nodes = instance.secondary_nodes
2530     if not secondary_nodes:
2531       raise errors.ProgrammerError("no secondary node but using "
2532                                    "DT_REMOTE_RAID1 template")
2533
2534     target_node = secondary_nodes[0]
2535     # check memory requirements on the secondary node
2536     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2537                          instance.name, instance.memory)
2538
2539     # check bridge existance
2540     brlist = [nic.bridge for nic in instance.nics]
2541     if not rpc.call_bridges_exist(target_node, brlist):
2542       raise errors.OpPrereqError("One or more target bridges %s does not"
2543                                  " exist on destination node '%s'" %
2544                                  (brlist, target_node))
2545
2546     self.instance = instance
2547
2548   def Exec(self, feedback_fn):
2549     """Failover an instance.
2550
2551     The failover is done by shutting it down on its present node and
2552     starting it on the secondary.
2553
2554     """
2555     instance = self.instance
2556
2557     source_node = instance.primary_node
2558     target_node = instance.secondary_nodes[0]
2559
2560     feedback_fn("* checking disk consistency between source and target")
2561     for dev in instance.disks:
2562       # for remote_raid1, these are md over drbd
2563       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2564         if not self.op.ignore_consistency:
2565           raise errors.OpExecError("Disk %s is degraded on target node,"
2566                                    " aborting failover." % dev.iv_name)
2567
2568     feedback_fn("* shutting down instance on source node")
2569     logger.Info("Shutting down instance %s on node %s" %
2570                 (instance.name, source_node))
2571
2572     if not rpc.call_instance_shutdown(source_node, instance):
2573       if self.op.ignore_consistency:
2574         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2575                      " anyway. Please make sure node %s is down"  %
2576                      (instance.name, source_node, source_node))
2577       else:
2578         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2579                                  (instance.name, source_node))
2580
2581     feedback_fn("* deactivating the instance's disks on source node")
2582     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2583       raise errors.OpExecError("Can't shut down the instance's disks.")
2584
2585     instance.primary_node = target_node
2586     # distribute new instance config to the other nodes
2587     self.cfg.AddInstance(instance)
2588
2589     feedback_fn("* activating the instance's disks on target node")
2590     logger.Info("Starting instance %s on node %s" %
2591                 (instance.name, target_node))
2592
2593     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2594                                              ignore_secondaries=True)
2595     if not disks_ok:
2596       _ShutdownInstanceDisks(instance, self.cfg)
2597       raise errors.OpExecError("Can't activate the instance's disks")
2598
2599     feedback_fn("* starting the instance on the target node")
2600     if not rpc.call_instance_start(target_node, instance, None):
2601       _ShutdownInstanceDisks(instance, self.cfg)
2602       raise errors.OpExecError("Could not start instance %s on node %s." %
2603                                (instance.name, target_node))
2604
2605
2606 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2607   """Create a tree of block devices on the primary node.
2608
2609   This always creates all devices.
2610
2611   """
2612   if device.children:
2613     for child in device.children:
2614       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2615         return False
2616
2617   cfg.SetDiskID(device, node)
2618   new_id = rpc.call_blockdev_create(node, device, device.size,
2619                                     instance.name, True, info)
2620   if not new_id:
2621     return False
2622   if device.physical_id is None:
2623     device.physical_id = new_id
2624   return True
2625
2626
2627 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2628   """Create a tree of block devices on a secondary node.
2629
2630   If this device type has to be created on secondaries, create it and
2631   all its children.
2632
2633   If not, just recurse to children keeping the same 'force' value.
2634
2635   """
2636   if device.CreateOnSecondary():
2637     force = True
2638   if device.children:
2639     for child in device.children:
2640       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2641                                         child, force, info):
2642         return False
2643
2644   if not force:
2645     return True
2646   cfg.SetDiskID(device, node)
2647   new_id = rpc.call_blockdev_create(node, device, device.size,
2648                                     instance.name, False, info)
2649   if not new_id:
2650     return False
2651   if device.physical_id is None:
2652     device.physical_id = new_id
2653   return True
2654
2655
2656 def _GenerateUniqueNames(cfg, exts):
2657   """Generate a suitable LV name.
2658
2659   This will generate a logical volume name for the given instance.
2660
2661   """
2662   results = []
2663   for val in exts:
2664     new_id = cfg.GenerateUniqueID()
2665     results.append("%s%s" % (new_id, val))
2666   return results
2667
2668
2669 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2670   """Generate a drbd device complete with its children.
2671
2672   """
2673   port = cfg.AllocatePort()
2674   vgname = cfg.GetVGName()
2675   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2676                           logical_id=(vgname, names[0]))
2677   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2678                           logical_id=(vgname, names[1]))
2679   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2680                           logical_id = (primary, secondary, port),
2681                           children = [dev_data, dev_meta])
2682   return drbd_dev
2683
2684
2685 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2686   """Generate a drbd8 device complete with its children.
2687
2688   """
2689   port = cfg.AllocatePort()
2690   vgname = cfg.GetVGName()
2691   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2692                           logical_id=(vgname, names[0]))
2693   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2694                           logical_id=(vgname, names[1]))
2695   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2696                           logical_id = (primary, secondary, port),
2697                           children = [dev_data, dev_meta],
2698                           iv_name=iv_name)
2699   return drbd_dev
2700
2701 def _GenerateDiskTemplate(cfg, template_name,
2702                           instance_name, primary_node,
2703                           secondary_nodes, disk_sz, swap_sz):
2704   """Generate the entire disk layout for a given template type.
2705
2706   """
2707   #TODO: compute space requirements
2708
2709   vgname = cfg.GetVGName()
2710   if template_name == "diskless":
2711     disks = []
2712   elif template_name == "plain":
2713     if len(secondary_nodes) != 0:
2714       raise errors.ProgrammerError("Wrong template configuration")
2715
2716     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2717     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2718                            logical_id=(vgname, names[0]),
2719                            iv_name = "sda")
2720     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2721                            logical_id=(vgname, names[1]),
2722                            iv_name = "sdb")
2723     disks = [sda_dev, sdb_dev]
2724   elif template_name == "local_raid1":
2725     if len(secondary_nodes) != 0:
2726       raise errors.ProgrammerError("Wrong template configuration")
2727
2728
2729     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2730                                        ".sdb_m1", ".sdb_m2"])
2731     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2732                               logical_id=(vgname, names[0]))
2733     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2734                               logical_id=(vgname, names[1]))
2735     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2736                               size=disk_sz,
2737                               children = [sda_dev_m1, sda_dev_m2])
2738     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2739                               logical_id=(vgname, names[2]))
2740     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2741                               logical_id=(vgname, names[3]))
2742     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2743                               size=swap_sz,
2744                               children = [sdb_dev_m1, sdb_dev_m2])
2745     disks = [md_sda_dev, md_sdb_dev]
2746   elif template_name == constants.DT_REMOTE_RAID1:
2747     if len(secondary_nodes) != 1:
2748       raise errors.ProgrammerError("Wrong template configuration")
2749     remote_node = secondary_nodes[0]
2750     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2751                                        ".sdb_data", ".sdb_meta"])
2752     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2753                                          disk_sz, names[0:2])
2754     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2755                               children = [drbd_sda_dev], size=disk_sz)
2756     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2757                                          swap_sz, names[2:4])
2758     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2759                               children = [drbd_sdb_dev], size=swap_sz)
2760     disks = [md_sda_dev, md_sdb_dev]
2761   elif template_name == constants.DT_DRBD8:
2762     if len(secondary_nodes) != 1:
2763       raise errors.ProgrammerError("Wrong template configuration")
2764     remote_node = secondary_nodes[0]
2765     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2766                                        ".sdb_data", ".sdb_meta"])
2767     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2768                                          disk_sz, names[0:2], "sda")
2769     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2770                                          swap_sz, names[2:4], "sdb")
2771     disks = [drbd_sda_dev, drbd_sdb_dev]
2772   else:
2773     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2774   return disks
2775
2776
2777 def _GetInstanceInfoText(instance):
2778   """Compute that text that should be added to the disk's metadata.
2779
2780   """
2781   return "originstname+%s" % instance.name
2782
2783
2784 def _CreateDisks(cfg, instance):
2785   """Create all disks for an instance.
2786
2787   This abstracts away some work from AddInstance.
2788
2789   Args:
2790     instance: the instance object
2791
2792   Returns:
2793     True or False showing the success of the creation process
2794
2795   """
2796   info = _GetInstanceInfoText(instance)
2797
2798   for device in instance.disks:
2799     logger.Info("creating volume %s for instance %s" %
2800               (device.iv_name, instance.name))
2801     #HARDCODE
2802     for secondary_node in instance.secondary_nodes:
2803       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2804                                         device, False, info):
2805         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2806                      (device.iv_name, device, secondary_node))
2807         return False
2808     #HARDCODE
2809     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2810                                     instance, device, info):
2811       logger.Error("failed to create volume %s on primary!" %
2812                    device.iv_name)
2813       return False
2814   return True
2815
2816
2817 def _RemoveDisks(instance, cfg):
2818   """Remove all disks for an instance.
2819
2820   This abstracts away some work from `AddInstance()` and
2821   `RemoveInstance()`. Note that in case some of the devices couldn't
2822   be removed, the removal will continue with the other ones (compare
2823   with `_CreateDisks()`).
2824
2825   Args:
2826     instance: the instance object
2827
2828   Returns:
2829     True or False showing the success of the removal proces
2830
2831   """
2832   logger.Info("removing block devices for instance %s" % instance.name)
2833
2834   result = True
2835   for device in instance.disks:
2836     for node, disk in device.ComputeNodeTree(instance.primary_node):
2837       cfg.SetDiskID(disk, node)
2838       if not rpc.call_blockdev_remove(node, disk):
2839         logger.Error("could not remove block device %s on node %s,"
2840                      " continuing anyway" %
2841                      (device.iv_name, node))
2842         result = False
2843   return result
2844
2845
2846 class LUCreateInstance(LogicalUnit):
2847   """Create an instance.
2848
2849   """
2850   HPATH = "instance-add"
2851   HTYPE = constants.HTYPE_INSTANCE
2852   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2853               "disk_template", "swap_size", "mode", "start", "vcpus",
2854               "wait_for_sync", "ip_check", "mac"]
2855
2856   def BuildHooksEnv(self):
2857     """Build hooks env.
2858
2859     This runs on master, primary and secondary nodes of the instance.
2860
2861     """
2862     env = {
2863       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2864       "INSTANCE_DISK_SIZE": self.op.disk_size,
2865       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2866       "INSTANCE_ADD_MODE": self.op.mode,
2867       }
2868     if self.op.mode == constants.INSTANCE_IMPORT:
2869       env["INSTANCE_SRC_NODE"] = self.op.src_node
2870       env["INSTANCE_SRC_PATH"] = self.op.src_path
2871       env["INSTANCE_SRC_IMAGE"] = self.src_image
2872
2873     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2874       primary_node=self.op.pnode,
2875       secondary_nodes=self.secondaries,
2876       status=self.instance_status,
2877       os_type=self.op.os_type,
2878       memory=self.op.mem_size,
2879       vcpus=self.op.vcpus,
2880       nics=[(self.inst_ip, self.op.bridge)],
2881     ))
2882
2883     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2884           self.secondaries)
2885     return env, nl, nl
2886
2887
2888   def CheckPrereq(self):
2889     """Check prerequisites.
2890
2891     """
2892     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2893       if not hasattr(self.op, attr):
2894         setattr(self.op, attr, None)
2895
2896     if self.op.mode not in (constants.INSTANCE_CREATE,
2897                             constants.INSTANCE_IMPORT):
2898       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2899                                  self.op.mode)
2900
2901     if self.op.mode == constants.INSTANCE_IMPORT:
2902       src_node = getattr(self.op, "src_node", None)
2903       src_path = getattr(self.op, "src_path", None)
2904       if src_node is None or src_path is None:
2905         raise errors.OpPrereqError("Importing an instance requires source"
2906                                    " node and path options")
2907       src_node_full = self.cfg.ExpandNodeName(src_node)
2908       if src_node_full is None:
2909         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2910       self.op.src_node = src_node = src_node_full
2911
2912       if not os.path.isabs(src_path):
2913         raise errors.OpPrereqError("The source path must be absolute")
2914
2915       export_info = rpc.call_export_info(src_node, src_path)
2916
2917       if not export_info:
2918         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2919
2920       if not export_info.has_section(constants.INISECT_EXP):
2921         raise errors.ProgrammerError("Corrupted export config")
2922
2923       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2924       if (int(ei_version) != constants.EXPORT_VERSION):
2925         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2926                                    (ei_version, constants.EXPORT_VERSION))
2927
2928       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2929         raise errors.OpPrereqError("Can't import instance with more than"
2930                                    " one data disk")
2931
2932       # FIXME: are the old os-es, disk sizes, etc. useful?
2933       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2934       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2935                                                          'disk0_dump'))
2936       self.src_image = diskimage
2937     else: # INSTANCE_CREATE
2938       if getattr(self.op, "os_type", None) is None:
2939         raise errors.OpPrereqError("No guest OS specified")
2940
2941     # check primary node
2942     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2943     if pnode is None:
2944       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2945                                  self.op.pnode)
2946     self.op.pnode = pnode.name
2947     self.pnode = pnode
2948     self.secondaries = []
2949     # disk template and mirror node verification
2950     if self.op.disk_template not in constants.DISK_TEMPLATES:
2951       raise errors.OpPrereqError("Invalid disk template name")
2952
2953     if self.op.disk_template in constants.DTS_NET_MIRROR:
2954       if getattr(self.op, "snode", None) is None:
2955         raise errors.OpPrereqError("The networked disk templates need"
2956                                    " a mirror node")
2957
2958       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2959       if snode_name is None:
2960         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2961                                    self.op.snode)
2962       elif snode_name == pnode.name:
2963         raise errors.OpPrereqError("The secondary node cannot be"
2964                                    " the primary node.")
2965       self.secondaries.append(snode_name)
2966
2967     # Required free disk space as a function of disk and swap space
2968     req_size_dict = {
2969       constants.DT_DISKLESS: None,
2970       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2971       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2972       # 256 MB are added for drbd metadata, 128MB for each drbd device
2973       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2974       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2975     }
2976
2977     if self.op.disk_template not in req_size_dict:
2978       raise errors.ProgrammerError("Disk template '%s' size requirement"
2979                                    " is unknown" %  self.op.disk_template)
2980
2981     req_size = req_size_dict[self.op.disk_template]
2982
2983     # Check lv size requirements
2984     if req_size is not None:
2985       nodenames = [pnode.name] + self.secondaries
2986       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2987       for node in nodenames:
2988         info = nodeinfo.get(node, None)
2989         if not info:
2990           raise errors.OpPrereqError("Cannot get current information"
2991                                      " from node '%s'" % nodeinfo)
2992         vg_free = info.get('vg_free', None)
2993         if not isinstance(vg_free, int):
2994           raise errors.OpPrereqError("Can't compute free disk space on"
2995                                      " node %s" % node)
2996         if req_size > info['vg_free']:
2997           raise errors.OpPrereqError("Not enough disk space on target node %s."
2998                                      " %d MB available, %d MB required" %
2999                                      (node, info['vg_free'], req_size))
3000
3001     # os verification
3002     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3003     if not os_obj:
3004       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3005                                  " primary node"  % self.op.os_type)
3006
3007     if self.op.kernel_path == constants.VALUE_NONE:
3008       raise errors.OpPrereqError("Can't set instance kernel to none")
3009
3010     # instance verification
3011     hostname1 = utils.HostInfo(self.op.instance_name)
3012
3013     self.op.instance_name = instance_name = hostname1.name
3014     instance_list = self.cfg.GetInstanceList()
3015     if instance_name in instance_list:
3016       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3017                                  instance_name)
3018
3019     ip = getattr(self.op, "ip", None)
3020     if ip is None or ip.lower() == "none":
3021       inst_ip = None
3022     elif ip.lower() == "auto":
3023       inst_ip = hostname1.ip
3024     else:
3025       if not utils.IsValidIP(ip):
3026         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3027                                    " like a valid IP" % ip)
3028       inst_ip = ip
3029     self.inst_ip = inst_ip
3030
3031     if self.op.start and not self.op.ip_check:
3032       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3033                                  " adding an instance in start mode")
3034
3035     if self.op.ip_check:
3036       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3037                        constants.DEFAULT_NODED_PORT):
3038         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3039                                    (hostname1.ip, instance_name))
3040
3041     # MAC address verification
3042     if self.op.mac != "auto":
3043       if not utils.IsValidMac(self.op.mac.lower()):
3044         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3045                                    self.op.mac)
3046
3047     # bridge verification
3048     bridge = getattr(self.op, "bridge", None)
3049     if bridge is None:
3050       self.op.bridge = self.cfg.GetDefBridge()
3051     else:
3052       self.op.bridge = bridge
3053
3054     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3055       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3056                                  " destination node '%s'" %
3057                                  (self.op.bridge, pnode.name))
3058
3059     # boot order verification
3060     if self.op.hvm_boot_order is not None:
3061       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3062              raise errors.OpPrereqError("invalid boot order specified,"
3063                                         " must be one or more of [acdn]")
3064
3065     if self.op.start:
3066       self.instance_status = 'up'
3067     else:
3068       self.instance_status = 'down'
3069
3070   def Exec(self, feedback_fn):
3071     """Create and add the instance to the cluster.
3072
3073     """
3074     instance = self.op.instance_name
3075     pnode_name = self.pnode.name
3076
3077     if self.op.mac == "auto":
3078       mac_address = self.cfg.GenerateMAC()
3079     else:
3080       mac_address = self.op.mac
3081
3082     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3083     if self.inst_ip is not None:
3084       nic.ip = self.inst_ip
3085
3086     ht_kind = self.sstore.GetHypervisorType()
3087     if ht_kind in constants.HTS_REQ_PORT:
3088       network_port = self.cfg.AllocatePort()
3089     else:
3090       network_port = None
3091
3092     disks = _GenerateDiskTemplate(self.cfg,
3093                                   self.op.disk_template,
3094                                   instance, pnode_name,
3095                                   self.secondaries, self.op.disk_size,
3096                                   self.op.swap_size)
3097
3098     iobj = objects.Instance(name=instance, os=self.op.os_type,
3099                             primary_node=pnode_name,
3100                             memory=self.op.mem_size,
3101                             vcpus=self.op.vcpus,
3102                             nics=[nic], disks=disks,
3103                             disk_template=self.op.disk_template,
3104                             status=self.instance_status,
3105                             network_port=network_port,
3106                             kernel_path=self.op.kernel_path,
3107                             initrd_path=self.op.initrd_path,
3108                             hvm_boot_order=self.op.hvm_boot_order,
3109                             )
3110
3111     feedback_fn("* creating instance disks...")
3112     if not _CreateDisks(self.cfg, iobj):
3113       _RemoveDisks(iobj, self.cfg)
3114       raise errors.OpExecError("Device creation failed, reverting...")
3115
3116     feedback_fn("adding instance %s to cluster config" % instance)
3117
3118     self.cfg.AddInstance(iobj)
3119
3120     if self.op.wait_for_sync:
3121       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3122     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3123       # make sure the disks are not degraded (still sync-ing is ok)
3124       time.sleep(15)
3125       feedback_fn("* checking mirrors status")
3126       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3127     else:
3128       disk_abort = False
3129
3130     if disk_abort:
3131       _RemoveDisks(iobj, self.cfg)
3132       self.cfg.RemoveInstance(iobj.name)
3133       raise errors.OpExecError("There are some degraded disks for"
3134                                " this instance")
3135
3136     feedback_fn("creating os for instance %s on node %s" %
3137                 (instance, pnode_name))
3138
3139     if iobj.disk_template != constants.DT_DISKLESS:
3140       if self.op.mode == constants.INSTANCE_CREATE:
3141         feedback_fn("* running the instance OS create scripts...")
3142         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3143           raise errors.OpExecError("could not add os for instance %s"
3144                                    " on node %s" %
3145                                    (instance, pnode_name))
3146
3147       elif self.op.mode == constants.INSTANCE_IMPORT:
3148         feedback_fn("* running the instance OS import scripts...")
3149         src_node = self.op.src_node
3150         src_image = self.src_image
3151         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3152                                                 src_node, src_image):
3153           raise errors.OpExecError("Could not import os for instance"
3154                                    " %s on node %s" %
3155                                    (instance, pnode_name))
3156       else:
3157         # also checked in the prereq part
3158         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3159                                      % self.op.mode)
3160
3161     if self.op.start:
3162       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3163       feedback_fn("* starting instance...")
3164       if not rpc.call_instance_start(pnode_name, iobj, None):
3165         raise errors.OpExecError("Could not start instance")
3166
3167
3168 class LUConnectConsole(NoHooksLU):
3169   """Connect to an instance's console.
3170
3171   This is somewhat special in that it returns the command line that
3172   you need to run on the master node in order to connect to the
3173   console.
3174
3175   """
3176   _OP_REQP = ["instance_name"]
3177
3178   def CheckPrereq(self):
3179     """Check prerequisites.
3180
3181     This checks that the instance is in the cluster.
3182
3183     """
3184     instance = self.cfg.GetInstanceInfo(
3185       self.cfg.ExpandInstanceName(self.op.instance_name))
3186     if instance is None:
3187       raise errors.OpPrereqError("Instance '%s' not known" %
3188                                  self.op.instance_name)
3189     self.instance = instance
3190
3191   def Exec(self, feedback_fn):
3192     """Connect to the console of an instance
3193
3194     """
3195     instance = self.instance
3196     node = instance.primary_node
3197
3198     node_insts = rpc.call_instance_list([node])[node]
3199     if node_insts is False:
3200       raise errors.OpExecError("Can't connect to node %s." % node)
3201
3202     if instance.name not in node_insts:
3203       raise errors.OpExecError("Instance %s is not running." % instance.name)
3204
3205     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3206
3207     hyper = hypervisor.GetHypervisor()
3208     console_cmd = hyper.GetShellCommandForConsole(instance)
3209     # build ssh cmdline
3210     argv = ["ssh", "-q", "-t"]
3211     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3212     argv.extend(ssh.BATCH_MODE_OPTS)
3213     argv.append(node)
3214     argv.append(console_cmd)
3215     return "ssh", argv
3216
3217
3218 class LUAddMDDRBDComponent(LogicalUnit):
3219   """Adda new mirror member to an instance's disk.
3220
3221   """
3222   HPATH = "mirror-add"
3223   HTYPE = constants.HTYPE_INSTANCE
3224   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3225
3226   def BuildHooksEnv(self):
3227     """Build hooks env.
3228
3229     This runs on the master, the primary and all the secondaries.
3230
3231     """
3232     env = {
3233       "NEW_SECONDARY": self.op.remote_node,
3234       "DISK_NAME": self.op.disk_name,
3235       }
3236     env.update(_BuildInstanceHookEnvByObject(self.instance))
3237     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3238           self.op.remote_node,] + list(self.instance.secondary_nodes)
3239     return env, nl, nl
3240
3241   def CheckPrereq(self):
3242     """Check prerequisites.
3243
3244     This checks that the instance is in the cluster.
3245
3246     """
3247     instance = self.cfg.GetInstanceInfo(
3248       self.cfg.ExpandInstanceName(self.op.instance_name))
3249     if instance is None:
3250       raise errors.OpPrereqError("Instance '%s' not known" %
3251                                  self.op.instance_name)
3252     self.instance = instance
3253
3254     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3255     if remote_node is None:
3256       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3257     self.remote_node = remote_node
3258
3259     if remote_node == instance.primary_node:
3260       raise errors.OpPrereqError("The specified node is the primary node of"
3261                                  " the instance.")
3262
3263     if instance.disk_template != constants.DT_REMOTE_RAID1:
3264       raise errors.OpPrereqError("Instance's disk layout is not"
3265                                  " remote_raid1.")
3266     for disk in instance.disks:
3267       if disk.iv_name == self.op.disk_name:
3268         break
3269     else:
3270       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3271                                  " instance." % self.op.disk_name)
3272     if len(disk.children) > 1:
3273       raise errors.OpPrereqError("The device already has two slave devices."
3274                                  " This would create a 3-disk raid1 which we"
3275                                  " don't allow.")
3276     self.disk = disk
3277
3278   def Exec(self, feedback_fn):
3279     """Add the mirror component
3280
3281     """
3282     disk = self.disk
3283     instance = self.instance
3284
3285     remote_node = self.remote_node
3286     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3287     names = _GenerateUniqueNames(self.cfg, lv_names)
3288     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3289                                      remote_node, disk.size, names)
3290
3291     logger.Info("adding new mirror component on secondary")
3292     #HARDCODE
3293     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3294                                       new_drbd, False,
3295                                       _GetInstanceInfoText(instance)):
3296       raise errors.OpExecError("Failed to create new component on secondary"
3297                                " node %s" % remote_node)
3298
3299     logger.Info("adding new mirror component on primary")
3300     #HARDCODE
3301     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3302                                     instance, new_drbd,
3303                                     _GetInstanceInfoText(instance)):
3304       # remove secondary dev
3305       self.cfg.SetDiskID(new_drbd, remote_node)
3306       rpc.call_blockdev_remove(remote_node, new_drbd)
3307       raise errors.OpExecError("Failed to create volume on primary")
3308
3309     # the device exists now
3310     # call the primary node to add the mirror to md
3311     logger.Info("adding new mirror component to md")
3312     if not rpc.call_blockdev_addchildren(instance.primary_node,
3313                                          disk, [new_drbd]):
3314       logger.Error("Can't add mirror compoment to md!")
3315       self.cfg.SetDiskID(new_drbd, remote_node)
3316       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3317         logger.Error("Can't rollback on secondary")
3318       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3319       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3320         logger.Error("Can't rollback on primary")
3321       raise errors.OpExecError("Can't add mirror component to md array")
3322
3323     disk.children.append(new_drbd)
3324
3325     self.cfg.AddInstance(instance)
3326
3327     _WaitForSync(self.cfg, instance, self.proc)
3328
3329     return 0
3330
3331
3332 class LURemoveMDDRBDComponent(LogicalUnit):
3333   """Remove a component from a remote_raid1 disk.
3334
3335   """
3336   HPATH = "mirror-remove"
3337   HTYPE = constants.HTYPE_INSTANCE
3338   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3339
3340   def BuildHooksEnv(self):
3341     """Build hooks env.
3342
3343     This runs on the master, the primary and all the secondaries.
3344
3345     """
3346     env = {
3347       "DISK_NAME": self.op.disk_name,
3348       "DISK_ID": self.op.disk_id,
3349       "OLD_SECONDARY": self.old_secondary,
3350       }
3351     env.update(_BuildInstanceHookEnvByObject(self.instance))
3352     nl = [self.sstore.GetMasterNode(),
3353           self.instance.primary_node] + list(self.instance.secondary_nodes)
3354     return env, nl, nl
3355
3356   def CheckPrereq(self):
3357     """Check prerequisites.
3358
3359     This checks that the instance is in the cluster.
3360
3361     """
3362     instance = self.cfg.GetInstanceInfo(
3363       self.cfg.ExpandInstanceName(self.op.instance_name))
3364     if instance is None:
3365       raise errors.OpPrereqError("Instance '%s' not known" %
3366                                  self.op.instance_name)
3367     self.instance = instance
3368
3369     if instance.disk_template != constants.DT_REMOTE_RAID1:
3370       raise errors.OpPrereqError("Instance's disk layout is not"
3371                                  " remote_raid1.")
3372     for disk in instance.disks:
3373       if disk.iv_name == self.op.disk_name:
3374         break
3375     else:
3376       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3377                                  " instance." % self.op.disk_name)
3378     for child in disk.children:
3379       if (child.dev_type == constants.LD_DRBD7 and
3380           child.logical_id[2] == self.op.disk_id):
3381         break
3382     else:
3383       raise errors.OpPrereqError("Can't find the device with this port.")
3384
3385     if len(disk.children) < 2:
3386       raise errors.OpPrereqError("Cannot remove the last component from"
3387                                  " a mirror.")
3388     self.disk = disk
3389     self.child = child
3390     if self.child.logical_id[0] == instance.primary_node:
3391       oid = 1
3392     else:
3393       oid = 0
3394     self.old_secondary = self.child.logical_id[oid]
3395
3396   def Exec(self, feedback_fn):
3397     """Remove the mirror component
3398
3399     """
3400     instance = self.instance
3401     disk = self.disk
3402     child = self.child
3403     logger.Info("remove mirror component")
3404     self.cfg.SetDiskID(disk, instance.primary_node)
3405     if not rpc.call_blockdev_removechildren(instance.primary_node,
3406                                             disk, [child]):
3407       raise errors.OpExecError("Can't remove child from mirror.")
3408
3409     for node in child.logical_id[:2]:
3410       self.cfg.SetDiskID(child, node)
3411       if not rpc.call_blockdev_remove(node, child):
3412         logger.Error("Warning: failed to remove device from node %s,"
3413                      " continuing operation." % node)
3414
3415     disk.children.remove(child)
3416     self.cfg.AddInstance(instance)
3417
3418
3419 class LUReplaceDisks(LogicalUnit):
3420   """Replace the disks of an instance.
3421
3422   """
3423   HPATH = "mirrors-replace"
3424   HTYPE = constants.HTYPE_INSTANCE
3425   _OP_REQP = ["instance_name", "mode", "disks"]
3426
3427   def BuildHooksEnv(self):
3428     """Build hooks env.
3429
3430     This runs on the master, the primary and all the secondaries.
3431
3432     """
3433     env = {
3434       "MODE": self.op.mode,
3435       "NEW_SECONDARY": self.op.remote_node,
3436       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3437       }
3438     env.update(_BuildInstanceHookEnvByObject(self.instance))
3439     nl = [
3440       self.sstore.GetMasterNode(),
3441       self.instance.primary_node,
3442       ]
3443     if self.op.remote_node is not None:
3444       nl.append(self.op.remote_node)
3445     return env, nl, nl
3446
3447   def CheckPrereq(self):
3448     """Check prerequisites.
3449
3450     This checks that the instance is in the cluster.
3451
3452     """
3453     instance = self.cfg.GetInstanceInfo(
3454       self.cfg.ExpandInstanceName(self.op.instance_name))
3455     if instance is None:
3456       raise errors.OpPrereqError("Instance '%s' not known" %
3457                                  self.op.instance_name)
3458     self.instance = instance
3459     self.op.instance_name = instance.name
3460
3461     if instance.disk_template not in constants.DTS_NET_MIRROR:
3462       raise errors.OpPrereqError("Instance's disk layout is not"
3463                                  " network mirrored.")
3464
3465     if len(instance.secondary_nodes) != 1:
3466       raise errors.OpPrereqError("The instance has a strange layout,"
3467                                  " expected one secondary but found %d" %
3468                                  len(instance.secondary_nodes))
3469
3470     self.sec_node = instance.secondary_nodes[0]
3471
3472     remote_node = getattr(self.op, "remote_node", None)
3473     if remote_node is not None:
3474       remote_node = self.cfg.ExpandNodeName(remote_node)
3475       if remote_node is None:
3476         raise errors.OpPrereqError("Node '%s' not known" %
3477                                    self.op.remote_node)
3478       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3479     else:
3480       self.remote_node_info = None
3481     if remote_node == instance.primary_node:
3482       raise errors.OpPrereqError("The specified node is the primary node of"
3483                                  " the instance.")
3484     elif remote_node == self.sec_node:
3485       if self.op.mode == constants.REPLACE_DISK_SEC:
3486         # this is for DRBD8, where we can't execute the same mode of
3487         # replacement as for drbd7 (no different port allocated)
3488         raise errors.OpPrereqError("Same secondary given, cannot execute"
3489                                    " replacement")
3490       # the user gave the current secondary, switch to
3491       # 'no-replace-secondary' mode for drbd7
3492       remote_node = None
3493     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3494         self.op.mode != constants.REPLACE_DISK_ALL):
3495       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3496                                  " disks replacement, not individual ones")
3497     if instance.disk_template == constants.DT_DRBD8:
3498       if (self.op.mode == constants.REPLACE_DISK_ALL and
3499           remote_node is not None):
3500         # switch to replace secondary mode
3501         self.op.mode = constants.REPLACE_DISK_SEC
3502
3503       if self.op.mode == constants.REPLACE_DISK_ALL:
3504         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3505                                    " secondary disk replacement, not"
3506                                    " both at once")
3507       elif self.op.mode == constants.REPLACE_DISK_PRI:
3508         if remote_node is not None:
3509           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3510                                      " the secondary while doing a primary"
3511                                      " node disk replacement")
3512         self.tgt_node = instance.primary_node
3513         self.oth_node = instance.secondary_nodes[0]
3514       elif self.op.mode == constants.REPLACE_DISK_SEC:
3515         self.new_node = remote_node # this can be None, in which case
3516                                     # we don't change the secondary
3517         self.tgt_node = instance.secondary_nodes[0]
3518         self.oth_node = instance.primary_node
3519       else:
3520         raise errors.ProgrammerError("Unhandled disk replace mode")
3521
3522     for name in self.op.disks:
3523       if instance.FindDisk(name) is None:
3524         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3525                                    (name, instance.name))
3526     self.op.remote_node = remote_node
3527
3528   def _ExecRR1(self, feedback_fn):
3529     """Replace the disks of an instance.
3530
3531     """
3532     instance = self.instance
3533     iv_names = {}
3534     # start of work
3535     if self.op.remote_node is None:
3536       remote_node = self.sec_node
3537     else:
3538       remote_node = self.op.remote_node
3539     cfg = self.cfg
3540     for dev in instance.disks:
3541       size = dev.size
3542       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3543       names = _GenerateUniqueNames(cfg, lv_names)
3544       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3545                                        remote_node, size, names)
3546       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3547       logger.Info("adding new mirror component on secondary for %s" %
3548                   dev.iv_name)
3549       #HARDCODE
3550       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3551                                         new_drbd, False,
3552                                         _GetInstanceInfoText(instance)):
3553         raise errors.OpExecError("Failed to create new component on secondary"
3554                                  " node %s. Full abort, cleanup manually!" %
3555                                  remote_node)
3556
3557       logger.Info("adding new mirror component on primary")
3558       #HARDCODE
3559       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3560                                       instance, new_drbd,
3561                                       _GetInstanceInfoText(instance)):
3562         # remove secondary dev
3563         cfg.SetDiskID(new_drbd, remote_node)
3564         rpc.call_blockdev_remove(remote_node, new_drbd)
3565         raise errors.OpExecError("Failed to create volume on primary!"
3566                                  " Full abort, cleanup manually!!")
3567
3568       # the device exists now
3569       # call the primary node to add the mirror to md
3570       logger.Info("adding new mirror component to md")
3571       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3572                                            [new_drbd]):
3573         logger.Error("Can't add mirror compoment to md!")
3574         cfg.SetDiskID(new_drbd, remote_node)
3575         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3576           logger.Error("Can't rollback on secondary")
3577         cfg.SetDiskID(new_drbd, instance.primary_node)
3578         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3579           logger.Error("Can't rollback on primary")
3580         raise errors.OpExecError("Full abort, cleanup manually!!")
3581
3582       dev.children.append(new_drbd)
3583       cfg.AddInstance(instance)
3584
3585     # this can fail as the old devices are degraded and _WaitForSync
3586     # does a combined result over all disks, so we don't check its
3587     # return value
3588     _WaitForSync(cfg, instance, self.proc, unlock=True)
3589
3590     # so check manually all the devices
3591     for name in iv_names:
3592       dev, child, new_drbd = iv_names[name]
3593       cfg.SetDiskID(dev, instance.primary_node)
3594       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3595       if is_degr:
3596         raise errors.OpExecError("MD device %s is degraded!" % name)
3597       cfg.SetDiskID(new_drbd, instance.primary_node)
3598       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3599       if is_degr:
3600         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3601
3602     for name in iv_names:
3603       dev, child, new_drbd = iv_names[name]
3604       logger.Info("remove mirror %s component" % name)
3605       cfg.SetDiskID(dev, instance.primary_node)
3606       if not rpc.call_blockdev_removechildren(instance.primary_node,
3607                                               dev, [child]):
3608         logger.Error("Can't remove child from mirror, aborting"
3609                      " *this device cleanup*.\nYou need to cleanup manually!!")
3610         continue
3611
3612       for node in child.logical_id[:2]:
3613         logger.Info("remove child device on %s" % node)
3614         cfg.SetDiskID(child, node)
3615         if not rpc.call_blockdev_remove(node, child):
3616           logger.Error("Warning: failed to remove device from node %s,"
3617                        " continuing operation." % node)
3618
3619       dev.children.remove(child)
3620
3621       cfg.AddInstance(instance)
3622
3623   def _ExecD8DiskOnly(self, feedback_fn):
3624     """Replace a disk on the primary or secondary for dbrd8.
3625
3626     The algorithm for replace is quite complicated:
3627       - for each disk to be replaced:
3628         - create new LVs on the target node with unique names
3629         - detach old LVs from the drbd device
3630         - rename old LVs to name_replaced.<time_t>
3631         - rename new LVs to old LVs
3632         - attach the new LVs (with the old names now) to the drbd device
3633       - wait for sync across all devices
3634       - for each modified disk:
3635         - remove old LVs (which have the name name_replaces.<time_t>)
3636
3637     Failures are not very well handled.
3638
3639     """
3640     steps_total = 6
3641     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3642     instance = self.instance
3643     iv_names = {}
3644     vgname = self.cfg.GetVGName()
3645     # start of work
3646     cfg = self.cfg
3647     tgt_node = self.tgt_node
3648     oth_node = self.oth_node
3649
3650     # Step: check device activation
3651     self.proc.LogStep(1, steps_total, "check device existence")
3652     info("checking volume groups")
3653     my_vg = cfg.GetVGName()
3654     results = rpc.call_vg_list([oth_node, tgt_node])
3655     if not results:
3656       raise errors.OpExecError("Can't list volume groups on the nodes")
3657     for node in oth_node, tgt_node:
3658       res = results.get(node, False)
3659       if not res or my_vg not in res:
3660         raise errors.OpExecError("Volume group '%s' not found on %s" %
3661                                  (my_vg, node))
3662     for dev in instance.disks:
3663       if not dev.iv_name in self.op.disks:
3664         continue
3665       for node in tgt_node, oth_node:
3666         info("checking %s on %s" % (dev.iv_name, node))
3667         cfg.SetDiskID(dev, node)
3668         if not rpc.call_blockdev_find(node, dev):
3669           raise errors.OpExecError("Can't find device %s on node %s" %
3670                                    (dev.iv_name, node))
3671
3672     # Step: check other node consistency
3673     self.proc.LogStep(2, steps_total, "check peer consistency")
3674     for dev in instance.disks:
3675       if not dev.iv_name in self.op.disks:
3676         continue
3677       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3678       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3679                                    oth_node==instance.primary_node):
3680         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3681                                  " to replace disks on this node (%s)" %
3682                                  (oth_node, tgt_node))
3683
3684     # Step: create new storage
3685     self.proc.LogStep(3, steps_total, "allocate new storage")
3686     for dev in instance.disks:
3687       if not dev.iv_name in self.op.disks:
3688         continue
3689       size = dev.size
3690       cfg.SetDiskID(dev, tgt_node)
3691       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3692       names = _GenerateUniqueNames(cfg, lv_names)
3693       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3694                              logical_id=(vgname, names[0]))
3695       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3696                              logical_id=(vgname, names[1]))
3697       new_lvs = [lv_data, lv_meta]
3698       old_lvs = dev.children
3699       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3700       info("creating new local storage on %s for %s" %
3701            (tgt_node, dev.iv_name))
3702       # since we *always* want to create this LV, we use the
3703       # _Create...OnPrimary (which forces the creation), even if we
3704       # are talking about the secondary node
3705       for new_lv in new_lvs:
3706         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3707                                         _GetInstanceInfoText(instance)):
3708           raise errors.OpExecError("Failed to create new LV named '%s' on"
3709                                    " node '%s'" %
3710                                    (new_lv.logical_id[1], tgt_node))
3711
3712     # Step: for each lv, detach+rename*2+attach
3713     self.proc.LogStep(4, steps_total, "change drbd configuration")
3714     for dev, old_lvs, new_lvs in iv_names.itervalues():
3715       info("detaching %s drbd from local storage" % dev.iv_name)
3716       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3717         raise errors.OpExecError("Can't detach drbd from local storage on node"
3718                                  " %s for device %s" % (tgt_node, dev.iv_name))
3719       #dev.children = []
3720       #cfg.Update(instance)
3721
3722       # ok, we created the new LVs, so now we know we have the needed
3723       # storage; as such, we proceed on the target node to rename
3724       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3725       # using the assumption than logical_id == physical_id (which in
3726       # turn is the unique_id on that node)
3727
3728       # FIXME(iustin): use a better name for the replaced LVs
3729       temp_suffix = int(time.time())
3730       ren_fn = lambda d, suff: (d.physical_id[0],
3731                                 d.physical_id[1] + "_replaced-%s" % suff)
3732       # build the rename list based on what LVs exist on the node
3733       rlist = []
3734       for to_ren in old_lvs:
3735         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3736         if find_res is not None: # device exists
3737           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3738
3739       info("renaming the old LVs on the target node")
3740       if not rpc.call_blockdev_rename(tgt_node, rlist):
3741         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3742       # now we rename the new LVs to the old LVs
3743       info("renaming the new LVs on the target node")
3744       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3745       if not rpc.call_blockdev_rename(tgt_node, rlist):
3746         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3747
3748       for old, new in zip(old_lvs, new_lvs):
3749         new.logical_id = old.logical_id
3750         cfg.SetDiskID(new, tgt_node)
3751
3752       for disk in old_lvs:
3753         disk.logical_id = ren_fn(disk, temp_suffix)
3754         cfg.SetDiskID(disk, tgt_node)
3755
3756       # now that the new lvs have the old name, we can add them to the device
3757       info("adding new mirror component on %s" % tgt_node)
3758       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3759         for new_lv in new_lvs:
3760           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3761             warning("Can't rollback device %s", hint="manually cleanup unused"
3762                     " logical volumes")
3763         raise errors.OpExecError("Can't add local storage to drbd")
3764
3765       dev.children = new_lvs
3766       cfg.Update(instance)
3767
3768     # Step: wait for sync
3769
3770     # this can fail as the old devices are degraded and _WaitForSync
3771     # does a combined result over all disks, so we don't check its
3772     # return value
3773     self.proc.LogStep(5, steps_total, "sync devices")
3774     _WaitForSync(cfg, instance, self.proc, unlock=True)
3775
3776     # so check manually all the devices
3777     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3778       cfg.SetDiskID(dev, instance.primary_node)
3779       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3780       if is_degr:
3781         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3782
3783     # Step: remove old storage
3784     self.proc.LogStep(6, steps_total, "removing old storage")
3785     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3786       info("remove logical volumes for %s" % name)
3787       for lv in old_lvs:
3788         cfg.SetDiskID(lv, tgt_node)
3789         if not rpc.call_blockdev_remove(tgt_node, lv):
3790           warning("Can't remove old LV", hint="manually remove unused LVs")
3791           continue
3792
3793   def _ExecD8Secondary(self, feedback_fn):
3794     """Replace the secondary node for drbd8.
3795
3796     The algorithm for replace is quite complicated:
3797       - for all disks of the instance:
3798         - create new LVs on the new node with same names
3799         - shutdown the drbd device on the old secondary
3800         - disconnect the drbd network on the primary
3801         - create the drbd device on the new secondary
3802         - network attach the drbd on the primary, using an artifice:
3803           the drbd code for Attach() will connect to the network if it
3804           finds a device which is connected to the good local disks but
3805           not network enabled
3806       - wait for sync across all devices
3807       - remove all disks from the old secondary
3808
3809     Failures are not very well handled.
3810
3811     """
3812     steps_total = 6
3813     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3814     instance = self.instance
3815     iv_names = {}
3816     vgname = self.cfg.GetVGName()
3817     # start of work
3818     cfg = self.cfg
3819     old_node = self.tgt_node
3820     new_node = self.new_node
3821     pri_node = instance.primary_node
3822
3823     # Step: check device activation
3824     self.proc.LogStep(1, steps_total, "check device existence")
3825     info("checking volume groups")
3826     my_vg = cfg.GetVGName()
3827     results = rpc.call_vg_list([pri_node, new_node])
3828     if not results:
3829       raise errors.OpExecError("Can't list volume groups on the nodes")
3830     for node in pri_node, new_node:
3831       res = results.get(node, False)
3832       if not res or my_vg not in res:
3833         raise errors.OpExecError("Volume group '%s' not found on %s" %
3834                                  (my_vg, node))
3835     for dev in instance.disks:
3836       if not dev.iv_name in self.op.disks:
3837         continue
3838       info("checking %s on %s" % (dev.iv_name, pri_node))
3839       cfg.SetDiskID(dev, pri_node)
3840       if not rpc.call_blockdev_find(pri_node, dev):
3841         raise errors.OpExecError("Can't find device %s on node %s" %
3842                                  (dev.iv_name, pri_node))
3843
3844     # Step: check other node consistency
3845     self.proc.LogStep(2, steps_total, "check peer consistency")
3846     for dev in instance.disks:
3847       if not dev.iv_name in self.op.disks:
3848         continue
3849       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3850       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3851         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3852                                  " unsafe to replace the secondary" %
3853                                  pri_node)
3854
3855     # Step: create new storage
3856     self.proc.LogStep(3, steps_total, "allocate new storage")
3857     for dev in instance.disks:
3858       size = dev.size
3859       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3860       # since we *always* want to create this LV, we use the
3861       # _Create...OnPrimary (which forces the creation), even if we
3862       # are talking about the secondary node
3863       for new_lv in dev.children:
3864         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3865                                         _GetInstanceInfoText(instance)):
3866           raise errors.OpExecError("Failed to create new LV named '%s' on"
3867                                    " node '%s'" %
3868                                    (new_lv.logical_id[1], new_node))
3869
3870       iv_names[dev.iv_name] = (dev, dev.children)
3871
3872     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3873     for dev in instance.disks:
3874       size = dev.size
3875       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3876       # create new devices on new_node
3877       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3878                               logical_id=(pri_node, new_node,
3879                                           dev.logical_id[2]),
3880                               children=dev.children)
3881       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3882                                         new_drbd, False,
3883                                       _GetInstanceInfoText(instance)):
3884         raise errors.OpExecError("Failed to create new DRBD on"
3885                                  " node '%s'" % new_node)
3886
3887     for dev in instance.disks:
3888       # we have new devices, shutdown the drbd on the old secondary
3889       info("shutting down drbd for %s on old node" % dev.iv_name)
3890       cfg.SetDiskID(dev, old_node)
3891       if not rpc.call_blockdev_shutdown(old_node, dev):
3892         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3893                 hint="Please cleanup this device manually as soon as possible")
3894
3895     info("detaching primary drbds from the network (=> standalone)")
3896     done = 0
3897     for dev in instance.disks:
3898       cfg.SetDiskID(dev, pri_node)
3899       # set the physical (unique in bdev terms) id to None, meaning
3900       # detach from network
3901       dev.physical_id = (None,) * len(dev.physical_id)
3902       # and 'find' the device, which will 'fix' it to match the
3903       # standalone state
3904       if rpc.call_blockdev_find(pri_node, dev):
3905         done += 1
3906       else:
3907         warning("Failed to detach drbd %s from network, unusual case" %
3908                 dev.iv_name)
3909
3910     if not done:
3911       # no detaches succeeded (very unlikely)
3912       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3913
3914     # if we managed to detach at least one, we update all the disks of
3915     # the instance to point to the new secondary
3916     info("updating instance configuration")
3917     for dev in instance.disks:
3918       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3919       cfg.SetDiskID(dev, pri_node)
3920     cfg.Update(instance)
3921
3922     # and now perform the drbd attach
3923     info("attaching primary drbds to new secondary (standalone => connected)")
3924     failures = []
3925     for dev in instance.disks:
3926       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3927       # since the attach is smart, it's enough to 'find' the device,
3928       # it will automatically activate the network, if the physical_id
3929       # is correct
3930       cfg.SetDiskID(dev, pri_node)
3931       if not rpc.call_blockdev_find(pri_node, dev):
3932         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3933                 "please do a gnt-instance info to see the status of disks")
3934
3935     # this can fail as the old devices are degraded and _WaitForSync
3936     # does a combined result over all disks, so we don't check its
3937     # return value
3938     self.proc.LogStep(5, steps_total, "sync devices")
3939     _WaitForSync(cfg, instance, self.proc, unlock=True)
3940
3941     # so check manually all the devices
3942     for name, (dev, old_lvs) in iv_names.iteritems():
3943       cfg.SetDiskID(dev, pri_node)
3944       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3945       if is_degr:
3946         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3947
3948     self.proc.LogStep(6, steps_total, "removing old storage")
3949     for name, (dev, old_lvs) in iv_names.iteritems():
3950       info("remove logical volumes for %s" % name)
3951       for lv in old_lvs:
3952         cfg.SetDiskID(lv, old_node)
3953         if not rpc.call_blockdev_remove(old_node, lv):
3954           warning("Can't remove LV on old secondary",
3955                   hint="Cleanup stale volumes by hand")
3956
3957   def Exec(self, feedback_fn):
3958     """Execute disk replacement.
3959
3960     This dispatches the disk replacement to the appropriate handler.
3961
3962     """
3963     instance = self.instance
3964     if instance.disk_template == constants.DT_REMOTE_RAID1:
3965       fn = self._ExecRR1
3966     elif instance.disk_template == constants.DT_DRBD8:
3967       if self.op.remote_node is None:
3968         fn = self._ExecD8DiskOnly
3969       else:
3970         fn = self._ExecD8Secondary
3971     else:
3972       raise errors.ProgrammerError("Unhandled disk replacement case")
3973     return fn(feedback_fn)
3974
3975
3976 class LUQueryInstanceData(NoHooksLU):
3977   """Query runtime instance data.
3978
3979   """
3980   _OP_REQP = ["instances"]
3981
3982   def CheckPrereq(self):
3983     """Check prerequisites.
3984
3985     This only checks the optional instance list against the existing names.
3986
3987     """
3988     if not isinstance(self.op.instances, list):
3989       raise errors.OpPrereqError("Invalid argument type 'instances'")
3990     if self.op.instances:
3991       self.wanted_instances = []
3992       names = self.op.instances
3993       for name in names:
3994         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3995         if instance is None:
3996           raise errors.OpPrereqError("No such instance name '%s'" % name)
3997       self.wanted_instances.append(instance)
3998     else:
3999       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4000                                in self.cfg.GetInstanceList()]
4001     return
4002
4003
4004   def _ComputeDiskStatus(self, instance, snode, dev):
4005     """Compute block device status.
4006
4007     """
4008     self.cfg.SetDiskID(dev, instance.primary_node)
4009     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4010     if dev.dev_type in constants.LDS_DRBD:
4011       # we change the snode then (otherwise we use the one passed in)
4012       if dev.logical_id[0] == instance.primary_node:
4013         snode = dev.logical_id[1]
4014       else:
4015         snode = dev.logical_id[0]
4016
4017     if snode:
4018       self.cfg.SetDiskID(dev, snode)
4019       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4020     else:
4021       dev_sstatus = None
4022
4023     if dev.children:
4024       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4025                       for child in dev.children]
4026     else:
4027       dev_children = []
4028
4029     data = {
4030       "iv_name": dev.iv_name,
4031       "dev_type": dev.dev_type,
4032       "logical_id": dev.logical_id,
4033       "physical_id": dev.physical_id,
4034       "pstatus": dev_pstatus,
4035       "sstatus": dev_sstatus,
4036       "children": dev_children,
4037       }
4038
4039     return data
4040
4041   def Exec(self, feedback_fn):
4042     """Gather and return data"""
4043     result = {}
4044     for instance in self.wanted_instances:
4045       remote_info = rpc.call_instance_info(instance.primary_node,
4046                                                 instance.name)
4047       if remote_info and "state" in remote_info:
4048         remote_state = "up"
4049       else:
4050         remote_state = "down"
4051       if instance.status == "down":
4052         config_state = "down"
4053       else:
4054         config_state = "up"
4055
4056       disks = [self._ComputeDiskStatus(instance, None, device)
4057                for device in instance.disks]
4058
4059       idict = {
4060         "name": instance.name,
4061         "config_state": config_state,
4062         "run_state": remote_state,
4063         "pnode": instance.primary_node,
4064         "snodes": instance.secondary_nodes,
4065         "os": instance.os,
4066         "memory": instance.memory,
4067         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4068         "disks": disks,
4069         "network_port": instance.network_port,
4070         "vcpus": instance.vcpus,
4071         "kernel_path": instance.kernel_path,
4072         "initrd_path": instance.initrd_path,
4073         "hvm_boot_order": instance.hvm_boot_order,
4074         }
4075
4076       result[instance.name] = idict
4077
4078     return result
4079
4080
4081 class LUSetInstanceParms(LogicalUnit):
4082   """Modifies an instances's parameters.
4083
4084   """
4085   HPATH = "instance-modify"
4086   HTYPE = constants.HTYPE_INSTANCE
4087   _OP_REQP = ["instance_name"]
4088
4089   def BuildHooksEnv(self):
4090     """Build hooks env.
4091
4092     This runs on the master, primary and secondaries.
4093
4094     """
4095     args = dict()
4096     if self.mem:
4097       args['memory'] = self.mem
4098     if self.vcpus:
4099       args['vcpus'] = self.vcpus
4100     if self.do_ip or self.do_bridge:
4101       if self.do_ip:
4102         ip = self.ip
4103       else:
4104         ip = self.instance.nics[0].ip
4105       if self.bridge:
4106         bridge = self.bridge
4107       else:
4108         bridge = self.instance.nics[0].bridge
4109       args['nics'] = [(ip, bridge)]
4110     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4111     nl = [self.sstore.GetMasterNode(),
4112           self.instance.primary_node] + list(self.instance.secondary_nodes)
4113     return env, nl, nl
4114
4115   def CheckPrereq(self):
4116     """Check prerequisites.
4117
4118     This only checks the instance list against the existing names.
4119
4120     """
4121     self.mem = getattr(self.op, "mem", None)
4122     self.vcpus = getattr(self.op, "vcpus", None)
4123     self.ip = getattr(self.op, "ip", None)
4124     self.mac = getattr(self.op, "mac", None)
4125     self.bridge = getattr(self.op, "bridge", None)
4126     self.kernel_path = getattr(self.op, "kernel_path", None)
4127     self.initrd_path = getattr(self.op, "initrd_path", None)
4128     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4129     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4130                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4131     if all_parms.count(None) == len(all_parms):
4132       raise errors.OpPrereqError("No changes submitted")
4133     if self.mem is not None:
4134       try:
4135         self.mem = int(self.mem)
4136       except ValueError, err:
4137         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4138     if self.vcpus is not None:
4139       try:
4140         self.vcpus = int(self.vcpus)
4141       except ValueError, err:
4142         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4143     if self.ip is not None:
4144       self.do_ip = True
4145       if self.ip.lower() == "none":
4146         self.ip = None
4147       else:
4148         if not utils.IsValidIP(self.ip):
4149           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4150     else:
4151       self.do_ip = False
4152     self.do_bridge = (self.bridge is not None)
4153     if self.mac is not None:
4154       if self.cfg.IsMacInUse(self.mac):
4155         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4156                                    self.mac)
4157       if not utils.IsValidMac(self.mac):
4158         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4159
4160     if self.kernel_path is not None:
4161       self.do_kernel_path = True
4162       if self.kernel_path == constants.VALUE_NONE:
4163         raise errors.OpPrereqError("Can't set instance to no kernel")
4164
4165       if self.kernel_path != constants.VALUE_DEFAULT:
4166         if not os.path.isabs(self.kernel_path):
4167           raise errors.OpPrereqError("The kernel path must be an absolute"
4168                                     " filename")
4169     else:
4170       self.do_kernel_path = False
4171
4172     if self.initrd_path is not None:
4173       self.do_initrd_path = True
4174       if self.initrd_path not in (constants.VALUE_NONE,
4175                                   constants.VALUE_DEFAULT):
4176         if not os.path.isabs(self.initrd_path):
4177           raise errors.OpPrereqError("The initrd path must be an absolute"
4178                                     " filename")
4179     else:
4180       self.do_initrd_path = False
4181
4182     # boot order verification
4183     if self.hvm_boot_order is not None:
4184       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4185         if len(self.hvm_boot_order.strip("acdn")) != 0:
4186           raise errors.OpPrereqError("invalid boot order specified,"
4187                                      " must be one or more of [acdn]"
4188                                      " or 'default'")
4189
4190     instance = self.cfg.GetInstanceInfo(
4191       self.cfg.ExpandInstanceName(self.op.instance_name))
4192     if instance is None:
4193       raise errors.OpPrereqError("No such instance name '%s'" %
4194                                  self.op.instance_name)
4195     self.op.instance_name = instance.name
4196     self.instance = instance
4197     return
4198
4199   def Exec(self, feedback_fn):
4200     """Modifies an instance.
4201
4202     All parameters take effect only at the next restart of the instance.
4203     """
4204     result = []
4205     instance = self.instance
4206     if self.mem:
4207       instance.memory = self.mem
4208       result.append(("mem", self.mem))
4209     if self.vcpus:
4210       instance.vcpus = self.vcpus
4211       result.append(("vcpus",  self.vcpus))
4212     if self.do_ip:
4213       instance.nics[0].ip = self.ip
4214       result.append(("ip", self.ip))
4215     if self.bridge:
4216       instance.nics[0].bridge = self.bridge
4217       result.append(("bridge", self.bridge))
4218     if self.mac:
4219       instance.nics[0].mac = self.mac
4220       result.append(("mac", self.mac))
4221     if self.do_kernel_path:
4222       instance.kernel_path = self.kernel_path
4223       result.append(("kernel_path", self.kernel_path))
4224     if self.do_initrd_path:
4225       instance.initrd_path = self.initrd_path
4226       result.append(("initrd_path", self.initrd_path))
4227     if self.hvm_boot_order:
4228       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4229         instance.hvm_boot_order = None
4230       else:
4231         instance.hvm_boot_order = self.hvm_boot_order
4232       result.append(("hvm_boot_order", self.hvm_boot_order))
4233
4234     self.cfg.AddInstance(instance)
4235
4236     return result
4237
4238
4239 class LUQueryExports(NoHooksLU):
4240   """Query the exports list
4241
4242   """
4243   _OP_REQP = []
4244
4245   def CheckPrereq(self):
4246     """Check that the nodelist contains only existing nodes.
4247
4248     """
4249     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4250
4251   def Exec(self, feedback_fn):
4252     """Compute the list of all the exported system images.
4253
4254     Returns:
4255       a dictionary with the structure node->(export-list)
4256       where export-list is a list of the instances exported on
4257       that node.
4258
4259     """
4260     return rpc.call_export_list(self.nodes)
4261
4262
4263 class LUExportInstance(LogicalUnit):
4264   """Export an instance to an image in the cluster.
4265
4266   """
4267   HPATH = "instance-export"
4268   HTYPE = constants.HTYPE_INSTANCE
4269   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4270
4271   def BuildHooksEnv(self):
4272     """Build hooks env.
4273
4274     This will run on the master, primary node and target node.
4275
4276     """
4277     env = {
4278       "EXPORT_NODE": self.op.target_node,
4279       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4280       }
4281     env.update(_BuildInstanceHookEnvByObject(self.instance))
4282     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4283           self.op.target_node]
4284     return env, nl, nl
4285
4286   def CheckPrereq(self):
4287     """Check prerequisites.
4288
4289     This checks that the instance name is a valid one.
4290
4291     """
4292     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4293     self.instance = self.cfg.GetInstanceInfo(instance_name)
4294     if self.instance is None:
4295       raise errors.OpPrereqError("Instance '%s' not found" %
4296                                  self.op.instance_name)
4297
4298     # node verification
4299     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4300     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4301
4302     if self.dst_node is None:
4303       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4304                                  self.op.target_node)
4305     self.op.target_node = self.dst_node.name
4306
4307   def Exec(self, feedback_fn):
4308     """Export an instance to an image in the cluster.
4309
4310     """
4311     instance = self.instance
4312     dst_node = self.dst_node
4313     src_node = instance.primary_node
4314     # shutdown the instance, unless requested not to do so
4315     if self.op.shutdown:
4316       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4317       self.proc.ChainOpCode(op)
4318
4319     vgname = self.cfg.GetVGName()
4320
4321     snap_disks = []
4322
4323     try:
4324       for disk in instance.disks:
4325         if disk.iv_name == "sda":
4326           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4327           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4328
4329           if not new_dev_name:
4330             logger.Error("could not snapshot block device %s on node %s" %
4331                          (disk.logical_id[1], src_node))
4332           else:
4333             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4334                                       logical_id=(vgname, new_dev_name),
4335                                       physical_id=(vgname, new_dev_name),
4336                                       iv_name=disk.iv_name)
4337             snap_disks.append(new_dev)
4338
4339     finally:
4340       if self.op.shutdown:
4341         op = opcodes.OpStartupInstance(instance_name=instance.name,
4342                                        force=False)
4343         self.proc.ChainOpCode(op)
4344
4345     # TODO: check for size
4346
4347     for dev in snap_disks:
4348       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4349                                            instance):
4350         logger.Error("could not export block device %s from node"
4351                      " %s to node %s" %
4352                      (dev.logical_id[1], src_node, dst_node.name))
4353       if not rpc.call_blockdev_remove(src_node, dev):
4354         logger.Error("could not remove snapshot block device %s from"
4355                      " node %s" % (dev.logical_id[1], src_node))
4356
4357     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4358       logger.Error("could not finalize export for instance %s on node %s" %
4359                    (instance.name, dst_node.name))
4360
4361     nodelist = self.cfg.GetNodeList()
4362     nodelist.remove(dst_node.name)
4363
4364     # on one-node clusters nodelist will be empty after the removal
4365     # if we proceed the backup would be removed because OpQueryExports
4366     # substitutes an empty list with the full cluster node list.
4367     if nodelist:
4368       op = opcodes.OpQueryExports(nodes=nodelist)
4369       exportlist = self.proc.ChainOpCode(op)
4370       for node in exportlist:
4371         if instance.name in exportlist[node]:
4372           if not rpc.call_export_remove(node, instance.name):
4373             logger.Error("could not remove older export for instance %s"
4374                          " on node %s" % (instance.name, node))
4375
4376
4377 class TagsLU(NoHooksLU):
4378   """Generic tags LU.
4379
4380   This is an abstract class which is the parent of all the other tags LUs.
4381
4382   """
4383   def CheckPrereq(self):
4384     """Check prerequisites.
4385
4386     """
4387     if self.op.kind == constants.TAG_CLUSTER:
4388       self.target = self.cfg.GetClusterInfo()
4389     elif self.op.kind == constants.TAG_NODE:
4390       name = self.cfg.ExpandNodeName(self.op.name)
4391       if name is None:
4392         raise errors.OpPrereqError("Invalid node name (%s)" %
4393                                    (self.op.name,))
4394       self.op.name = name
4395       self.target = self.cfg.GetNodeInfo(name)
4396     elif self.op.kind == constants.TAG_INSTANCE:
4397       name = self.cfg.ExpandInstanceName(self.op.name)
4398       if name is None:
4399         raise errors.OpPrereqError("Invalid instance name (%s)" %
4400                                    (self.op.name,))
4401       self.op.name = name
4402       self.target = self.cfg.GetInstanceInfo(name)
4403     else:
4404       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4405                                  str(self.op.kind))
4406
4407
4408 class LUGetTags(TagsLU):
4409   """Returns the tags of a given object.
4410
4411   """
4412   _OP_REQP = ["kind", "name"]
4413
4414   def Exec(self, feedback_fn):
4415     """Returns the tag list.
4416
4417     """
4418     return self.target.GetTags()
4419
4420
4421 class LUSearchTags(NoHooksLU):
4422   """Searches the tags for a given pattern.
4423
4424   """
4425   _OP_REQP = ["pattern"]
4426
4427   def CheckPrereq(self):
4428     """Check prerequisites.
4429
4430     This checks the pattern passed for validity by compiling it.
4431
4432     """
4433     try:
4434       self.re = re.compile(self.op.pattern)
4435     except re.error, err:
4436       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4437                                  (self.op.pattern, err))
4438
4439   def Exec(self, feedback_fn):
4440     """Returns the tag list.
4441
4442     """
4443     cfg = self.cfg
4444     tgts = [("/cluster", cfg.GetClusterInfo())]
4445     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4446     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4447     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4448     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4449     results = []
4450     for path, target in tgts:
4451       for tag in target.GetTags():
4452         if self.re.search(tag):
4453           results.append((path, tag))
4454     return results
4455
4456
4457 class LUAddTags(TagsLU):
4458   """Sets a tag on a given object.
4459
4460   """
4461   _OP_REQP = ["kind", "name", "tags"]
4462
4463   def CheckPrereq(self):
4464     """Check prerequisites.
4465
4466     This checks the type and length of the tag name and value.
4467
4468     """
4469     TagsLU.CheckPrereq(self)
4470     for tag in self.op.tags:
4471       objects.TaggableObject.ValidateTag(tag)
4472
4473   def Exec(self, feedback_fn):
4474     """Sets the tag.
4475
4476     """
4477     try:
4478       for tag in self.op.tags:
4479         self.target.AddTag(tag)
4480     except errors.TagError, err:
4481       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4482     try:
4483       self.cfg.Update(self.target)
4484     except errors.ConfigurationError:
4485       raise errors.OpRetryError("There has been a modification to the"
4486                                 " config file and the operation has been"
4487                                 " aborted. Please retry.")
4488
4489
4490 class LUDelTags(TagsLU):
4491   """Delete a list of tags from a given object.
4492
4493   """
4494   _OP_REQP = ["kind", "name", "tags"]
4495
4496   def CheckPrereq(self):
4497     """Check prerequisites.
4498
4499     This checks that we have the given tag.
4500
4501     """
4502     TagsLU.CheckPrereq(self)
4503     for tag in self.op.tags:
4504       objects.TaggableObject.ValidateTag(tag)
4505     del_tags = frozenset(self.op.tags)
4506     cur_tags = self.target.GetTags()
4507     if not del_tags <= cur_tags:
4508       diff_tags = del_tags - cur_tags
4509       diff_names = ["'%s'" % tag for tag in diff_tags]
4510       diff_names.sort()
4511       raise errors.OpPrereqError("Tag(s) %s not found" %
4512                                  (",".join(diff_names)))
4513
4514   def Exec(self, feedback_fn):
4515     """Remove the tag from the object.
4516
4517     """
4518     for tag in self.op.tags:
4519       self.target.RemoveTag(tag)
4520     try:
4521       self.cfg.Update(self.target)
4522     except errors.ConfigurationError:
4523       raise errors.OpRetryError("There has been a modification to the"
4524                                 " config file and the operation has been"
4525                                 " aborted. Please retry.")