Verify: fix ERROR message indentation
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import simplejson
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47
48 # Check whether the simplejson module supports indentation
49 _JSON_INDENT = 2
50 try:
51   simplejson.dumps(1, indent=_JSON_INDENT)
52 except TypeError:
53   _JSON_INDENT = None
54
55
56 class LogicalUnit(object):
57   """Logical Unit base class.
58
59   Subclasses must follow these rules:
60     - implement CheckPrereq which also fills in the opcode instance
61       with all the fields (even if as None)
62     - implement Exec
63     - implement BuildHooksEnv
64     - redefine HPATH and HTYPE
65     - optionally redefine their run requirements (REQ_CLUSTER,
66       REQ_MASTER); note that all commands require root permissions
67
68   """
69   HPATH = None
70   HTYPE = None
71   _OP_REQP = []
72   REQ_CLUSTER = True
73   REQ_MASTER = True
74
75   def __init__(self, processor, op, cfg, sstore):
76     """Constructor for LogicalUnit.
77
78     This needs to be overriden in derived classes in order to check op
79     validity.
80
81     """
82     self.proc = processor
83     self.op = op
84     self.cfg = cfg
85     self.sstore = sstore
86     for attr_name in self._OP_REQP:
87       attr_val = getattr(op, attr_name, None)
88       if attr_val is None:
89         raise errors.OpPrereqError("Required parameter '%s' missing" %
90                                    attr_name)
91     if self.REQ_CLUSTER:
92       if not cfg.IsCluster():
93         raise errors.OpPrereqError("Cluster not initialized yet,"
94                                    " use 'gnt-cluster init' first.")
95       if self.REQ_MASTER:
96         master = sstore.GetMasterNode()
97         if master != utils.HostInfo().name:
98           raise errors.OpPrereqError("Commands must be run on the master"
99                                      " node %s" % master)
100
101   def CheckPrereq(self):
102     """Check prerequisites for this LU.
103
104     This method should check that the prerequisites for the execution
105     of this LU are fulfilled. It can do internode communication, but
106     it should be idempotent - no cluster or system changes are
107     allowed.
108
109     The method should raise errors.OpPrereqError in case something is
110     not fulfilled. Its return value is ignored.
111
112     This method should also update all the parameters of the opcode to
113     their canonical form; e.g. a short node name must be fully
114     expanded after this method has successfully completed (so that
115     hooks, logging, etc. work correctly).
116
117     """
118     raise NotImplementedError
119
120   def Exec(self, feedback_fn):
121     """Execute the LU.
122
123     This method should implement the actual work. It should raise
124     errors.OpExecError for failures that are somewhat dealt with in
125     code, or expected.
126
127     """
128     raise NotImplementedError
129
130   def BuildHooksEnv(self):
131     """Build hooks environment for this LU.
132
133     This method should return a three-node tuple consisting of: a dict
134     containing the environment that will be used for running the
135     specific hook for this LU, a list of node names on which the hook
136     should run before the execution, and a list of node names on which
137     the hook should run after the execution.
138
139     The keys of the dict must not have 'GANETI_' prefixed as this will
140     be handled in the hooks runner. Also note additional keys will be
141     added by the hooks runner. If the LU doesn't define any
142     environment, an empty dict (and not None) should be returned.
143
144     As for the node lists, the master should not be included in the
145     them, as it will be added by the hooks runner in case this LU
146     requires a cluster to run on (otherwise we don't have a node
147     list). No nodes should be returned as an empty list (and not
148     None).
149
150     Note that if the HPATH for a LU class is None, this function will
151     not be called.
152
153     """
154     raise NotImplementedError
155
156
157 class NoHooksLU(LogicalUnit):
158   """Simple LU which runs no hooks.
159
160   This LU is intended as a parent for other LogicalUnits which will
161   run no hooks, in order to reduce duplicate code.
162
163   """
164   HPATH = None
165   HTYPE = None
166
167   def BuildHooksEnv(self):
168     """Build hooks env.
169
170     This is a no-op, since we don't run hooks.
171
172     """
173     return {}, [], []
174
175
176 def _AddHostToEtcHosts(hostname):
177   """Wrapper around utils.SetEtcHostsEntry.
178
179   """
180   hi = utils.HostInfo(name=hostname)
181   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
182
183
184 def _RemoveHostFromEtcHosts(hostname):
185   """Wrapper around utils.RemoveEtcHostsEntry.
186
187   """
188   hi = utils.HostInfo(name=hostname)
189   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
190   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
191
192
193 def _GetWantedNodes(lu, nodes):
194   """Returns list of checked and expanded node names.
195
196   Args:
197     nodes: List of nodes (strings) or None for all
198
199   """
200   if not isinstance(nodes, list):
201     raise errors.OpPrereqError("Invalid argument type 'nodes'")
202
203   if nodes:
204     wanted = []
205
206     for name in nodes:
207       node = lu.cfg.ExpandNodeName(name)
208       if node is None:
209         raise errors.OpPrereqError("No such node name '%s'" % name)
210       wanted.append(node)
211
212   else:
213     wanted = lu.cfg.GetNodeList()
214   return utils.NiceSort(wanted)
215
216
217 def _GetWantedInstances(lu, instances):
218   """Returns list of checked and expanded instance names.
219
220   Args:
221     instances: List of instances (strings) or None for all
222
223   """
224   if not isinstance(instances, list):
225     raise errors.OpPrereqError("Invalid argument type 'instances'")
226
227   if instances:
228     wanted = []
229
230     for name in instances:
231       instance = lu.cfg.ExpandInstanceName(name)
232       if instance is None:
233         raise errors.OpPrereqError("No such instance name '%s'" % name)
234       wanted.append(instance)
235
236   else:
237     wanted = lu.cfg.GetInstanceList()
238   return utils.NiceSort(wanted)
239
240
241 def _CheckOutputFields(static, dynamic, selected):
242   """Checks whether all selected fields are valid.
243
244   Args:
245     static: Static fields
246     dynamic: Dynamic fields
247
248   """
249   static_fields = frozenset(static)
250   dynamic_fields = frozenset(dynamic)
251
252   all_fields = static_fields | dynamic_fields
253
254   if not all_fields.issuperset(selected):
255     raise errors.OpPrereqError("Unknown output fields selected: %s"
256                                % ",".join(frozenset(selected).
257                                           difference(all_fields)))
258
259
260 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
261                           memory, vcpus, nics):
262   """Builds instance related env variables for hooks from single variables.
263
264   Args:
265     secondary_nodes: List of secondary nodes as strings
266   """
267   env = {
268     "OP_TARGET": name,
269     "INSTANCE_NAME": name,
270     "INSTANCE_PRIMARY": primary_node,
271     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
272     "INSTANCE_OS_TYPE": os_type,
273     "INSTANCE_STATUS": status,
274     "INSTANCE_MEMORY": memory,
275     "INSTANCE_VCPUS": vcpus,
276   }
277
278   if nics:
279     nic_count = len(nics)
280     for idx, (ip, bridge, mac) in enumerate(nics):
281       if ip is None:
282         ip = ""
283       env["INSTANCE_NIC%d_IP" % idx] = ip
284       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
285       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
286   else:
287     nic_count = 0
288
289   env["INSTANCE_NIC_COUNT"] = nic_count
290
291   return env
292
293
294 def _BuildInstanceHookEnvByObject(instance, override=None):
295   """Builds instance related env variables for hooks from an object.
296
297   Args:
298     instance: objects.Instance object of instance
299     override: dict of values to override
300   """
301   args = {
302     'name': instance.name,
303     'primary_node': instance.primary_node,
304     'secondary_nodes': instance.secondary_nodes,
305     'os_type': instance.os,
306     'status': instance.os,
307     'memory': instance.memory,
308     'vcpus': instance.vcpus,
309     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310   }
311   if override:
312     args.update(override)
313   return _BuildInstanceHookEnv(**args)
314
315
316 def _UpdateKnownHosts(fullnode, ip, pubkey):
317   """Ensure a node has a correct known_hosts entry.
318
319   Args:
320     fullnode - Fully qualified domain name of host. (str)
321     ip       - IPv4 address of host (str)
322     pubkey   - the public key of the cluster
323
324   """
325   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
327   else:
328     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
329
330   inthere = False
331
332   save_lines = []
333   add_lines = []
334   removed = False
335
336   for rawline in f:
337     logger.Debug('read %s' % (repr(rawline),))
338
339     parts = rawline.rstrip('\r\n').split()
340
341     # Ignore unwanted lines
342     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
343       fields = parts[0].split(',')
344       key = parts[2]
345
346       haveall = True
347       havesome = False
348       for spec in [ ip, fullnode ]:
349         if spec not in fields:
350           haveall = False
351         if spec in fields:
352           havesome = True
353
354       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
355       if haveall and key == pubkey:
356         inthere = True
357         save_lines.append(rawline)
358         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
359         continue
360
361       if havesome and (not haveall or key != pubkey):
362         removed = True
363         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
364         continue
365
366     save_lines.append(rawline)
367
368   if not inthere:
369     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
370     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
371
372   if removed:
373     save_lines = save_lines + add_lines
374
375     # Write a new file and replace old.
376     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
377                                    constants.DATA_DIR)
378     newfile = os.fdopen(fd, 'w')
379     try:
380       newfile.write(''.join(save_lines))
381     finally:
382       newfile.close()
383     logger.Debug("Wrote new known_hosts.")
384     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
385
386   elif add_lines:
387     # Simply appending a new line will do the trick.
388     f.seek(0, 2)
389     for add in add_lines:
390       f.write(add)
391
392   f.close()
393
394
395 def _HasValidVG(vglist, vgname):
396   """Checks if the volume group list is valid.
397
398   A non-None return value means there's an error, and the return value
399   is the error message.
400
401   """
402   vgsize = vglist.get(vgname, None)
403   if vgsize is None:
404     return "volume group '%s' missing" % vgname
405   elif vgsize < 20480:
406     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
407             (vgname, vgsize))
408   return None
409
410
411 def _InitSSHSetup(node):
412   """Setup the SSH configuration for the cluster.
413
414
415   This generates a dsa keypair for root, adds the pub key to the
416   permitted hosts and adds the hostkey to its own known hosts.
417
418   Args:
419     node: the name of this host as a fqdn
420
421   """
422   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
423
424   for name in priv_key, pub_key:
425     if os.path.exists(name):
426       utils.CreateBackup(name)
427     utils.RemoveFile(name)
428
429   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
430                          "-f", priv_key,
431                          "-q", "-N", ""])
432   if result.failed:
433     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
434                              result.output)
435
436   f = open(pub_key, 'r')
437   try:
438     utils.AddAuthorizedKey(auth_keys, f.read(8192))
439   finally:
440     f.close()
441
442
443 def _InitGanetiServerSetup(ss):
444   """Setup the necessary configuration for the initial node daemon.
445
446   This creates the nodepass file containing the shared password for
447   the cluster and also generates the SSL certificate.
448
449   """
450   # Create pseudo random password
451   randpass = sha.new(os.urandom(64)).hexdigest()
452   # and write it into sstore
453   ss.SetKey(ss.SS_NODED_PASS, randpass)
454
455   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
456                          "-days", str(365*5), "-nodes", "-x509",
457                          "-keyout", constants.SSL_CERT_FILE,
458                          "-out", constants.SSL_CERT_FILE, "-batch"])
459   if result.failed:
460     raise errors.OpExecError("could not generate server ssl cert, command"
461                              " %s had exitcode %s and error message %s" %
462                              (result.cmd, result.exit_code, result.output))
463
464   os.chmod(constants.SSL_CERT_FILE, 0400)
465
466   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
467
468   if result.failed:
469     raise errors.OpExecError("Could not start the node daemon, command %s"
470                              " had exitcode %s and error %s" %
471                              (result.cmd, result.exit_code, result.output))
472
473
474 def _CheckInstanceBridgesExist(instance):
475   """Check that the brigdes needed by an instance exist.
476
477   """
478   # check bridges existance
479   brlist = [nic.bridge for nic in instance.nics]
480   if not rpc.call_bridges_exist(instance.primary_node, brlist):
481     raise errors.OpPrereqError("one or more target bridges %s does not"
482                                " exist on destination node '%s'" %
483                                (brlist, instance.primary_node))
484
485
486 class LUInitCluster(LogicalUnit):
487   """Initialise the cluster.
488
489   """
490   HPATH = "cluster-init"
491   HTYPE = constants.HTYPE_CLUSTER
492   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
493               "def_bridge", "master_netdev"]
494   REQ_CLUSTER = False
495
496   def BuildHooksEnv(self):
497     """Build hooks env.
498
499     Notes: Since we don't require a cluster, we must manually add
500     ourselves in the post-run node list.
501
502     """
503     env = {"OP_TARGET": self.op.cluster_name}
504     return env, [], [self.hostname.name]
505
506   def CheckPrereq(self):
507     """Verify that the passed name is a valid one.
508
509     """
510     if config.ConfigWriter.IsCluster():
511       raise errors.OpPrereqError("Cluster is already initialised")
512
513     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
514       if not os.path.exists(constants.VNC_PASSWORD_FILE):
515         raise errors.OpPrereqError("Please prepare the cluster VNC"
516                                    "password file %s" %
517                                    constants.VNC_PASSWORD_FILE)
518
519     self.hostname = hostname = utils.HostInfo()
520
521     if hostname.ip.startswith("127."):
522       raise errors.OpPrereqError("This host's IP resolves to the private"
523                                  " range (%s). Please fix DNS or %s." %
524                                  (hostname.ip, constants.ETC_HOSTS))
525
526     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
527                          source=constants.LOCALHOST_IP_ADDRESS):
528       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
529                                  " to %s,\nbut this ip address does not"
530                                  " belong to this host."
531                                  " Aborting." % hostname.ip)
532
533     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
534
535     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
536                      timeout=5):
537       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
538
539     secondary_ip = getattr(self.op, "secondary_ip", None)
540     if secondary_ip and not utils.IsValidIP(secondary_ip):
541       raise errors.OpPrereqError("Invalid secondary ip given")
542     if (secondary_ip and
543         secondary_ip != hostname.ip and
544         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
545                            source=constants.LOCALHOST_IP_ADDRESS))):
546       raise errors.OpPrereqError("You gave %s as secondary IP,"
547                                  " but it does not belong to this host." %
548                                  secondary_ip)
549     self.secondary_ip = secondary_ip
550
551     # checks presence of the volume group given
552     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
553
554     if vgstatus:
555       raise errors.OpPrereqError("Error: %s" % vgstatus)
556
557     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
558                     self.op.mac_prefix):
559       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
560                                  self.op.mac_prefix)
561
562     if self.op.hypervisor_type not in constants.HYPER_TYPES:
563       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
564                                  self.op.hypervisor_type)
565
566     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
567     if result.failed:
568       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
569                                  (self.op.master_netdev,
570                                   result.output.strip()))
571
572     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
573             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
574       raise errors.OpPrereqError("Init.d script '%s' missing or not"
575                                  " executable." % constants.NODE_INITD_SCRIPT)
576
577   def Exec(self, feedback_fn):
578     """Initialize the cluster.
579
580     """
581     clustername = self.clustername
582     hostname = self.hostname
583
584     # set up the simple store
585     self.sstore = ss = ssconf.SimpleStore()
586     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
587     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
588     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
589     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
590     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
591
592     # set up the inter-node password and certificate
593     _InitGanetiServerSetup(ss)
594
595     # start the master ip
596     rpc.call_node_start_master(hostname.name)
597
598     # set up ssh config and /etc/hosts
599     f = open(constants.SSH_HOST_RSA_PUB, 'r')
600     try:
601       sshline = f.read()
602     finally:
603       f.close()
604     sshkey = sshline.split(" ")[1]
605
606     _AddHostToEtcHosts(hostname.name)
607
608     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
609
610     _InitSSHSetup(hostname.name)
611
612     # init of cluster config file
613     self.cfg = cfgw = config.ConfigWriter()
614     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
615                     sshkey, self.op.mac_prefix,
616                     self.op.vg_name, self.op.def_bridge)
617
618
619 class LUDestroyCluster(NoHooksLU):
620   """Logical unit for destroying the cluster.
621
622   """
623   _OP_REQP = []
624
625   def CheckPrereq(self):
626     """Check prerequisites.
627
628     This checks whether the cluster is empty.
629
630     Any errors are signalled by raising errors.OpPrereqError.
631
632     """
633     master = self.sstore.GetMasterNode()
634
635     nodelist = self.cfg.GetNodeList()
636     if len(nodelist) != 1 or nodelist[0] != master:
637       raise errors.OpPrereqError("There are still %d node(s) in"
638                                  " this cluster." % (len(nodelist) - 1))
639     instancelist = self.cfg.GetInstanceList()
640     if instancelist:
641       raise errors.OpPrereqError("There are still %d instance(s) in"
642                                  " this cluster." % len(instancelist))
643
644   def Exec(self, feedback_fn):
645     """Destroys the cluster.
646
647     """
648     master = self.sstore.GetMasterNode()
649     if not rpc.call_node_stop_master(master):
650       raise errors.OpExecError("Could not disable the master role")
651     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
652     utils.CreateBackup(priv_key)
653     utils.CreateBackup(pub_key)
654     rpc.call_node_leave_cluster(master)
655
656
657 class LUVerifyCluster(NoHooksLU):
658   """Verifies the cluster status.
659
660   """
661   _OP_REQP = []
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     hyp_result = node_result.get('hypervisor', None)
729     if hyp_result is not None:
730       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
731     return bad
732
733   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
734     """Verify an instance.
735
736     This function checks to see if the required block devices are
737     available on the instance's node.
738
739     """
740     bad = False
741
742     instancelist = self.cfg.GetInstanceList()
743     if not instance in instancelist:
744       feedback_fn("  - ERROR: instance %s not in instance list %s" %
745                       (instance, instancelist))
746       bad = True
747
748     instanceconfig = self.cfg.GetInstanceInfo(instance)
749     node_current = instanceconfig.primary_node
750
751     node_vol_should = {}
752     instanceconfig.MapLVsByNode(node_vol_should)
753
754     for node in node_vol_should:
755       for volume in node_vol_should[node]:
756         if node not in node_vol_is or volume not in node_vol_is[node]:
757           feedback_fn("  - ERROR: volume %s missing on node %s" %
758                           (volume, node))
759           bad = True
760
761     if not instanceconfig.status == 'down':
762       if not instance in node_instance[node_current]:
763         feedback_fn("  - ERROR: instance %s not running on node %s" %
764                         (instance, node_current))
765         bad = True
766
767     for node in node_instance:
768       if (not node == node_current):
769         if instance in node_instance[node]:
770           feedback_fn("  - ERROR: instance %s should not run on node %s" %
771                           (instance, node))
772           bad = True
773
774     return bad
775
776   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
777     """Verify if there are any unknown volumes in the cluster.
778
779     The .os, .swap and backup volumes are ignored. All other volumes are
780     reported as unknown.
781
782     """
783     bad = False
784
785     for node in node_vol_is:
786       for volume in node_vol_is[node]:
787         if node not in node_vol_should or volume not in node_vol_should[node]:
788           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
789                       (volume, node))
790           bad = True
791     return bad
792
793   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
794     """Verify the list of running instances.
795
796     This checks what instances are running but unknown to the cluster.
797
798     """
799     bad = False
800     for node in node_instance:
801       for runninginstance in node_instance[node]:
802         if runninginstance not in instancelist:
803           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
804                           (runninginstance, node))
805           bad = True
806     return bad
807
808   def CheckPrereq(self):
809     """Check prerequisites.
810
811     This has no prerequisites.
812
813     """
814     pass
815
816   def Exec(self, feedback_fn):
817     """Verify integrity of cluster, performing various test on nodes.
818
819     """
820     bad = False
821     feedback_fn("* Verifying global settings")
822     for msg in self.cfg.VerifyConfig():
823       feedback_fn("  - ERROR: %s" % msg)
824
825     vg_name = self.cfg.GetVGName()
826     nodelist = utils.NiceSort(self.cfg.GetNodeList())
827     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
828     node_volume = {}
829     node_instance = {}
830
831     # FIXME: verify OS list
832     # do local checksums
833     file_names = list(self.sstore.GetFileList())
834     file_names.append(constants.SSL_CERT_FILE)
835     file_names.append(constants.CLUSTER_CONF_FILE)
836     local_checksums = utils.FingerprintFiles(file_names)
837
838     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
839     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
840     all_instanceinfo = rpc.call_instance_list(nodelist)
841     all_vglist = rpc.call_vg_list(nodelist)
842     node_verify_param = {
843       'filelist': file_names,
844       'nodelist': nodelist,
845       'hypervisor': None,
846       }
847     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
848     all_rversion = rpc.call_version(nodelist)
849
850     for node in nodelist:
851       feedback_fn("* Verifying node %s" % node)
852       result = self._VerifyNode(node, file_names, local_checksums,
853                                 all_vglist[node], all_nvinfo[node],
854                                 all_rversion[node], feedback_fn)
855       bad = bad or result
856
857       # node_volume
858       volumeinfo = all_volumeinfo[node]
859
860       if isinstance(volumeinfo, basestring):
861         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
862                     (node, volumeinfo[-400:].encode('string_escape')))
863         bad = True
864         node_volume[node] = {}
865       elif not isinstance(volumeinfo, dict):
866         feedback_fn("  - ERROR: connection to %s failed" % (node,))
867         bad = True
868         continue
869       else:
870         node_volume[node] = volumeinfo
871
872       # node_instance
873       nodeinstance = all_instanceinfo[node]
874       if type(nodeinstance) != list:
875         feedback_fn("  - ERROR: connection to %s failed" % (node,))
876         bad = True
877         continue
878
879       node_instance[node] = nodeinstance
880
881     node_vol_should = {}
882
883     for instance in instancelist:
884       feedback_fn("* Verifying instance %s" % instance)
885       result =  self._VerifyInstance(instance, node_volume, node_instance,
886                                      feedback_fn)
887       bad = bad or result
888
889       inst_config = self.cfg.GetInstanceInfo(instance)
890
891       inst_config.MapLVsByNode(node_vol_should)
892
893     feedback_fn("* Verifying orphan volumes")
894     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
895                                        feedback_fn)
896     bad = bad or result
897
898     feedback_fn("* Verifying remaining instances")
899     result = self._VerifyOrphanInstances(instancelist, node_instance,
900                                          feedback_fn)
901     bad = bad or result
902
903     return int(bad)
904
905
906 class LUVerifyDisks(NoHooksLU):
907   """Verifies the cluster disks status.
908
909   """
910   _OP_REQP = []
911
912   def CheckPrereq(self):
913     """Check prerequisites.
914
915     This has no prerequisites.
916
917     """
918     pass
919
920   def Exec(self, feedback_fn):
921     """Verify integrity of cluster disks.
922
923     """
924     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
925
926     vg_name = self.cfg.GetVGName()
927     nodes = utils.NiceSort(self.cfg.GetNodeList())
928     instances = [self.cfg.GetInstanceInfo(name)
929                  for name in self.cfg.GetInstanceList()]
930
931     nv_dict = {}
932     for inst in instances:
933       inst_lvs = {}
934       if (inst.status != "up" or
935           inst.disk_template not in constants.DTS_NET_MIRROR):
936         continue
937       inst.MapLVsByNode(inst_lvs)
938       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
939       for node, vol_list in inst_lvs.iteritems():
940         for vol in vol_list:
941           nv_dict[(node, vol)] = inst
942
943     if not nv_dict:
944       return result
945
946     node_lvs = rpc.call_volume_list(nodes, vg_name)
947
948     to_act = set()
949     for node in nodes:
950       # node_volume
951       lvs = node_lvs[node]
952
953       if isinstance(lvs, basestring):
954         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
955         res_nlvm[node] = lvs
956       elif not isinstance(lvs, dict):
957         logger.Info("connection to node %s failed or invalid data returned" %
958                     (node,))
959         res_nodes.append(node)
960         continue
961
962       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
963         inst = nv_dict.pop((node, lv_name), None)
964         if (not lv_online and inst is not None
965             and inst.name not in res_instances):
966           res_instances.append(inst.name)
967
968     # any leftover items in nv_dict are missing LVs, let's arrange the
969     # data better
970     for key, inst in nv_dict.iteritems():
971       if inst.name not in res_missing:
972         res_missing[inst.name] = []
973       res_missing[inst.name].append(key)
974
975     return result
976
977
978 class LURenameCluster(LogicalUnit):
979   """Rename the cluster.
980
981   """
982   HPATH = "cluster-rename"
983   HTYPE = constants.HTYPE_CLUSTER
984   _OP_REQP = ["name"]
985
986   def BuildHooksEnv(self):
987     """Build hooks env.
988
989     """
990     env = {
991       "OP_TARGET": self.sstore.GetClusterName(),
992       "NEW_NAME": self.op.name,
993       }
994     mn = self.sstore.GetMasterNode()
995     return env, [mn], [mn]
996
997   def CheckPrereq(self):
998     """Verify that the passed name is a valid one.
999
1000     """
1001     hostname = utils.HostInfo(self.op.name)
1002
1003     new_name = hostname.name
1004     self.ip = new_ip = hostname.ip
1005     old_name = self.sstore.GetClusterName()
1006     old_ip = self.sstore.GetMasterIP()
1007     if new_name == old_name and new_ip == old_ip:
1008       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1009                                  " cluster has changed")
1010     if new_ip != old_ip:
1011       result = utils.RunCmd(["fping", "-q", new_ip])
1012       if not result.failed:
1013         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1014                                    " reachable on the network. Aborting." %
1015                                    new_ip)
1016
1017     self.op.name = new_name
1018
1019   def Exec(self, feedback_fn):
1020     """Rename the cluster.
1021
1022     """
1023     clustername = self.op.name
1024     ip = self.ip
1025     ss = self.sstore
1026
1027     # shutdown the master IP
1028     master = ss.GetMasterNode()
1029     if not rpc.call_node_stop_master(master):
1030       raise errors.OpExecError("Could not disable the master role")
1031
1032     try:
1033       # modify the sstore
1034       ss.SetKey(ss.SS_MASTER_IP, ip)
1035       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1036
1037       # Distribute updated ss config to all nodes
1038       myself = self.cfg.GetNodeInfo(master)
1039       dist_nodes = self.cfg.GetNodeList()
1040       if myself.name in dist_nodes:
1041         dist_nodes.remove(myself.name)
1042
1043       logger.Debug("Copying updated ssconf data to all nodes")
1044       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1045         fname = ss.KeyToFilename(keyname)
1046         result = rpc.call_upload_file(dist_nodes, fname)
1047         for to_node in dist_nodes:
1048           if not result[to_node]:
1049             logger.Error("copy of file %s to node %s failed" %
1050                          (fname, to_node))
1051     finally:
1052       if not rpc.call_node_start_master(master):
1053         logger.Error("Could not re-enable the master role on the master,"
1054                      " please restart manually.")
1055
1056
1057 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1058   """Sleep and poll for an instance's disk to sync.
1059
1060   """
1061   if not instance.disks:
1062     return True
1063
1064   if not oneshot:
1065     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1066
1067   node = instance.primary_node
1068
1069   for dev in instance.disks:
1070     cfgw.SetDiskID(dev, node)
1071
1072   retries = 0
1073   while True:
1074     max_time = 0
1075     done = True
1076     cumul_degraded = False
1077     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1078     if not rstats:
1079       proc.LogWarning("Can't get any data from node %s" % node)
1080       retries += 1
1081       if retries >= 10:
1082         raise errors.RemoteError("Can't contact node %s for mirror data,"
1083                                  " aborting." % node)
1084       time.sleep(6)
1085       continue
1086     retries = 0
1087     for i in range(len(rstats)):
1088       mstat = rstats[i]
1089       if mstat is None:
1090         proc.LogWarning("Can't compute data for node %s/%s" %
1091                         (node, instance.disks[i].iv_name))
1092         continue
1093       # we ignore the ldisk parameter
1094       perc_done, est_time, is_degraded, _ = mstat
1095       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1096       if perc_done is not None:
1097         done = False
1098         if est_time is not None:
1099           rem_time = "%d estimated seconds remaining" % est_time
1100           max_time = est_time
1101         else:
1102           rem_time = "no time estimate"
1103         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1104                      (instance.disks[i].iv_name, perc_done, rem_time))
1105     if done or oneshot:
1106       break
1107
1108     if unlock:
1109       utils.Unlock('cmd')
1110     try:
1111       time.sleep(min(60, max_time))
1112     finally:
1113       if unlock:
1114         utils.Lock('cmd')
1115
1116   if done:
1117     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1118   return not cumul_degraded
1119
1120
1121 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1122   """Check that mirrors are not degraded.
1123
1124   The ldisk parameter, if True, will change the test from the
1125   is_degraded attribute (which represents overall non-ok status for
1126   the device(s)) to the ldisk (representing the local storage status).
1127
1128   """
1129   cfgw.SetDiskID(dev, node)
1130   if ldisk:
1131     idx = 6
1132   else:
1133     idx = 5
1134
1135   result = True
1136   if on_primary or dev.AssembleOnSecondary():
1137     rstats = rpc.call_blockdev_find(node, dev)
1138     if not rstats:
1139       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1140       result = False
1141     else:
1142       result = result and (not rstats[idx])
1143   if dev.children:
1144     for child in dev.children:
1145       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1146
1147   return result
1148
1149
1150 class LUDiagnoseOS(NoHooksLU):
1151   """Logical unit for OS diagnose/query.
1152
1153   """
1154   _OP_REQP = []
1155
1156   def CheckPrereq(self):
1157     """Check prerequisites.
1158
1159     This always succeeds, since this is a pure query LU.
1160
1161     """
1162     return
1163
1164   def Exec(self, feedback_fn):
1165     """Compute the list of OSes.
1166
1167     """
1168     node_list = self.cfg.GetNodeList()
1169     node_data = rpc.call_os_diagnose(node_list)
1170     if node_data == False:
1171       raise errors.OpExecError("Can't gather the list of OSes")
1172     return node_data
1173
1174
1175 class LURemoveNode(LogicalUnit):
1176   """Logical unit for removing a node.
1177
1178   """
1179   HPATH = "node-remove"
1180   HTYPE = constants.HTYPE_NODE
1181   _OP_REQP = ["node_name"]
1182
1183   def BuildHooksEnv(self):
1184     """Build hooks env.
1185
1186     This doesn't run on the target node in the pre phase as a failed
1187     node would not allows itself to run.
1188
1189     """
1190     env = {
1191       "OP_TARGET": self.op.node_name,
1192       "NODE_NAME": self.op.node_name,
1193       }
1194     all_nodes = self.cfg.GetNodeList()
1195     all_nodes.remove(self.op.node_name)
1196     return env, all_nodes, all_nodes
1197
1198   def CheckPrereq(self):
1199     """Check prerequisites.
1200
1201     This checks:
1202      - the node exists in the configuration
1203      - it does not have primary or secondary instances
1204      - it's not the master
1205
1206     Any errors are signalled by raising errors.OpPrereqError.
1207
1208     """
1209     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1210     if node is None:
1211       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1212
1213     instance_list = self.cfg.GetInstanceList()
1214
1215     masternode = self.sstore.GetMasterNode()
1216     if node.name == masternode:
1217       raise errors.OpPrereqError("Node is the master node,"
1218                                  " you need to failover first.")
1219
1220     for instance_name in instance_list:
1221       instance = self.cfg.GetInstanceInfo(instance_name)
1222       if node.name == instance.primary_node:
1223         raise errors.OpPrereqError("Instance %s still running on the node,"
1224                                    " please remove first." % instance_name)
1225       if node.name in instance.secondary_nodes:
1226         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1227                                    " please remove first." % instance_name)
1228     self.op.node_name = node.name
1229     self.node = node
1230
1231   def Exec(self, feedback_fn):
1232     """Removes the node from the cluster.
1233
1234     """
1235     node = self.node
1236     logger.Info("stopping the node daemon and removing configs from node %s" %
1237                 node.name)
1238
1239     rpc.call_node_leave_cluster(node.name)
1240
1241     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1242
1243     logger.Info("Removing node %s from config" % node.name)
1244
1245     self.cfg.RemoveNode(node.name)
1246
1247     _RemoveHostFromEtcHosts(node.name)
1248
1249
1250 class LUQueryNodes(NoHooksLU):
1251   """Logical unit for querying nodes.
1252
1253   """
1254   _OP_REQP = ["output_fields", "names"]
1255
1256   def CheckPrereq(self):
1257     """Check prerequisites.
1258
1259     This checks that the fields required are valid output fields.
1260
1261     """
1262     self.dynamic_fields = frozenset(["dtotal", "dfree",
1263                                      "mtotal", "mnode", "mfree",
1264                                      "bootid"])
1265
1266     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1267                                "pinst_list", "sinst_list",
1268                                "pip", "sip"],
1269                        dynamic=self.dynamic_fields,
1270                        selected=self.op.output_fields)
1271
1272     self.wanted = _GetWantedNodes(self, self.op.names)
1273
1274   def Exec(self, feedback_fn):
1275     """Computes the list of nodes and their attributes.
1276
1277     """
1278     nodenames = self.wanted
1279     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1280
1281     # begin data gathering
1282
1283     if self.dynamic_fields.intersection(self.op.output_fields):
1284       live_data = {}
1285       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1286       for name in nodenames:
1287         nodeinfo = node_data.get(name, None)
1288         if nodeinfo:
1289           live_data[name] = {
1290             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1291             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1292             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1293             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1294             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1295             "bootid": nodeinfo['bootid'],
1296             }
1297         else:
1298           live_data[name] = {}
1299     else:
1300       live_data = dict.fromkeys(nodenames, {})
1301
1302     node_to_primary = dict([(name, set()) for name in nodenames])
1303     node_to_secondary = dict([(name, set()) for name in nodenames])
1304
1305     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1306                              "sinst_cnt", "sinst_list"))
1307     if inst_fields & frozenset(self.op.output_fields):
1308       instancelist = self.cfg.GetInstanceList()
1309
1310       for instance_name in instancelist:
1311         inst = self.cfg.GetInstanceInfo(instance_name)
1312         if inst.primary_node in node_to_primary:
1313           node_to_primary[inst.primary_node].add(inst.name)
1314         for secnode in inst.secondary_nodes:
1315           if secnode in node_to_secondary:
1316             node_to_secondary[secnode].add(inst.name)
1317
1318     # end data gathering
1319
1320     output = []
1321     for node in nodelist:
1322       node_output = []
1323       for field in self.op.output_fields:
1324         if field == "name":
1325           val = node.name
1326         elif field == "pinst_list":
1327           val = list(node_to_primary[node.name])
1328         elif field == "sinst_list":
1329           val = list(node_to_secondary[node.name])
1330         elif field == "pinst_cnt":
1331           val = len(node_to_primary[node.name])
1332         elif field == "sinst_cnt":
1333           val = len(node_to_secondary[node.name])
1334         elif field == "pip":
1335           val = node.primary_ip
1336         elif field == "sip":
1337           val = node.secondary_ip
1338         elif field in self.dynamic_fields:
1339           val = live_data[node.name].get(field, None)
1340         else:
1341           raise errors.ParameterError(field)
1342         node_output.append(val)
1343       output.append(node_output)
1344
1345     return output
1346
1347
1348 class LUQueryNodeVolumes(NoHooksLU):
1349   """Logical unit for getting volumes on node(s).
1350
1351   """
1352   _OP_REQP = ["nodes", "output_fields"]
1353
1354   def CheckPrereq(self):
1355     """Check prerequisites.
1356
1357     This checks that the fields required are valid output fields.
1358
1359     """
1360     self.nodes = _GetWantedNodes(self, self.op.nodes)
1361
1362     _CheckOutputFields(static=["node"],
1363                        dynamic=["phys", "vg", "name", "size", "instance"],
1364                        selected=self.op.output_fields)
1365
1366
1367   def Exec(self, feedback_fn):
1368     """Computes the list of nodes and their attributes.
1369
1370     """
1371     nodenames = self.nodes
1372     volumes = rpc.call_node_volumes(nodenames)
1373
1374     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1375              in self.cfg.GetInstanceList()]
1376
1377     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1378
1379     output = []
1380     for node in nodenames:
1381       if node not in volumes or not volumes[node]:
1382         continue
1383
1384       node_vols = volumes[node][:]
1385       node_vols.sort(key=lambda vol: vol['dev'])
1386
1387       for vol in node_vols:
1388         node_output = []
1389         for field in self.op.output_fields:
1390           if field == "node":
1391             val = node
1392           elif field == "phys":
1393             val = vol['dev']
1394           elif field == "vg":
1395             val = vol['vg']
1396           elif field == "name":
1397             val = vol['name']
1398           elif field == "size":
1399             val = int(float(vol['size']))
1400           elif field == "instance":
1401             for inst in ilist:
1402               if node not in lv_by_node[inst]:
1403                 continue
1404               if vol['name'] in lv_by_node[inst][node]:
1405                 val = inst.name
1406                 break
1407             else:
1408               val = '-'
1409           else:
1410             raise errors.ParameterError(field)
1411           node_output.append(str(val))
1412
1413         output.append(node_output)
1414
1415     return output
1416
1417
1418 class LUAddNode(LogicalUnit):
1419   """Logical unit for adding node to the cluster.
1420
1421   """
1422   HPATH = "node-add"
1423   HTYPE = constants.HTYPE_NODE
1424   _OP_REQP = ["node_name"]
1425
1426   def BuildHooksEnv(self):
1427     """Build hooks env.
1428
1429     This will run on all nodes before, and on all nodes + the new node after.
1430
1431     """
1432     env = {
1433       "OP_TARGET": self.op.node_name,
1434       "NODE_NAME": self.op.node_name,
1435       "NODE_PIP": self.op.primary_ip,
1436       "NODE_SIP": self.op.secondary_ip,
1437       }
1438     nodes_0 = self.cfg.GetNodeList()
1439     nodes_1 = nodes_0 + [self.op.node_name, ]
1440     return env, nodes_0, nodes_1
1441
1442   def CheckPrereq(self):
1443     """Check prerequisites.
1444
1445     This checks:
1446      - the new node is not already in the config
1447      - it is resolvable
1448      - its parameters (single/dual homed) matches the cluster
1449
1450     Any errors are signalled by raising errors.OpPrereqError.
1451
1452     """
1453     node_name = self.op.node_name
1454     cfg = self.cfg
1455
1456     dns_data = utils.HostInfo(node_name)
1457
1458     node = dns_data.name
1459     primary_ip = self.op.primary_ip = dns_data.ip
1460     secondary_ip = getattr(self.op, "secondary_ip", None)
1461     if secondary_ip is None:
1462       secondary_ip = primary_ip
1463     if not utils.IsValidIP(secondary_ip):
1464       raise errors.OpPrereqError("Invalid secondary IP given")
1465     self.op.secondary_ip = secondary_ip
1466     node_list = cfg.GetNodeList()
1467     if node in node_list:
1468       raise errors.OpPrereqError("Node %s is already in the configuration"
1469                                  % node)
1470
1471     for existing_node_name in node_list:
1472       existing_node = cfg.GetNodeInfo(existing_node_name)
1473       if (existing_node.primary_ip == primary_ip or
1474           existing_node.secondary_ip == primary_ip or
1475           existing_node.primary_ip == secondary_ip or
1476           existing_node.secondary_ip == secondary_ip):
1477         raise errors.OpPrereqError("New node ip address(es) conflict with"
1478                                    " existing node %s" % existing_node.name)
1479
1480     # check that the type of the node (single versus dual homed) is the
1481     # same as for the master
1482     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1483     master_singlehomed = myself.secondary_ip == myself.primary_ip
1484     newbie_singlehomed = secondary_ip == primary_ip
1485     if master_singlehomed != newbie_singlehomed:
1486       if master_singlehomed:
1487         raise errors.OpPrereqError("The master has no private ip but the"
1488                                    " new node has one")
1489       else:
1490         raise errors.OpPrereqError("The master has a private ip but the"
1491                                    " new node doesn't have one")
1492
1493     # checks reachablity
1494     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1495       raise errors.OpPrereqError("Node not reachable by ping")
1496
1497     if not newbie_singlehomed:
1498       # check reachability from my secondary ip to newbie's secondary ip
1499       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1500                            source=myself.secondary_ip):
1501         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1502                                    " based ping to noded port")
1503
1504     self.new_node = objects.Node(name=node,
1505                                  primary_ip=primary_ip,
1506                                  secondary_ip=secondary_ip)
1507
1508     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1509       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1510         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1511                                    constants.VNC_PASSWORD_FILE)
1512
1513   def Exec(self, feedback_fn):
1514     """Adds the new node to the cluster.
1515
1516     """
1517     new_node = self.new_node
1518     node = new_node.name
1519
1520     # set up inter-node password and certificate and restarts the node daemon
1521     gntpass = self.sstore.GetNodeDaemonPassword()
1522     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1523       raise errors.OpExecError("ganeti password corruption detected")
1524     f = open(constants.SSL_CERT_FILE)
1525     try:
1526       gntpem = f.read(8192)
1527     finally:
1528       f.close()
1529     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1530     # so we use this to detect an invalid certificate; as long as the
1531     # cert doesn't contain this, the here-document will be correctly
1532     # parsed by the shell sequence below
1533     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1534       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1535     if not gntpem.endswith("\n"):
1536       raise errors.OpExecError("PEM must end with newline")
1537     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1538
1539     # and then connect with ssh to set password and start ganeti-noded
1540     # note that all the below variables are sanitized at this point,
1541     # either by being constants or by the checks above
1542     ss = self.sstore
1543     mycommand = ("umask 077 && "
1544                  "echo '%s' > '%s' && "
1545                  "cat > '%s' << '!EOF.' && \n"
1546                  "%s!EOF.\n%s restart" %
1547                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1548                   constants.SSL_CERT_FILE, gntpem,
1549                   constants.NODE_INITD_SCRIPT))
1550
1551     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1552     if result.failed:
1553       raise errors.OpExecError("Remote command on node %s, error: %s,"
1554                                " output: %s" %
1555                                (node, result.fail_reason, result.output))
1556
1557     # check connectivity
1558     time.sleep(4)
1559
1560     result = rpc.call_version([node])[node]
1561     if result:
1562       if constants.PROTOCOL_VERSION == result:
1563         logger.Info("communication to node %s fine, sw version %s match" %
1564                     (node, result))
1565       else:
1566         raise errors.OpExecError("Version mismatch master version %s,"
1567                                  " node version %s" %
1568                                  (constants.PROTOCOL_VERSION, result))
1569     else:
1570       raise errors.OpExecError("Cannot get version from the new node")
1571
1572     # setup ssh on node
1573     logger.Info("copy ssh key to node %s" % node)
1574     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1575     keyarray = []
1576     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1577                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1578                 priv_key, pub_key]
1579
1580     for i in keyfiles:
1581       f = open(i, 'r')
1582       try:
1583         keyarray.append(f.read())
1584       finally:
1585         f.close()
1586
1587     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1588                                keyarray[3], keyarray[4], keyarray[5])
1589
1590     if not result:
1591       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1592
1593     # Add node to our /etc/hosts, and add key to known_hosts
1594     _AddHostToEtcHosts(new_node.name)
1595
1596     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1597                       self.cfg.GetHostKey())
1598
1599     if new_node.secondary_ip != new_node.primary_ip:
1600       if not rpc.call_node_tcp_ping(new_node.name,
1601                                     constants.LOCALHOST_IP_ADDRESS,
1602                                     new_node.secondary_ip,
1603                                     constants.DEFAULT_NODED_PORT,
1604                                     10, False):
1605         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1606                                  " you gave (%s). Please fix and re-run this"
1607                                  " command." % new_node.secondary_ip)
1608
1609     success, msg = ssh.VerifyNodeHostname(node)
1610     if not success:
1611       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1612                                " than the one the resolver gives: %s."
1613                                " Please fix and re-run this command." %
1614                                (node, msg))
1615
1616     # Distribute updated /etc/hosts and known_hosts to all nodes,
1617     # including the node just added
1618     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1619     dist_nodes = self.cfg.GetNodeList() + [node]
1620     if myself.name in dist_nodes:
1621       dist_nodes.remove(myself.name)
1622
1623     logger.Debug("Copying hosts and known_hosts to all nodes")
1624     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1625       result = rpc.call_upload_file(dist_nodes, fname)
1626       for to_node in dist_nodes:
1627         if not result[to_node]:
1628           logger.Error("copy of file %s to node %s failed" %
1629                        (fname, to_node))
1630
1631     to_copy = ss.GetFileList()
1632     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1633       to_copy.append(constants.VNC_PASSWORD_FILE)
1634     for fname in to_copy:
1635       if not ssh.CopyFileToNode(node, fname):
1636         logger.Error("could not copy file %s to node %s" % (fname, node))
1637
1638     logger.Info("adding node %s to cluster.conf" % node)
1639     self.cfg.AddNode(new_node)
1640
1641
1642 class LUMasterFailover(LogicalUnit):
1643   """Failover the master node to the current node.
1644
1645   This is a special LU in that it must run on a non-master node.
1646
1647   """
1648   HPATH = "master-failover"
1649   HTYPE = constants.HTYPE_CLUSTER
1650   REQ_MASTER = False
1651   _OP_REQP = []
1652
1653   def BuildHooksEnv(self):
1654     """Build hooks env.
1655
1656     This will run on the new master only in the pre phase, and on all
1657     the nodes in the post phase.
1658
1659     """
1660     env = {
1661       "OP_TARGET": self.new_master,
1662       "NEW_MASTER": self.new_master,
1663       "OLD_MASTER": self.old_master,
1664       }
1665     return env, [self.new_master], self.cfg.GetNodeList()
1666
1667   def CheckPrereq(self):
1668     """Check prerequisites.
1669
1670     This checks that we are not already the master.
1671
1672     """
1673     self.new_master = utils.HostInfo().name
1674     self.old_master = self.sstore.GetMasterNode()
1675
1676     if self.old_master == self.new_master:
1677       raise errors.OpPrereqError("This commands must be run on the node"
1678                                  " where you want the new master to be."
1679                                  " %s is already the master" %
1680                                  self.old_master)
1681
1682   def Exec(self, feedback_fn):
1683     """Failover the master node.
1684
1685     This command, when run on a non-master node, will cause the current
1686     master to cease being master, and the non-master to become new
1687     master.
1688
1689     """
1690     #TODO: do not rely on gethostname returning the FQDN
1691     logger.Info("setting master to %s, old master: %s" %
1692                 (self.new_master, self.old_master))
1693
1694     if not rpc.call_node_stop_master(self.old_master):
1695       logger.Error("could disable the master role on the old master"
1696                    " %s, please disable manually" % self.old_master)
1697
1698     ss = self.sstore
1699     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1700     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1701                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1702       logger.Error("could not distribute the new simple store master file"
1703                    " to the other nodes, please check.")
1704
1705     if not rpc.call_node_start_master(self.new_master):
1706       logger.Error("could not start the master role on the new master"
1707                    " %s, please check" % self.new_master)
1708       feedback_fn("Error in activating the master IP on the new master,"
1709                   " please fix manually.")
1710
1711
1712
1713 class LUQueryClusterInfo(NoHooksLU):
1714   """Query cluster configuration.
1715
1716   """
1717   _OP_REQP = []
1718   REQ_MASTER = False
1719
1720   def CheckPrereq(self):
1721     """No prerequsites needed for this LU.
1722
1723     """
1724     pass
1725
1726   def Exec(self, feedback_fn):
1727     """Return cluster config.
1728
1729     """
1730     result = {
1731       "name": self.sstore.GetClusterName(),
1732       "software_version": constants.RELEASE_VERSION,
1733       "protocol_version": constants.PROTOCOL_VERSION,
1734       "config_version": constants.CONFIG_VERSION,
1735       "os_api_version": constants.OS_API_VERSION,
1736       "export_version": constants.EXPORT_VERSION,
1737       "master": self.sstore.GetMasterNode(),
1738       "architecture": (platform.architecture()[0], platform.machine()),
1739       }
1740
1741     return result
1742
1743
1744 class LUClusterCopyFile(NoHooksLU):
1745   """Copy file to cluster.
1746
1747   """
1748   _OP_REQP = ["nodes", "filename"]
1749
1750   def CheckPrereq(self):
1751     """Check prerequisites.
1752
1753     It should check that the named file exists and that the given list
1754     of nodes is valid.
1755
1756     """
1757     if not os.path.exists(self.op.filename):
1758       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1759
1760     self.nodes = _GetWantedNodes(self, self.op.nodes)
1761
1762   def Exec(self, feedback_fn):
1763     """Copy a file from master to some nodes.
1764
1765     Args:
1766       opts - class with options as members
1767       args - list containing a single element, the file name
1768     Opts used:
1769       nodes - list containing the name of target nodes; if empty, all nodes
1770
1771     """
1772     filename = self.op.filename
1773
1774     myname = utils.HostInfo().name
1775
1776     for node in self.nodes:
1777       if node == myname:
1778         continue
1779       if not ssh.CopyFileToNode(node, filename):
1780         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1781
1782
1783 class LUDumpClusterConfig(NoHooksLU):
1784   """Return a text-representation of the cluster-config.
1785
1786   """
1787   _OP_REQP = []
1788
1789   def CheckPrereq(self):
1790     """No prerequisites.
1791
1792     """
1793     pass
1794
1795   def Exec(self, feedback_fn):
1796     """Dump a representation of the cluster config to the standard output.
1797
1798     """
1799     return self.cfg.DumpConfig()
1800
1801
1802 class LURunClusterCommand(NoHooksLU):
1803   """Run a command on some nodes.
1804
1805   """
1806   _OP_REQP = ["command", "nodes"]
1807
1808   def CheckPrereq(self):
1809     """Check prerequisites.
1810
1811     It checks that the given list of nodes is valid.
1812
1813     """
1814     self.nodes = _GetWantedNodes(self, self.op.nodes)
1815
1816   def Exec(self, feedback_fn):
1817     """Run a command on some nodes.
1818
1819     """
1820     # put the master at the end of the nodes list
1821     master_node = self.sstore.GetMasterNode()
1822     if master_node in self.nodes:
1823       self.nodes.remove(master_node)
1824       self.nodes.append(master_node)
1825
1826     data = []
1827     for node in self.nodes:
1828       result = ssh.SSHCall(node, "root", self.op.command)
1829       data.append((node, result.output, result.exit_code))
1830
1831     return data
1832
1833
1834 class LUActivateInstanceDisks(NoHooksLU):
1835   """Bring up an instance's disks.
1836
1837   """
1838   _OP_REQP = ["instance_name"]
1839
1840   def CheckPrereq(self):
1841     """Check prerequisites.
1842
1843     This checks that the instance is in the cluster.
1844
1845     """
1846     instance = self.cfg.GetInstanceInfo(
1847       self.cfg.ExpandInstanceName(self.op.instance_name))
1848     if instance is None:
1849       raise errors.OpPrereqError("Instance '%s' not known" %
1850                                  self.op.instance_name)
1851     self.instance = instance
1852
1853
1854   def Exec(self, feedback_fn):
1855     """Activate the disks.
1856
1857     """
1858     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1859     if not disks_ok:
1860       raise errors.OpExecError("Cannot activate block devices")
1861
1862     return disks_info
1863
1864
1865 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1866   """Prepare the block devices for an instance.
1867
1868   This sets up the block devices on all nodes.
1869
1870   Args:
1871     instance: a ganeti.objects.Instance object
1872     ignore_secondaries: if true, errors on secondary nodes won't result
1873                         in an error return from the function
1874
1875   Returns:
1876     false if the operation failed
1877     list of (host, instance_visible_name, node_visible_name) if the operation
1878          suceeded with the mapping from node devices to instance devices
1879   """
1880   device_info = []
1881   disks_ok = True
1882   iname = instance.name
1883   # With the two passes mechanism we try to reduce the window of
1884   # opportunity for the race condition of switching DRBD to primary
1885   # before handshaking occured, but we do not eliminate it
1886
1887   # The proper fix would be to wait (with some limits) until the
1888   # connection has been made and drbd transitions from WFConnection
1889   # into any other network-connected state (Connected, SyncTarget,
1890   # SyncSource, etc.)
1891
1892   # 1st pass, assemble on all nodes in secondary mode
1893   for inst_disk in instance.disks:
1894     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1895       cfg.SetDiskID(node_disk, node)
1896       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1897       if not result:
1898         logger.Error("could not prepare block device %s on node %s"
1899                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1900         if not ignore_secondaries:
1901           disks_ok = False
1902
1903   # FIXME: race condition on drbd migration to primary
1904
1905   # 2nd pass, do only the primary node
1906   for inst_disk in instance.disks:
1907     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1908       if node != instance.primary_node:
1909         continue
1910       cfg.SetDiskID(node_disk, node)
1911       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1912       if not result:
1913         logger.Error("could not prepare block device %s on node %s"
1914                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1915         disks_ok = False
1916     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1917
1918   # leave the disks configured for the primary node
1919   # this is a workaround that would be fixed better by
1920   # improving the logical/physical id handling
1921   for disk in instance.disks:
1922     cfg.SetDiskID(disk, instance.primary_node)
1923
1924   return disks_ok, device_info
1925
1926
1927 def _StartInstanceDisks(cfg, instance, force):
1928   """Start the disks of an instance.
1929
1930   """
1931   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1932                                            ignore_secondaries=force)
1933   if not disks_ok:
1934     _ShutdownInstanceDisks(instance, cfg)
1935     if force is not None and not force:
1936       logger.Error("If the message above refers to a secondary node,"
1937                    " you can retry the operation using '--force'.")
1938     raise errors.OpExecError("Disk consistency error")
1939
1940
1941 class LUDeactivateInstanceDisks(NoHooksLU):
1942   """Shutdown an instance's disks.
1943
1944   """
1945   _OP_REQP = ["instance_name"]
1946
1947   def CheckPrereq(self):
1948     """Check prerequisites.
1949
1950     This checks that the instance is in the cluster.
1951
1952     """
1953     instance = self.cfg.GetInstanceInfo(
1954       self.cfg.ExpandInstanceName(self.op.instance_name))
1955     if instance is None:
1956       raise errors.OpPrereqError("Instance '%s' not known" %
1957                                  self.op.instance_name)
1958     self.instance = instance
1959
1960   def Exec(self, feedback_fn):
1961     """Deactivate the disks
1962
1963     """
1964     instance = self.instance
1965     ins_l = rpc.call_instance_list([instance.primary_node])
1966     ins_l = ins_l[instance.primary_node]
1967     if not type(ins_l) is list:
1968       raise errors.OpExecError("Can't contact node '%s'" %
1969                                instance.primary_node)
1970
1971     if self.instance.name in ins_l:
1972       raise errors.OpExecError("Instance is running, can't shutdown"
1973                                " block devices.")
1974
1975     _ShutdownInstanceDisks(instance, self.cfg)
1976
1977
1978 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1979   """Shutdown block devices of an instance.
1980
1981   This does the shutdown on all nodes of the instance.
1982
1983   If the ignore_primary is false, errors on the primary node are
1984   ignored.
1985
1986   """
1987   result = True
1988   for disk in instance.disks:
1989     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1990       cfg.SetDiskID(top_disk, node)
1991       if not rpc.call_blockdev_shutdown(node, top_disk):
1992         logger.Error("could not shutdown block device %s on node %s" %
1993                      (disk.iv_name, node))
1994         if not ignore_primary or node != instance.primary_node:
1995           result = False
1996   return result
1997
1998
1999 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2000   """Checks if a node has enough free memory.
2001
2002   This function check if a given node has the needed amount of free
2003   memory. In case the node has less memory or we cannot get the
2004   information from the node, this function raise an OpPrereqError
2005   exception.
2006
2007   Args:
2008     - cfg: a ConfigWriter instance
2009     - node: the node name
2010     - reason: string to use in the error message
2011     - requested: the amount of memory in MiB
2012
2013   """
2014   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2015   if not nodeinfo or not isinstance(nodeinfo, dict):
2016     raise errors.OpPrereqError("Could not contact node %s for resource"
2017                              " information" % (node,))
2018
2019   free_mem = nodeinfo[node].get('memory_free')
2020   if not isinstance(free_mem, int):
2021     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2022                              " was '%s'" % (node, free_mem))
2023   if requested > free_mem:
2024     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2025                              " needed %s MiB, available %s MiB" %
2026                              (node, reason, requested, free_mem))
2027
2028
2029 class LUStartupInstance(LogicalUnit):
2030   """Starts an instance.
2031
2032   """
2033   HPATH = "instance-start"
2034   HTYPE = constants.HTYPE_INSTANCE
2035   _OP_REQP = ["instance_name", "force"]
2036
2037   def BuildHooksEnv(self):
2038     """Build hooks env.
2039
2040     This runs on master, primary and secondary nodes of the instance.
2041
2042     """
2043     env = {
2044       "FORCE": self.op.force,
2045       }
2046     env.update(_BuildInstanceHookEnvByObject(self.instance))
2047     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2048           list(self.instance.secondary_nodes))
2049     return env, nl, nl
2050
2051   def CheckPrereq(self):
2052     """Check prerequisites.
2053
2054     This checks that the instance is in the cluster.
2055
2056     """
2057     instance = self.cfg.GetInstanceInfo(
2058       self.cfg.ExpandInstanceName(self.op.instance_name))
2059     if instance is None:
2060       raise errors.OpPrereqError("Instance '%s' not known" %
2061                                  self.op.instance_name)
2062
2063     # check bridges existance
2064     _CheckInstanceBridgesExist(instance)
2065
2066     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2067                          "starting instance %s" % instance.name,
2068                          instance.memory)
2069
2070     self.instance = instance
2071     self.op.instance_name = instance.name
2072
2073   def Exec(self, feedback_fn):
2074     """Start the instance.
2075
2076     """
2077     instance = self.instance
2078     force = self.op.force
2079     extra_args = getattr(self.op, "extra_args", "")
2080
2081     self.cfg.MarkInstanceUp(instance.name)
2082
2083     node_current = instance.primary_node
2084
2085     _StartInstanceDisks(self.cfg, instance, force)
2086
2087     if not rpc.call_instance_start(node_current, instance, extra_args):
2088       _ShutdownInstanceDisks(instance, self.cfg)
2089       raise errors.OpExecError("Could not start instance")
2090
2091
2092 class LURebootInstance(LogicalUnit):
2093   """Reboot an instance.
2094
2095   """
2096   HPATH = "instance-reboot"
2097   HTYPE = constants.HTYPE_INSTANCE
2098   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2099
2100   def BuildHooksEnv(self):
2101     """Build hooks env.
2102
2103     This runs on master, primary and secondary nodes of the instance.
2104
2105     """
2106     env = {
2107       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2108       }
2109     env.update(_BuildInstanceHookEnvByObject(self.instance))
2110     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2111           list(self.instance.secondary_nodes))
2112     return env, nl, nl
2113
2114   def CheckPrereq(self):
2115     """Check prerequisites.
2116
2117     This checks that the instance is in the cluster.
2118
2119     """
2120     instance = self.cfg.GetInstanceInfo(
2121       self.cfg.ExpandInstanceName(self.op.instance_name))
2122     if instance is None:
2123       raise errors.OpPrereqError("Instance '%s' not known" %
2124                                  self.op.instance_name)
2125
2126     # check bridges existance
2127     _CheckInstanceBridgesExist(instance)
2128
2129     self.instance = instance
2130     self.op.instance_name = instance.name
2131
2132   def Exec(self, feedback_fn):
2133     """Reboot the instance.
2134
2135     """
2136     instance = self.instance
2137     ignore_secondaries = self.op.ignore_secondaries
2138     reboot_type = self.op.reboot_type
2139     extra_args = getattr(self.op, "extra_args", "")
2140
2141     node_current = instance.primary_node
2142
2143     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2144                            constants.INSTANCE_REBOOT_HARD,
2145                            constants.INSTANCE_REBOOT_FULL]:
2146       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2147                                   (constants.INSTANCE_REBOOT_SOFT,
2148                                    constants.INSTANCE_REBOOT_HARD,
2149                                    constants.INSTANCE_REBOOT_FULL))
2150
2151     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2152                        constants.INSTANCE_REBOOT_HARD]:
2153       if not rpc.call_instance_reboot(node_current, instance,
2154                                       reboot_type, extra_args):
2155         raise errors.OpExecError("Could not reboot instance")
2156     else:
2157       if not rpc.call_instance_shutdown(node_current, instance):
2158         raise errors.OpExecError("could not shutdown instance for full reboot")
2159       _ShutdownInstanceDisks(instance, self.cfg)
2160       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2161       if not rpc.call_instance_start(node_current, instance, extra_args):
2162         _ShutdownInstanceDisks(instance, self.cfg)
2163         raise errors.OpExecError("Could not start instance for full reboot")
2164
2165     self.cfg.MarkInstanceUp(instance.name)
2166
2167
2168 class LUShutdownInstance(LogicalUnit):
2169   """Shutdown an instance.
2170
2171   """
2172   HPATH = "instance-stop"
2173   HTYPE = constants.HTYPE_INSTANCE
2174   _OP_REQP = ["instance_name"]
2175
2176   def BuildHooksEnv(self):
2177     """Build hooks env.
2178
2179     This runs on master, primary and secondary nodes of the instance.
2180
2181     """
2182     env = _BuildInstanceHookEnvByObject(self.instance)
2183     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2184           list(self.instance.secondary_nodes))
2185     return env, nl, nl
2186
2187   def CheckPrereq(self):
2188     """Check prerequisites.
2189
2190     This checks that the instance is in the cluster.
2191
2192     """
2193     instance = self.cfg.GetInstanceInfo(
2194       self.cfg.ExpandInstanceName(self.op.instance_name))
2195     if instance is None:
2196       raise errors.OpPrereqError("Instance '%s' not known" %
2197                                  self.op.instance_name)
2198     self.instance = instance
2199
2200   def Exec(self, feedback_fn):
2201     """Shutdown the instance.
2202
2203     """
2204     instance = self.instance
2205     node_current = instance.primary_node
2206     self.cfg.MarkInstanceDown(instance.name)
2207     if not rpc.call_instance_shutdown(node_current, instance):
2208       logger.Error("could not shutdown instance")
2209
2210     _ShutdownInstanceDisks(instance, self.cfg)
2211
2212
2213 class LUReinstallInstance(LogicalUnit):
2214   """Reinstall an instance.
2215
2216   """
2217   HPATH = "instance-reinstall"
2218   HTYPE = constants.HTYPE_INSTANCE
2219   _OP_REQP = ["instance_name"]
2220
2221   def BuildHooksEnv(self):
2222     """Build hooks env.
2223
2224     This runs on master, primary and secondary nodes of the instance.
2225
2226     """
2227     env = _BuildInstanceHookEnvByObject(self.instance)
2228     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2229           list(self.instance.secondary_nodes))
2230     return env, nl, nl
2231
2232   def CheckPrereq(self):
2233     """Check prerequisites.
2234
2235     This checks that the instance is in the cluster and is not running.
2236
2237     """
2238     instance = self.cfg.GetInstanceInfo(
2239       self.cfg.ExpandInstanceName(self.op.instance_name))
2240     if instance is None:
2241       raise errors.OpPrereqError("Instance '%s' not known" %
2242                                  self.op.instance_name)
2243     if instance.disk_template == constants.DT_DISKLESS:
2244       raise errors.OpPrereqError("Instance '%s' has no disks" %
2245                                  self.op.instance_name)
2246     if instance.status != "down":
2247       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2248                                  self.op.instance_name)
2249     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2250     if remote_info:
2251       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2252                                  (self.op.instance_name,
2253                                   instance.primary_node))
2254
2255     self.op.os_type = getattr(self.op, "os_type", None)
2256     if self.op.os_type is not None:
2257       # OS verification
2258       pnode = self.cfg.GetNodeInfo(
2259         self.cfg.ExpandNodeName(instance.primary_node))
2260       if pnode is None:
2261         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2262                                    self.op.pnode)
2263       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2264       if not os_obj:
2265         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2266                                    " primary node"  % self.op.os_type)
2267
2268     self.instance = instance
2269
2270   def Exec(self, feedback_fn):
2271     """Reinstall the instance.
2272
2273     """
2274     inst = self.instance
2275
2276     if self.op.os_type is not None:
2277       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2278       inst.os = self.op.os_type
2279       self.cfg.AddInstance(inst)
2280
2281     _StartInstanceDisks(self.cfg, inst, None)
2282     try:
2283       feedback_fn("Running the instance OS create scripts...")
2284       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2285         raise errors.OpExecError("Could not install OS for instance %s"
2286                                  " on node %s" %
2287                                  (inst.name, inst.primary_node))
2288     finally:
2289       _ShutdownInstanceDisks(inst, self.cfg)
2290
2291
2292 class LURenameInstance(LogicalUnit):
2293   """Rename an instance.
2294
2295   """
2296   HPATH = "instance-rename"
2297   HTYPE = constants.HTYPE_INSTANCE
2298   _OP_REQP = ["instance_name", "new_name"]
2299
2300   def BuildHooksEnv(self):
2301     """Build hooks env.
2302
2303     This runs on master, primary and secondary nodes of the instance.
2304
2305     """
2306     env = _BuildInstanceHookEnvByObject(self.instance)
2307     env["INSTANCE_NEW_NAME"] = self.op.new_name
2308     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2309           list(self.instance.secondary_nodes))
2310     return env, nl, nl
2311
2312   def CheckPrereq(self):
2313     """Check prerequisites.
2314
2315     This checks that the instance is in the cluster and is not running.
2316
2317     """
2318     instance = self.cfg.GetInstanceInfo(
2319       self.cfg.ExpandInstanceName(self.op.instance_name))
2320     if instance is None:
2321       raise errors.OpPrereqError("Instance '%s' not known" %
2322                                  self.op.instance_name)
2323     if instance.status != "down":
2324       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2325                                  self.op.instance_name)
2326     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2327     if remote_info:
2328       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2329                                  (self.op.instance_name,
2330                                   instance.primary_node))
2331     self.instance = instance
2332
2333     # new name verification
2334     name_info = utils.HostInfo(self.op.new_name)
2335
2336     self.op.new_name = new_name = name_info.name
2337     instance_list = self.cfg.GetInstanceList()
2338     if new_name in instance_list:
2339       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2340                                  instance_name)
2341
2342     if not getattr(self.op, "ignore_ip", False):
2343       command = ["fping", "-q", name_info.ip]
2344       result = utils.RunCmd(command)
2345       if not result.failed:
2346         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2347                                    (name_info.ip, new_name))
2348
2349
2350   def Exec(self, feedback_fn):
2351     """Reinstall the instance.
2352
2353     """
2354     inst = self.instance
2355     old_name = inst.name
2356
2357     self.cfg.RenameInstance(inst.name, self.op.new_name)
2358
2359     # re-read the instance from the configuration after rename
2360     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2361
2362     _StartInstanceDisks(self.cfg, inst, None)
2363     try:
2364       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2365                                           "sda", "sdb"):
2366         msg = ("Could run OS rename script for instance %s on node %s (but the"
2367                " instance has been renamed in Ganeti)" %
2368                (inst.name, inst.primary_node))
2369         logger.Error(msg)
2370     finally:
2371       _ShutdownInstanceDisks(inst, self.cfg)
2372
2373
2374 class LURemoveInstance(LogicalUnit):
2375   """Remove an instance.
2376
2377   """
2378   HPATH = "instance-remove"
2379   HTYPE = constants.HTYPE_INSTANCE
2380   _OP_REQP = ["instance_name"]
2381
2382   def BuildHooksEnv(self):
2383     """Build hooks env.
2384
2385     This runs on master, primary and secondary nodes of the instance.
2386
2387     """
2388     env = _BuildInstanceHookEnvByObject(self.instance)
2389     nl = [self.sstore.GetMasterNode()]
2390     return env, nl, nl
2391
2392   def CheckPrereq(self):
2393     """Check prerequisites.
2394
2395     This checks that the instance is in the cluster.
2396
2397     """
2398     instance = self.cfg.GetInstanceInfo(
2399       self.cfg.ExpandInstanceName(self.op.instance_name))
2400     if instance is None:
2401       raise errors.OpPrereqError("Instance '%s' not known" %
2402                                  self.op.instance_name)
2403     self.instance = instance
2404
2405   def Exec(self, feedback_fn):
2406     """Remove the instance.
2407
2408     """
2409     instance = self.instance
2410     logger.Info("shutting down instance %s on node %s" %
2411                 (instance.name, instance.primary_node))
2412
2413     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2414       if self.op.ignore_failures:
2415         feedback_fn("Warning: can't shutdown instance")
2416       else:
2417         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2418                                  (instance.name, instance.primary_node))
2419
2420     logger.Info("removing block devices for instance %s" % instance.name)
2421
2422     if not _RemoveDisks(instance, self.cfg):
2423       if self.op.ignore_failures:
2424         feedback_fn("Warning: can't remove instance's disks")
2425       else:
2426         raise errors.OpExecError("Can't remove instance's disks")
2427
2428     logger.Info("removing instance %s out of cluster config" % instance.name)
2429
2430     self.cfg.RemoveInstance(instance.name)
2431
2432
2433 class LUQueryInstances(NoHooksLU):
2434   """Logical unit for querying instances.
2435
2436   """
2437   _OP_REQP = ["output_fields", "names"]
2438
2439   def CheckPrereq(self):
2440     """Check prerequisites.
2441
2442     This checks that the fields required are valid output fields.
2443
2444     """
2445     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2446     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2447                                "admin_state", "admin_ram",
2448                                "disk_template", "ip", "mac", "bridge",
2449                                "sda_size", "sdb_size", "vcpus"],
2450                        dynamic=self.dynamic_fields,
2451                        selected=self.op.output_fields)
2452
2453     self.wanted = _GetWantedInstances(self, self.op.names)
2454
2455   def Exec(self, feedback_fn):
2456     """Computes the list of nodes and their attributes.
2457
2458     """
2459     instance_names = self.wanted
2460     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2461                      in instance_names]
2462
2463     # begin data gathering
2464
2465     nodes = frozenset([inst.primary_node for inst in instance_list])
2466
2467     bad_nodes = []
2468     if self.dynamic_fields.intersection(self.op.output_fields):
2469       live_data = {}
2470       node_data = rpc.call_all_instances_info(nodes)
2471       for name in nodes:
2472         result = node_data[name]
2473         if result:
2474           live_data.update(result)
2475         elif result == False:
2476           bad_nodes.append(name)
2477         # else no instance is alive
2478     else:
2479       live_data = dict([(name, {}) for name in instance_names])
2480
2481     # end data gathering
2482
2483     output = []
2484     for instance in instance_list:
2485       iout = []
2486       for field in self.op.output_fields:
2487         if field == "name":
2488           val = instance.name
2489         elif field == "os":
2490           val = instance.os
2491         elif field == "pnode":
2492           val = instance.primary_node
2493         elif field == "snodes":
2494           val = list(instance.secondary_nodes)
2495         elif field == "admin_state":
2496           val = (instance.status != "down")
2497         elif field == "oper_state":
2498           if instance.primary_node in bad_nodes:
2499             val = None
2500           else:
2501             val = bool(live_data.get(instance.name))
2502         elif field == "status":
2503           if instance.primary_node in bad_nodes:
2504             val = "ERROR_nodedown"
2505           else:
2506             running = bool(live_data.get(instance.name))
2507             if running:
2508               if instance.status != "down":
2509                 val = "running"
2510               else:
2511                 val = "ERROR_up"
2512             else:
2513               if instance.status != "down":
2514                 val = "ERROR_down"
2515               else:
2516                 val = "ADMIN_down"
2517         elif field == "admin_ram":
2518           val = instance.memory
2519         elif field == "oper_ram":
2520           if instance.primary_node in bad_nodes:
2521             val = None
2522           elif instance.name in live_data:
2523             val = live_data[instance.name].get("memory", "?")
2524           else:
2525             val = "-"
2526         elif field == "disk_template":
2527           val = instance.disk_template
2528         elif field == "ip":
2529           val = instance.nics[0].ip
2530         elif field == "bridge":
2531           val = instance.nics[0].bridge
2532         elif field == "mac":
2533           val = instance.nics[0].mac
2534         elif field == "sda_size" or field == "sdb_size":
2535           disk = instance.FindDisk(field[:3])
2536           if disk is None:
2537             val = None
2538           else:
2539             val = disk.size
2540         elif field == "vcpus":
2541           val = instance.vcpus
2542         else:
2543           raise errors.ParameterError(field)
2544         iout.append(val)
2545       output.append(iout)
2546
2547     return output
2548
2549
2550 class LUFailoverInstance(LogicalUnit):
2551   """Failover an instance.
2552
2553   """
2554   HPATH = "instance-failover"
2555   HTYPE = constants.HTYPE_INSTANCE
2556   _OP_REQP = ["instance_name", "ignore_consistency"]
2557
2558   def BuildHooksEnv(self):
2559     """Build hooks env.
2560
2561     This runs on master, primary and secondary nodes of the instance.
2562
2563     """
2564     env = {
2565       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2566       }
2567     env.update(_BuildInstanceHookEnvByObject(self.instance))
2568     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2569     return env, nl, nl
2570
2571   def CheckPrereq(self):
2572     """Check prerequisites.
2573
2574     This checks that the instance is in the cluster.
2575
2576     """
2577     instance = self.cfg.GetInstanceInfo(
2578       self.cfg.ExpandInstanceName(self.op.instance_name))
2579     if instance is None:
2580       raise errors.OpPrereqError("Instance '%s' not known" %
2581                                  self.op.instance_name)
2582
2583     if instance.disk_template not in constants.DTS_NET_MIRROR:
2584       raise errors.OpPrereqError("Instance's disk layout is not"
2585                                  " network mirrored, cannot failover.")
2586
2587     secondary_nodes = instance.secondary_nodes
2588     if not secondary_nodes:
2589       raise errors.ProgrammerError("no secondary node but using "
2590                                    "DT_REMOTE_RAID1 template")
2591
2592     target_node = secondary_nodes[0]
2593     # check memory requirements on the secondary node
2594     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2595                          instance.name, instance.memory)
2596
2597     # check bridge existance
2598     brlist = [nic.bridge for nic in instance.nics]
2599     if not rpc.call_bridges_exist(target_node, brlist):
2600       raise errors.OpPrereqError("One or more target bridges %s does not"
2601                                  " exist on destination node '%s'" %
2602                                  (brlist, target_node))
2603
2604     self.instance = instance
2605
2606   def Exec(self, feedback_fn):
2607     """Failover an instance.
2608
2609     The failover is done by shutting it down on its present node and
2610     starting it on the secondary.
2611
2612     """
2613     instance = self.instance
2614
2615     source_node = instance.primary_node
2616     target_node = instance.secondary_nodes[0]
2617
2618     feedback_fn("* checking disk consistency between source and target")
2619     for dev in instance.disks:
2620       # for remote_raid1, these are md over drbd
2621       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2622         if instance.status == "up" and not self.op.ignore_consistency:
2623           raise errors.OpExecError("Disk %s is degraded on target node,"
2624                                    " aborting failover." % dev.iv_name)
2625
2626     feedback_fn("* shutting down instance on source node")
2627     logger.Info("Shutting down instance %s on node %s" %
2628                 (instance.name, source_node))
2629
2630     if not rpc.call_instance_shutdown(source_node, instance):
2631       if self.op.ignore_consistency:
2632         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2633                      " anyway. Please make sure node %s is down"  %
2634                      (instance.name, source_node, source_node))
2635       else:
2636         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2637                                  (instance.name, source_node))
2638
2639     feedback_fn("* deactivating the instance's disks on source node")
2640     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2641       raise errors.OpExecError("Can't shut down the instance's disks.")
2642
2643     instance.primary_node = target_node
2644     # distribute new instance config to the other nodes
2645     self.cfg.AddInstance(instance)
2646
2647     # Only start the instance if it's marked as up
2648     if instance.status == "up":
2649       feedback_fn("* activating the instance's disks on target node")
2650       logger.Info("Starting instance %s on node %s" %
2651                   (instance.name, target_node))
2652
2653       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2654                                                ignore_secondaries=True)
2655       if not disks_ok:
2656         _ShutdownInstanceDisks(instance, self.cfg)
2657         raise errors.OpExecError("Can't activate the instance's disks")
2658
2659       feedback_fn("* starting the instance on the target node")
2660       if not rpc.call_instance_start(target_node, instance, None):
2661         _ShutdownInstanceDisks(instance, self.cfg)
2662         raise errors.OpExecError("Could not start instance %s on node %s." %
2663                                  (instance.name, target_node))
2664
2665
2666 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2667   """Create a tree of block devices on the primary node.
2668
2669   This always creates all devices.
2670
2671   """
2672   if device.children:
2673     for child in device.children:
2674       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2675         return False
2676
2677   cfg.SetDiskID(device, node)
2678   new_id = rpc.call_blockdev_create(node, device, device.size,
2679                                     instance.name, True, info)
2680   if not new_id:
2681     return False
2682   if device.physical_id is None:
2683     device.physical_id = new_id
2684   return True
2685
2686
2687 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2688   """Create a tree of block devices on a secondary node.
2689
2690   If this device type has to be created on secondaries, create it and
2691   all its children.
2692
2693   If not, just recurse to children keeping the same 'force' value.
2694
2695   """
2696   if device.CreateOnSecondary():
2697     force = True
2698   if device.children:
2699     for child in device.children:
2700       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2701                                         child, force, info):
2702         return False
2703
2704   if not force:
2705     return True
2706   cfg.SetDiskID(device, node)
2707   new_id = rpc.call_blockdev_create(node, device, device.size,
2708                                     instance.name, False, info)
2709   if not new_id:
2710     return False
2711   if device.physical_id is None:
2712     device.physical_id = new_id
2713   return True
2714
2715
2716 def _GenerateUniqueNames(cfg, exts):
2717   """Generate a suitable LV name.
2718
2719   This will generate a logical volume name for the given instance.
2720
2721   """
2722   results = []
2723   for val in exts:
2724     new_id = cfg.GenerateUniqueID()
2725     results.append("%s%s" % (new_id, val))
2726   return results
2727
2728
2729 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2730   """Generate a drbd device complete with its children.
2731
2732   """
2733   port = cfg.AllocatePort()
2734   vgname = cfg.GetVGName()
2735   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2736                           logical_id=(vgname, names[0]))
2737   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2738                           logical_id=(vgname, names[1]))
2739   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2740                           logical_id = (primary, secondary, port),
2741                           children = [dev_data, dev_meta])
2742   return drbd_dev
2743
2744
2745 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2746   """Generate a drbd8 device complete with its children.
2747
2748   """
2749   port = cfg.AllocatePort()
2750   vgname = cfg.GetVGName()
2751   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2752                           logical_id=(vgname, names[0]))
2753   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2754                           logical_id=(vgname, names[1]))
2755   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2756                           logical_id = (primary, secondary, port),
2757                           children = [dev_data, dev_meta],
2758                           iv_name=iv_name)
2759   return drbd_dev
2760
2761 def _GenerateDiskTemplate(cfg, template_name,
2762                           instance_name, primary_node,
2763                           secondary_nodes, disk_sz, swap_sz):
2764   """Generate the entire disk layout for a given template type.
2765
2766   """
2767   #TODO: compute space requirements
2768
2769   vgname = cfg.GetVGName()
2770   if template_name == constants.DT_DISKLESS:
2771     disks = []
2772   elif template_name == constants.DT_PLAIN:
2773     if len(secondary_nodes) != 0:
2774       raise errors.ProgrammerError("Wrong template configuration")
2775
2776     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2777     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2778                            logical_id=(vgname, names[0]),
2779                            iv_name = "sda")
2780     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2781                            logical_id=(vgname, names[1]),
2782                            iv_name = "sdb")
2783     disks = [sda_dev, sdb_dev]
2784   elif template_name == constants.DT_LOCAL_RAID1:
2785     if len(secondary_nodes) != 0:
2786       raise errors.ProgrammerError("Wrong template configuration")
2787
2788
2789     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2790                                        ".sdb_m1", ".sdb_m2"])
2791     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2792                               logical_id=(vgname, names[0]))
2793     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2794                               logical_id=(vgname, names[1]))
2795     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2796                               size=disk_sz,
2797                               children = [sda_dev_m1, sda_dev_m2])
2798     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2799                               logical_id=(vgname, names[2]))
2800     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2801                               logical_id=(vgname, names[3]))
2802     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2803                               size=swap_sz,
2804                               children = [sdb_dev_m1, sdb_dev_m2])
2805     disks = [md_sda_dev, md_sdb_dev]
2806   elif template_name == constants.DT_REMOTE_RAID1:
2807     if len(secondary_nodes) != 1:
2808       raise errors.ProgrammerError("Wrong template configuration")
2809     remote_node = secondary_nodes[0]
2810     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2811                                        ".sdb_data", ".sdb_meta"])
2812     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2813                                          disk_sz, names[0:2])
2814     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2815                               children = [drbd_sda_dev], size=disk_sz)
2816     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2817                                          swap_sz, names[2:4])
2818     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2819                               children = [drbd_sdb_dev], size=swap_sz)
2820     disks = [md_sda_dev, md_sdb_dev]
2821   elif template_name == constants.DT_DRBD8:
2822     if len(secondary_nodes) != 1:
2823       raise errors.ProgrammerError("Wrong template configuration")
2824     remote_node = secondary_nodes[0]
2825     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2826                                        ".sdb_data", ".sdb_meta"])
2827     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2828                                          disk_sz, names[0:2], "sda")
2829     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2830                                          swap_sz, names[2:4], "sdb")
2831     disks = [drbd_sda_dev, drbd_sdb_dev]
2832   else:
2833     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2834   return disks
2835
2836
2837 def _GetInstanceInfoText(instance):
2838   """Compute that text that should be added to the disk's metadata.
2839
2840   """
2841   return "originstname+%s" % instance.name
2842
2843
2844 def _CreateDisks(cfg, instance):
2845   """Create all disks for an instance.
2846
2847   This abstracts away some work from AddInstance.
2848
2849   Args:
2850     instance: the instance object
2851
2852   Returns:
2853     True or False showing the success of the creation process
2854
2855   """
2856   info = _GetInstanceInfoText(instance)
2857
2858   for device in instance.disks:
2859     logger.Info("creating volume %s for instance %s" %
2860               (device.iv_name, instance.name))
2861     #HARDCODE
2862     for secondary_node in instance.secondary_nodes:
2863       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2864                                         device, False, info):
2865         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2866                      (device.iv_name, device, secondary_node))
2867         return False
2868     #HARDCODE
2869     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2870                                     instance, device, info):
2871       logger.Error("failed to create volume %s on primary!" %
2872                    device.iv_name)
2873       return False
2874   return True
2875
2876
2877 def _RemoveDisks(instance, cfg):
2878   """Remove all disks for an instance.
2879
2880   This abstracts away some work from `AddInstance()` and
2881   `RemoveInstance()`. Note that in case some of the devices couldn't
2882   be removed, the removal will continue with the other ones (compare
2883   with `_CreateDisks()`).
2884
2885   Args:
2886     instance: the instance object
2887
2888   Returns:
2889     True or False showing the success of the removal proces
2890
2891   """
2892   logger.Info("removing block devices for instance %s" % instance.name)
2893
2894   result = True
2895   for device in instance.disks:
2896     for node, disk in device.ComputeNodeTree(instance.primary_node):
2897       cfg.SetDiskID(disk, node)
2898       if not rpc.call_blockdev_remove(node, disk):
2899         logger.Error("could not remove block device %s on node %s,"
2900                      " continuing anyway" %
2901                      (device.iv_name, node))
2902         result = False
2903   return result
2904
2905
2906 class LUCreateInstance(LogicalUnit):
2907   """Create an instance.
2908
2909   """
2910   HPATH = "instance-add"
2911   HTYPE = constants.HTYPE_INSTANCE
2912   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2913               "disk_template", "swap_size", "mode", "start", "vcpus",
2914               "wait_for_sync", "ip_check", "mac"]
2915
2916   def BuildHooksEnv(self):
2917     """Build hooks env.
2918
2919     This runs on master, primary and secondary nodes of the instance.
2920
2921     """
2922     env = {
2923       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2924       "INSTANCE_DISK_SIZE": self.op.disk_size,
2925       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2926       "INSTANCE_ADD_MODE": self.op.mode,
2927       }
2928     if self.op.mode == constants.INSTANCE_IMPORT:
2929       env["INSTANCE_SRC_NODE"] = self.op.src_node
2930       env["INSTANCE_SRC_PATH"] = self.op.src_path
2931       env["INSTANCE_SRC_IMAGE"] = self.src_image
2932
2933     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2934       primary_node=self.op.pnode,
2935       secondary_nodes=self.secondaries,
2936       status=self.instance_status,
2937       os_type=self.op.os_type,
2938       memory=self.op.mem_size,
2939       vcpus=self.op.vcpus,
2940       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2941     ))
2942
2943     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2944           self.secondaries)
2945     return env, nl, nl
2946
2947
2948   def CheckPrereq(self):
2949     """Check prerequisites.
2950
2951     """
2952     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2953       if not hasattr(self.op, attr):
2954         setattr(self.op, attr, None)
2955
2956     if self.op.mode not in (constants.INSTANCE_CREATE,
2957                             constants.INSTANCE_IMPORT):
2958       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2959                                  self.op.mode)
2960
2961     if self.op.mode == constants.INSTANCE_IMPORT:
2962       src_node = getattr(self.op, "src_node", None)
2963       src_path = getattr(self.op, "src_path", None)
2964       if src_node is None or src_path is None:
2965         raise errors.OpPrereqError("Importing an instance requires source"
2966                                    " node and path options")
2967       src_node_full = self.cfg.ExpandNodeName(src_node)
2968       if src_node_full is None:
2969         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2970       self.op.src_node = src_node = src_node_full
2971
2972       if not os.path.isabs(src_path):
2973         raise errors.OpPrereqError("The source path must be absolute")
2974
2975       export_info = rpc.call_export_info(src_node, src_path)
2976
2977       if not export_info:
2978         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2979
2980       if not export_info.has_section(constants.INISECT_EXP):
2981         raise errors.ProgrammerError("Corrupted export config")
2982
2983       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2984       if (int(ei_version) != constants.EXPORT_VERSION):
2985         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2986                                    (ei_version, constants.EXPORT_VERSION))
2987
2988       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2989         raise errors.OpPrereqError("Can't import instance with more than"
2990                                    " one data disk")
2991
2992       # FIXME: are the old os-es, disk sizes, etc. useful?
2993       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2994       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2995                                                          'disk0_dump'))
2996       self.src_image = diskimage
2997     else: # INSTANCE_CREATE
2998       if getattr(self.op, "os_type", None) is None:
2999         raise errors.OpPrereqError("No guest OS specified")
3000
3001     # check primary node
3002     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3003     if pnode is None:
3004       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3005                                  self.op.pnode)
3006     self.op.pnode = pnode.name
3007     self.pnode = pnode
3008     self.secondaries = []
3009     # disk template and mirror node verification
3010     if self.op.disk_template not in constants.DISK_TEMPLATES:
3011       raise errors.OpPrereqError("Invalid disk template name")
3012
3013     if self.op.disk_template in constants.DTS_NET_MIRROR:
3014       if getattr(self.op, "snode", None) is None:
3015         raise errors.OpPrereqError("The networked disk templates need"
3016                                    " a mirror node")
3017
3018       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3019       if snode_name is None:
3020         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3021                                    self.op.snode)
3022       elif snode_name == pnode.name:
3023         raise errors.OpPrereqError("The secondary node cannot be"
3024                                    " the primary node.")
3025       self.secondaries.append(snode_name)
3026
3027     # Required free disk space as a function of disk and swap space
3028     req_size_dict = {
3029       constants.DT_DISKLESS: None,
3030       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3031       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3032       # 256 MB are added for drbd metadata, 128MB for each drbd device
3033       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3034       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3035     }
3036
3037     if self.op.disk_template not in req_size_dict:
3038       raise errors.ProgrammerError("Disk template '%s' size requirement"
3039                                    " is unknown" %  self.op.disk_template)
3040
3041     req_size = req_size_dict[self.op.disk_template]
3042
3043     # Check lv size requirements
3044     if req_size is not None:
3045       nodenames = [pnode.name] + self.secondaries
3046       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3047       for node in nodenames:
3048         info = nodeinfo.get(node, None)
3049         if not info:
3050           raise errors.OpPrereqError("Cannot get current information"
3051                                      " from node '%s'" % nodeinfo)
3052         vg_free = info.get('vg_free', None)
3053         if not isinstance(vg_free, int):
3054           raise errors.OpPrereqError("Can't compute free disk space on"
3055                                      " node %s" % node)
3056         if req_size > info['vg_free']:
3057           raise errors.OpPrereqError("Not enough disk space on target node %s."
3058                                      " %d MB available, %d MB required" %
3059                                      (node, info['vg_free'], req_size))
3060
3061     # os verification
3062     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3063     if not os_obj:
3064       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3065                                  " primary node"  % self.op.os_type)
3066
3067     if self.op.kernel_path == constants.VALUE_NONE:
3068       raise errors.OpPrereqError("Can't set instance kernel to none")
3069
3070     # instance verification
3071     hostname1 = utils.HostInfo(self.op.instance_name)
3072
3073     self.op.instance_name = instance_name = hostname1.name
3074     instance_list = self.cfg.GetInstanceList()
3075     if instance_name in instance_list:
3076       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3077                                  instance_name)
3078
3079     ip = getattr(self.op, "ip", None)
3080     if ip is None or ip.lower() == "none":
3081       inst_ip = None
3082     elif ip.lower() == "auto":
3083       inst_ip = hostname1.ip
3084     else:
3085       if not utils.IsValidIP(ip):
3086         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3087                                    " like a valid IP" % ip)
3088       inst_ip = ip
3089     self.inst_ip = inst_ip
3090
3091     if self.op.start and not self.op.ip_check:
3092       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3093                                  " adding an instance in start mode")
3094
3095     if self.op.ip_check:
3096       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3097         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3098                                    (hostname1.ip, instance_name))
3099
3100     # MAC address verification
3101     if self.op.mac != "auto":
3102       if not utils.IsValidMac(self.op.mac.lower()):
3103         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3104                                    self.op.mac)
3105
3106     # bridge verification
3107     bridge = getattr(self.op, "bridge", None)
3108     if bridge is None:
3109       self.op.bridge = self.cfg.GetDefBridge()
3110     else:
3111       self.op.bridge = bridge
3112
3113     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3114       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3115                                  " destination node '%s'" %
3116                                  (self.op.bridge, pnode.name))
3117
3118     # boot order verification
3119     if self.op.hvm_boot_order is not None:
3120       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3121         raise errors.OpPrereqError("invalid boot order specified,"
3122                                    " must be one or more of [acdn]")
3123
3124     if self.op.start:
3125       self.instance_status = 'up'
3126     else:
3127       self.instance_status = 'down'
3128
3129   def Exec(self, feedback_fn):
3130     """Create and add the instance to the cluster.
3131
3132     """
3133     instance = self.op.instance_name
3134     pnode_name = self.pnode.name
3135
3136     if self.op.mac == "auto":
3137       mac_address = self.cfg.GenerateMAC()
3138     else:
3139       mac_address = self.op.mac
3140
3141     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3142     if self.inst_ip is not None:
3143       nic.ip = self.inst_ip
3144
3145     ht_kind = self.sstore.GetHypervisorType()
3146     if ht_kind in constants.HTS_REQ_PORT:
3147       network_port = self.cfg.AllocatePort()
3148     else:
3149       network_port = None
3150
3151     disks = _GenerateDiskTemplate(self.cfg,
3152                                   self.op.disk_template,
3153                                   instance, pnode_name,
3154                                   self.secondaries, self.op.disk_size,
3155                                   self.op.swap_size)
3156
3157     iobj = objects.Instance(name=instance, os=self.op.os_type,
3158                             primary_node=pnode_name,
3159                             memory=self.op.mem_size,
3160                             vcpus=self.op.vcpus,
3161                             nics=[nic], disks=disks,
3162                             disk_template=self.op.disk_template,
3163                             status=self.instance_status,
3164                             network_port=network_port,
3165                             kernel_path=self.op.kernel_path,
3166                             initrd_path=self.op.initrd_path,
3167                             hvm_boot_order=self.op.hvm_boot_order,
3168                             )
3169
3170     feedback_fn("* creating instance disks...")
3171     if not _CreateDisks(self.cfg, iobj):
3172       _RemoveDisks(iobj, self.cfg)
3173       raise errors.OpExecError("Device creation failed, reverting...")
3174
3175     feedback_fn("adding instance %s to cluster config" % instance)
3176
3177     self.cfg.AddInstance(iobj)
3178
3179     if self.op.wait_for_sync:
3180       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3181     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3182       # make sure the disks are not degraded (still sync-ing is ok)
3183       time.sleep(15)
3184       feedback_fn("* checking mirrors status")
3185       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3186     else:
3187       disk_abort = False
3188
3189     if disk_abort:
3190       _RemoveDisks(iobj, self.cfg)
3191       self.cfg.RemoveInstance(iobj.name)
3192       raise errors.OpExecError("There are some degraded disks for"
3193                                " this instance")
3194
3195     feedback_fn("creating os for instance %s on node %s" %
3196                 (instance, pnode_name))
3197
3198     if iobj.disk_template != constants.DT_DISKLESS:
3199       if self.op.mode == constants.INSTANCE_CREATE:
3200         feedback_fn("* running the instance OS create scripts...")
3201         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3202           raise errors.OpExecError("could not add os for instance %s"
3203                                    " on node %s" %
3204                                    (instance, pnode_name))
3205
3206       elif self.op.mode == constants.INSTANCE_IMPORT:
3207         feedback_fn("* running the instance OS import scripts...")
3208         src_node = self.op.src_node
3209         src_image = self.src_image
3210         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3211                                                 src_node, src_image):
3212           raise errors.OpExecError("Could not import os for instance"
3213                                    " %s on node %s" %
3214                                    (instance, pnode_name))
3215       else:
3216         # also checked in the prereq part
3217         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3218                                      % self.op.mode)
3219
3220     if self.op.start:
3221       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3222       feedback_fn("* starting instance...")
3223       if not rpc.call_instance_start(pnode_name, iobj, None):
3224         raise errors.OpExecError("Could not start instance")
3225
3226
3227 class LUConnectConsole(NoHooksLU):
3228   """Connect to an instance's console.
3229
3230   This is somewhat special in that it returns the command line that
3231   you need to run on the master node in order to connect to the
3232   console.
3233
3234   """
3235   _OP_REQP = ["instance_name"]
3236
3237   def CheckPrereq(self):
3238     """Check prerequisites.
3239
3240     This checks that the instance is in the cluster.
3241
3242     """
3243     instance = self.cfg.GetInstanceInfo(
3244       self.cfg.ExpandInstanceName(self.op.instance_name))
3245     if instance is None:
3246       raise errors.OpPrereqError("Instance '%s' not known" %
3247                                  self.op.instance_name)
3248     self.instance = instance
3249
3250   def Exec(self, feedback_fn):
3251     """Connect to the console of an instance
3252
3253     """
3254     instance = self.instance
3255     node = instance.primary_node
3256
3257     node_insts = rpc.call_instance_list([node])[node]
3258     if node_insts is False:
3259       raise errors.OpExecError("Can't connect to node %s." % node)
3260
3261     if instance.name not in node_insts:
3262       raise errors.OpExecError("Instance %s is not running." % instance.name)
3263
3264     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3265
3266     hyper = hypervisor.GetHypervisor()
3267     console_cmd = hyper.GetShellCommandForConsole(instance)
3268     # build ssh cmdline
3269     argv = ["ssh", "-q", "-t"]
3270     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3271     argv.extend(ssh.BATCH_MODE_OPTS)
3272     argv.append(node)
3273     argv.append(console_cmd)
3274     return "ssh", argv
3275
3276
3277 class LUAddMDDRBDComponent(LogicalUnit):
3278   """Adda new mirror member to an instance's disk.
3279
3280   """
3281   HPATH = "mirror-add"
3282   HTYPE = constants.HTYPE_INSTANCE
3283   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3284
3285   def BuildHooksEnv(self):
3286     """Build hooks env.
3287
3288     This runs on the master, the primary and all the secondaries.
3289
3290     """
3291     env = {
3292       "NEW_SECONDARY": self.op.remote_node,
3293       "DISK_NAME": self.op.disk_name,
3294       }
3295     env.update(_BuildInstanceHookEnvByObject(self.instance))
3296     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3297           self.op.remote_node,] + list(self.instance.secondary_nodes)
3298     return env, nl, nl
3299
3300   def CheckPrereq(self):
3301     """Check prerequisites.
3302
3303     This checks that the instance is in the cluster.
3304
3305     """
3306     instance = self.cfg.GetInstanceInfo(
3307       self.cfg.ExpandInstanceName(self.op.instance_name))
3308     if instance is None:
3309       raise errors.OpPrereqError("Instance '%s' not known" %
3310                                  self.op.instance_name)
3311     self.instance = instance
3312
3313     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3314     if remote_node is None:
3315       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3316     self.remote_node = remote_node
3317
3318     if remote_node == instance.primary_node:
3319       raise errors.OpPrereqError("The specified node is the primary node of"
3320                                  " the instance.")
3321
3322     if instance.disk_template != constants.DT_REMOTE_RAID1:
3323       raise errors.OpPrereqError("Instance's disk layout is not"
3324                                  " remote_raid1.")
3325     for disk in instance.disks:
3326       if disk.iv_name == self.op.disk_name:
3327         break
3328     else:
3329       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3330                                  " instance." % self.op.disk_name)
3331     if len(disk.children) > 1:
3332       raise errors.OpPrereqError("The device already has two slave devices."
3333                                  " This would create a 3-disk raid1 which we"
3334                                  " don't allow.")
3335     self.disk = disk
3336
3337   def Exec(self, feedback_fn):
3338     """Add the mirror component
3339
3340     """
3341     disk = self.disk
3342     instance = self.instance
3343
3344     remote_node = self.remote_node
3345     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3346     names = _GenerateUniqueNames(self.cfg, lv_names)
3347     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3348                                      remote_node, disk.size, names)
3349
3350     logger.Info("adding new mirror component on secondary")
3351     #HARDCODE
3352     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3353                                       new_drbd, False,
3354                                       _GetInstanceInfoText(instance)):
3355       raise errors.OpExecError("Failed to create new component on secondary"
3356                                " node %s" % remote_node)
3357
3358     logger.Info("adding new mirror component on primary")
3359     #HARDCODE
3360     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3361                                     instance, new_drbd,
3362                                     _GetInstanceInfoText(instance)):
3363       # remove secondary dev
3364       self.cfg.SetDiskID(new_drbd, remote_node)
3365       rpc.call_blockdev_remove(remote_node, new_drbd)
3366       raise errors.OpExecError("Failed to create volume on primary")
3367
3368     # the device exists now
3369     # call the primary node to add the mirror to md
3370     logger.Info("adding new mirror component to md")
3371     if not rpc.call_blockdev_addchildren(instance.primary_node,
3372                                          disk, [new_drbd]):
3373       logger.Error("Can't add mirror compoment to md!")
3374       self.cfg.SetDiskID(new_drbd, remote_node)
3375       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3376         logger.Error("Can't rollback on secondary")
3377       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3378       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3379         logger.Error("Can't rollback on primary")
3380       raise errors.OpExecError("Can't add mirror component to md array")
3381
3382     disk.children.append(new_drbd)
3383
3384     self.cfg.AddInstance(instance)
3385
3386     _WaitForSync(self.cfg, instance, self.proc)
3387
3388     return 0
3389
3390
3391 class LURemoveMDDRBDComponent(LogicalUnit):
3392   """Remove a component from a remote_raid1 disk.
3393
3394   """
3395   HPATH = "mirror-remove"
3396   HTYPE = constants.HTYPE_INSTANCE
3397   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3398
3399   def BuildHooksEnv(self):
3400     """Build hooks env.
3401
3402     This runs on the master, the primary and all the secondaries.
3403
3404     """
3405     env = {
3406       "DISK_NAME": self.op.disk_name,
3407       "DISK_ID": self.op.disk_id,
3408       "OLD_SECONDARY": self.old_secondary,
3409       }
3410     env.update(_BuildInstanceHookEnvByObject(self.instance))
3411     nl = [self.sstore.GetMasterNode(),
3412           self.instance.primary_node] + list(self.instance.secondary_nodes)
3413     return env, nl, nl
3414
3415   def CheckPrereq(self):
3416     """Check prerequisites.
3417
3418     This checks that the instance is in the cluster.
3419
3420     """
3421     instance = self.cfg.GetInstanceInfo(
3422       self.cfg.ExpandInstanceName(self.op.instance_name))
3423     if instance is None:
3424       raise errors.OpPrereqError("Instance '%s' not known" %
3425                                  self.op.instance_name)
3426     self.instance = instance
3427
3428     if instance.disk_template != constants.DT_REMOTE_RAID1:
3429       raise errors.OpPrereqError("Instance's disk layout is not"
3430                                  " remote_raid1.")
3431     for disk in instance.disks:
3432       if disk.iv_name == self.op.disk_name:
3433         break
3434     else:
3435       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3436                                  " instance." % self.op.disk_name)
3437     for child in disk.children:
3438       if (child.dev_type == constants.LD_DRBD7 and
3439           child.logical_id[2] == self.op.disk_id):
3440         break
3441     else:
3442       raise errors.OpPrereqError("Can't find the device with this port.")
3443
3444     if len(disk.children) < 2:
3445       raise errors.OpPrereqError("Cannot remove the last component from"
3446                                  " a mirror.")
3447     self.disk = disk
3448     self.child = child
3449     if self.child.logical_id[0] == instance.primary_node:
3450       oid = 1
3451     else:
3452       oid = 0
3453     self.old_secondary = self.child.logical_id[oid]
3454
3455   def Exec(self, feedback_fn):
3456     """Remove the mirror component
3457
3458     """
3459     instance = self.instance
3460     disk = self.disk
3461     child = self.child
3462     logger.Info("remove mirror component")
3463     self.cfg.SetDiskID(disk, instance.primary_node)
3464     if not rpc.call_blockdev_removechildren(instance.primary_node,
3465                                             disk, [child]):
3466       raise errors.OpExecError("Can't remove child from mirror.")
3467
3468     for node in child.logical_id[:2]:
3469       self.cfg.SetDiskID(child, node)
3470       if not rpc.call_blockdev_remove(node, child):
3471         logger.Error("Warning: failed to remove device from node %s,"
3472                      " continuing operation." % node)
3473
3474     disk.children.remove(child)
3475     self.cfg.AddInstance(instance)
3476
3477
3478 class LUReplaceDisks(LogicalUnit):
3479   """Replace the disks of an instance.
3480
3481   """
3482   HPATH = "mirrors-replace"
3483   HTYPE = constants.HTYPE_INSTANCE
3484   _OP_REQP = ["instance_name", "mode", "disks"]
3485
3486   def BuildHooksEnv(self):
3487     """Build hooks env.
3488
3489     This runs on the master, the primary and all the secondaries.
3490
3491     """
3492     env = {
3493       "MODE": self.op.mode,
3494       "NEW_SECONDARY": self.op.remote_node,
3495       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3496       }
3497     env.update(_BuildInstanceHookEnvByObject(self.instance))
3498     nl = [
3499       self.sstore.GetMasterNode(),
3500       self.instance.primary_node,
3501       ]
3502     if self.op.remote_node is not None:
3503       nl.append(self.op.remote_node)
3504     return env, nl, nl
3505
3506   def CheckPrereq(self):
3507     """Check prerequisites.
3508
3509     This checks that the instance is in the cluster.
3510
3511     """
3512     instance = self.cfg.GetInstanceInfo(
3513       self.cfg.ExpandInstanceName(self.op.instance_name))
3514     if instance is None:
3515       raise errors.OpPrereqError("Instance '%s' not known" %
3516                                  self.op.instance_name)
3517     self.instance = instance
3518     self.op.instance_name = instance.name
3519
3520     if instance.disk_template not in constants.DTS_NET_MIRROR:
3521       raise errors.OpPrereqError("Instance's disk layout is not"
3522                                  " network mirrored.")
3523
3524     if len(instance.secondary_nodes) != 1:
3525       raise errors.OpPrereqError("The instance has a strange layout,"
3526                                  " expected one secondary but found %d" %
3527                                  len(instance.secondary_nodes))
3528
3529     self.sec_node = instance.secondary_nodes[0]
3530
3531     remote_node = getattr(self.op, "remote_node", None)
3532     if remote_node is not None:
3533       remote_node = self.cfg.ExpandNodeName(remote_node)
3534       if remote_node is None:
3535         raise errors.OpPrereqError("Node '%s' not known" %
3536                                    self.op.remote_node)
3537       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3538     else:
3539       self.remote_node_info = None
3540     if remote_node == instance.primary_node:
3541       raise errors.OpPrereqError("The specified node is the primary node of"
3542                                  " the instance.")
3543     elif remote_node == self.sec_node:
3544       if self.op.mode == constants.REPLACE_DISK_SEC:
3545         # this is for DRBD8, where we can't execute the same mode of
3546         # replacement as for drbd7 (no different port allocated)
3547         raise errors.OpPrereqError("Same secondary given, cannot execute"
3548                                    " replacement")
3549       # the user gave the current secondary, switch to
3550       # 'no-replace-secondary' mode for drbd7
3551       remote_node = None
3552     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3553         self.op.mode != constants.REPLACE_DISK_ALL):
3554       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3555                                  " disks replacement, not individual ones")
3556     if instance.disk_template == constants.DT_DRBD8:
3557       if (self.op.mode == constants.REPLACE_DISK_ALL and
3558           remote_node is not None):
3559         # switch to replace secondary mode
3560         self.op.mode = constants.REPLACE_DISK_SEC
3561
3562       if self.op.mode == constants.REPLACE_DISK_ALL:
3563         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3564                                    " secondary disk replacement, not"
3565                                    " both at once")
3566       elif self.op.mode == constants.REPLACE_DISK_PRI:
3567         if remote_node is not None:
3568           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3569                                      " the secondary while doing a primary"
3570                                      " node disk replacement")
3571         self.tgt_node = instance.primary_node
3572         self.oth_node = instance.secondary_nodes[0]
3573       elif self.op.mode == constants.REPLACE_DISK_SEC:
3574         self.new_node = remote_node # this can be None, in which case
3575                                     # we don't change the secondary
3576         self.tgt_node = instance.secondary_nodes[0]
3577         self.oth_node = instance.primary_node
3578       else:
3579         raise errors.ProgrammerError("Unhandled disk replace mode")
3580
3581     for name in self.op.disks:
3582       if instance.FindDisk(name) is None:
3583         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3584                                    (name, instance.name))
3585     self.op.remote_node = remote_node
3586
3587   def _ExecRR1(self, feedback_fn):
3588     """Replace the disks of an instance.
3589
3590     """
3591     instance = self.instance
3592     iv_names = {}
3593     # start of work
3594     if self.op.remote_node is None:
3595       remote_node = self.sec_node
3596     else:
3597       remote_node = self.op.remote_node
3598     cfg = self.cfg
3599     for dev in instance.disks:
3600       size = dev.size
3601       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3602       names = _GenerateUniqueNames(cfg, lv_names)
3603       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3604                                        remote_node, size, names)
3605       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3606       logger.Info("adding new mirror component on secondary for %s" %
3607                   dev.iv_name)
3608       #HARDCODE
3609       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3610                                         new_drbd, False,
3611                                         _GetInstanceInfoText(instance)):
3612         raise errors.OpExecError("Failed to create new component on secondary"
3613                                  " node %s. Full abort, cleanup manually!" %
3614                                  remote_node)
3615
3616       logger.Info("adding new mirror component on primary")
3617       #HARDCODE
3618       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3619                                       instance, new_drbd,
3620                                       _GetInstanceInfoText(instance)):
3621         # remove secondary dev
3622         cfg.SetDiskID(new_drbd, remote_node)
3623         rpc.call_blockdev_remove(remote_node, new_drbd)
3624         raise errors.OpExecError("Failed to create volume on primary!"
3625                                  " Full abort, cleanup manually!!")
3626
3627       # the device exists now
3628       # call the primary node to add the mirror to md
3629       logger.Info("adding new mirror component to md")
3630       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3631                                            [new_drbd]):
3632         logger.Error("Can't add mirror compoment to md!")
3633         cfg.SetDiskID(new_drbd, remote_node)
3634         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3635           logger.Error("Can't rollback on secondary")
3636         cfg.SetDiskID(new_drbd, instance.primary_node)
3637         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3638           logger.Error("Can't rollback on primary")
3639         raise errors.OpExecError("Full abort, cleanup manually!!")
3640
3641       dev.children.append(new_drbd)
3642       cfg.AddInstance(instance)
3643
3644     # this can fail as the old devices are degraded and _WaitForSync
3645     # does a combined result over all disks, so we don't check its
3646     # return value
3647     _WaitForSync(cfg, instance, self.proc, unlock=True)
3648
3649     # so check manually all the devices
3650     for name in iv_names:
3651       dev, child, new_drbd = iv_names[name]
3652       cfg.SetDiskID(dev, instance.primary_node)
3653       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3654       if is_degr:
3655         raise errors.OpExecError("MD device %s is degraded!" % name)
3656       cfg.SetDiskID(new_drbd, instance.primary_node)
3657       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3658       if is_degr:
3659         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3660
3661     for name in iv_names:
3662       dev, child, new_drbd = iv_names[name]
3663       logger.Info("remove mirror %s component" % name)
3664       cfg.SetDiskID(dev, instance.primary_node)
3665       if not rpc.call_blockdev_removechildren(instance.primary_node,
3666                                               dev, [child]):
3667         logger.Error("Can't remove child from mirror, aborting"
3668                      " *this device cleanup*.\nYou need to cleanup manually!!")
3669         continue
3670
3671       for node in child.logical_id[:2]:
3672         logger.Info("remove child device on %s" % node)
3673         cfg.SetDiskID(child, node)
3674         if not rpc.call_blockdev_remove(node, child):
3675           logger.Error("Warning: failed to remove device from node %s,"
3676                        " continuing operation." % node)
3677
3678       dev.children.remove(child)
3679
3680       cfg.AddInstance(instance)
3681
3682   def _ExecD8DiskOnly(self, feedback_fn):
3683     """Replace a disk on the primary or secondary for dbrd8.
3684
3685     The algorithm for replace is quite complicated:
3686       - for each disk to be replaced:
3687         - create new LVs on the target node with unique names
3688         - detach old LVs from the drbd device
3689         - rename old LVs to name_replaced.<time_t>
3690         - rename new LVs to old LVs
3691         - attach the new LVs (with the old names now) to the drbd device
3692       - wait for sync across all devices
3693       - for each modified disk:
3694         - remove old LVs (which have the name name_replaces.<time_t>)
3695
3696     Failures are not very well handled.
3697
3698     """
3699     steps_total = 6
3700     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3701     instance = self.instance
3702     iv_names = {}
3703     vgname = self.cfg.GetVGName()
3704     # start of work
3705     cfg = self.cfg
3706     tgt_node = self.tgt_node
3707     oth_node = self.oth_node
3708
3709     # Step: check device activation
3710     self.proc.LogStep(1, steps_total, "check device existence")
3711     info("checking volume groups")
3712     my_vg = cfg.GetVGName()
3713     results = rpc.call_vg_list([oth_node, tgt_node])
3714     if not results:
3715       raise errors.OpExecError("Can't list volume groups on the nodes")
3716     for node in oth_node, tgt_node:
3717       res = results.get(node, False)
3718       if not res or my_vg not in res:
3719         raise errors.OpExecError("Volume group '%s' not found on %s" %
3720                                  (my_vg, node))
3721     for dev in instance.disks:
3722       if not dev.iv_name in self.op.disks:
3723         continue
3724       for node in tgt_node, oth_node:
3725         info("checking %s on %s" % (dev.iv_name, node))
3726         cfg.SetDiskID(dev, node)
3727         if not rpc.call_blockdev_find(node, dev):
3728           raise errors.OpExecError("Can't find device %s on node %s" %
3729                                    (dev.iv_name, node))
3730
3731     # Step: check other node consistency
3732     self.proc.LogStep(2, steps_total, "check peer consistency")
3733     for dev in instance.disks:
3734       if not dev.iv_name in self.op.disks:
3735         continue
3736       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3737       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3738                                    oth_node==instance.primary_node):
3739         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3740                                  " to replace disks on this node (%s)" %
3741                                  (oth_node, tgt_node))
3742
3743     # Step: create new storage
3744     self.proc.LogStep(3, steps_total, "allocate new storage")
3745     for dev in instance.disks:
3746       if not dev.iv_name in self.op.disks:
3747         continue
3748       size = dev.size
3749       cfg.SetDiskID(dev, tgt_node)
3750       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3751       names = _GenerateUniqueNames(cfg, lv_names)
3752       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3753                              logical_id=(vgname, names[0]))
3754       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3755                              logical_id=(vgname, names[1]))
3756       new_lvs = [lv_data, lv_meta]
3757       old_lvs = dev.children
3758       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3759       info("creating new local storage on %s for %s" %
3760            (tgt_node, dev.iv_name))
3761       # since we *always* want to create this LV, we use the
3762       # _Create...OnPrimary (which forces the creation), even if we
3763       # are talking about the secondary node
3764       for new_lv in new_lvs:
3765         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3766                                         _GetInstanceInfoText(instance)):
3767           raise errors.OpExecError("Failed to create new LV named '%s' on"
3768                                    " node '%s'" %
3769                                    (new_lv.logical_id[1], tgt_node))
3770
3771     # Step: for each lv, detach+rename*2+attach
3772     self.proc.LogStep(4, steps_total, "change drbd configuration")
3773     for dev, old_lvs, new_lvs in iv_names.itervalues():
3774       info("detaching %s drbd from local storage" % dev.iv_name)
3775       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3776         raise errors.OpExecError("Can't detach drbd from local storage on node"
3777                                  " %s for device %s" % (tgt_node, dev.iv_name))
3778       #dev.children = []
3779       #cfg.Update(instance)
3780
3781       # ok, we created the new LVs, so now we know we have the needed
3782       # storage; as such, we proceed on the target node to rename
3783       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3784       # using the assumption that logical_id == physical_id (which in
3785       # turn is the unique_id on that node)
3786
3787       # FIXME(iustin): use a better name for the replaced LVs
3788       temp_suffix = int(time.time())
3789       ren_fn = lambda d, suff: (d.physical_id[0],
3790                                 d.physical_id[1] + "_replaced-%s" % suff)
3791       # build the rename list based on what LVs exist on the node
3792       rlist = []
3793       for to_ren in old_lvs:
3794         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3795         if find_res is not None: # device exists
3796           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3797
3798       info("renaming the old LVs on the target node")
3799       if not rpc.call_blockdev_rename(tgt_node, rlist):
3800         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3801       # now we rename the new LVs to the old LVs
3802       info("renaming the new LVs on the target node")
3803       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3804       if not rpc.call_blockdev_rename(tgt_node, rlist):
3805         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3806
3807       for old, new in zip(old_lvs, new_lvs):
3808         new.logical_id = old.logical_id
3809         cfg.SetDiskID(new, tgt_node)
3810
3811       for disk in old_lvs:
3812         disk.logical_id = ren_fn(disk, temp_suffix)
3813         cfg.SetDiskID(disk, tgt_node)
3814
3815       # now that the new lvs have the old name, we can add them to the device
3816       info("adding new mirror component on %s" % tgt_node)
3817       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3818         for new_lv in new_lvs:
3819           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3820             warning("Can't rollback device %s", hint="manually cleanup unused"
3821                     " logical volumes")
3822         raise errors.OpExecError("Can't add local storage to drbd")
3823
3824       dev.children = new_lvs
3825       cfg.Update(instance)
3826
3827     # Step: wait for sync
3828
3829     # this can fail as the old devices are degraded and _WaitForSync
3830     # does a combined result over all disks, so we don't check its
3831     # return value
3832     self.proc.LogStep(5, steps_total, "sync devices")
3833     _WaitForSync(cfg, instance, self.proc, unlock=True)
3834
3835     # so check manually all the devices
3836     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3837       cfg.SetDiskID(dev, instance.primary_node)
3838       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3839       if is_degr:
3840         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3841
3842     # Step: remove old storage
3843     self.proc.LogStep(6, steps_total, "removing old storage")
3844     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3845       info("remove logical volumes for %s" % name)
3846       for lv in old_lvs:
3847         cfg.SetDiskID(lv, tgt_node)
3848         if not rpc.call_blockdev_remove(tgt_node, lv):
3849           warning("Can't remove old LV", hint="manually remove unused LVs")
3850           continue
3851
3852   def _ExecD8Secondary(self, feedback_fn):
3853     """Replace the secondary node for drbd8.
3854
3855     The algorithm for replace is quite complicated:
3856       - for all disks of the instance:
3857         - create new LVs on the new node with same names
3858         - shutdown the drbd device on the old secondary
3859         - disconnect the drbd network on the primary
3860         - create the drbd device on the new secondary
3861         - network attach the drbd on the primary, using an artifice:
3862           the drbd code for Attach() will connect to the network if it
3863           finds a device which is connected to the good local disks but
3864           not network enabled
3865       - wait for sync across all devices
3866       - remove all disks from the old secondary
3867
3868     Failures are not very well handled.
3869
3870     """
3871     steps_total = 6
3872     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3873     instance = self.instance
3874     iv_names = {}
3875     vgname = self.cfg.GetVGName()
3876     # start of work
3877     cfg = self.cfg
3878     old_node = self.tgt_node
3879     new_node = self.new_node
3880     pri_node = instance.primary_node
3881
3882     # Step: check device activation
3883     self.proc.LogStep(1, steps_total, "check device existence")
3884     info("checking volume groups")
3885     my_vg = cfg.GetVGName()
3886     results = rpc.call_vg_list([pri_node, new_node])
3887     if not results:
3888       raise errors.OpExecError("Can't list volume groups on the nodes")
3889     for node in pri_node, new_node:
3890       res = results.get(node, False)
3891       if not res or my_vg not in res:
3892         raise errors.OpExecError("Volume group '%s' not found on %s" %
3893                                  (my_vg, node))
3894     for dev in instance.disks:
3895       if not dev.iv_name in self.op.disks:
3896         continue
3897       info("checking %s on %s" % (dev.iv_name, pri_node))
3898       cfg.SetDiskID(dev, pri_node)
3899       if not rpc.call_blockdev_find(pri_node, dev):
3900         raise errors.OpExecError("Can't find device %s on node %s" %
3901                                  (dev.iv_name, pri_node))
3902
3903     # Step: check other node consistency
3904     self.proc.LogStep(2, steps_total, "check peer consistency")
3905     for dev in instance.disks:
3906       if not dev.iv_name in self.op.disks:
3907         continue
3908       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3909       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3910         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3911                                  " unsafe to replace the secondary" %
3912                                  pri_node)
3913
3914     # Step: create new storage
3915     self.proc.LogStep(3, steps_total, "allocate new storage")
3916     for dev in instance.disks:
3917       size = dev.size
3918       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3919       # since we *always* want to create this LV, we use the
3920       # _Create...OnPrimary (which forces the creation), even if we
3921       # are talking about the secondary node
3922       for new_lv in dev.children:
3923         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3924                                         _GetInstanceInfoText(instance)):
3925           raise errors.OpExecError("Failed to create new LV named '%s' on"
3926                                    " node '%s'" %
3927                                    (new_lv.logical_id[1], new_node))
3928
3929       iv_names[dev.iv_name] = (dev, dev.children)
3930
3931     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3932     for dev in instance.disks:
3933       size = dev.size
3934       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3935       # create new devices on new_node
3936       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3937                               logical_id=(pri_node, new_node,
3938                                           dev.logical_id[2]),
3939                               children=dev.children)
3940       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3941                                         new_drbd, False,
3942                                       _GetInstanceInfoText(instance)):
3943         raise errors.OpExecError("Failed to create new DRBD on"
3944                                  " node '%s'" % new_node)
3945
3946     for dev in instance.disks:
3947       # we have new devices, shutdown the drbd on the old secondary
3948       info("shutting down drbd for %s on old node" % dev.iv_name)
3949       cfg.SetDiskID(dev, old_node)
3950       if not rpc.call_blockdev_shutdown(old_node, dev):
3951         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3952                 hint="Please cleanup this device manually as soon as possible")
3953
3954     info("detaching primary drbds from the network (=> standalone)")
3955     done = 0
3956     for dev in instance.disks:
3957       cfg.SetDiskID(dev, pri_node)
3958       # set the physical (unique in bdev terms) id to None, meaning
3959       # detach from network
3960       dev.physical_id = (None,) * len(dev.physical_id)
3961       # and 'find' the device, which will 'fix' it to match the
3962       # standalone state
3963       if rpc.call_blockdev_find(pri_node, dev):
3964         done += 1
3965       else:
3966         warning("Failed to detach drbd %s from network, unusual case" %
3967                 dev.iv_name)
3968
3969     if not done:
3970       # no detaches succeeded (very unlikely)
3971       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3972
3973     # if we managed to detach at least one, we update all the disks of
3974     # the instance to point to the new secondary
3975     info("updating instance configuration")
3976     for dev in instance.disks:
3977       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3978       cfg.SetDiskID(dev, pri_node)
3979     cfg.Update(instance)
3980
3981     # and now perform the drbd attach
3982     info("attaching primary drbds to new secondary (standalone => connected)")
3983     failures = []
3984     for dev in instance.disks:
3985       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3986       # since the attach is smart, it's enough to 'find' the device,
3987       # it will automatically activate the network, if the physical_id
3988       # is correct
3989       cfg.SetDiskID(dev, pri_node)
3990       if not rpc.call_blockdev_find(pri_node, dev):
3991         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3992                 "please do a gnt-instance info to see the status of disks")
3993
3994     # this can fail as the old devices are degraded and _WaitForSync
3995     # does a combined result over all disks, so we don't check its
3996     # return value
3997     self.proc.LogStep(5, steps_total, "sync devices")
3998     _WaitForSync(cfg, instance, self.proc, unlock=True)
3999
4000     # so check manually all the devices
4001     for name, (dev, old_lvs) in iv_names.iteritems():
4002       cfg.SetDiskID(dev, pri_node)
4003       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4004       if is_degr:
4005         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4006
4007     self.proc.LogStep(6, steps_total, "removing old storage")
4008     for name, (dev, old_lvs) in iv_names.iteritems():
4009       info("remove logical volumes for %s" % name)
4010       for lv in old_lvs:
4011         cfg.SetDiskID(lv, old_node)
4012         if not rpc.call_blockdev_remove(old_node, lv):
4013           warning("Can't remove LV on old secondary",
4014                   hint="Cleanup stale volumes by hand")
4015
4016   def Exec(self, feedback_fn):
4017     """Execute disk replacement.
4018
4019     This dispatches the disk replacement to the appropriate handler.
4020
4021     """
4022     instance = self.instance
4023     if instance.disk_template == constants.DT_REMOTE_RAID1:
4024       fn = self._ExecRR1
4025     elif instance.disk_template == constants.DT_DRBD8:
4026       if self.op.remote_node is None:
4027         fn = self._ExecD8DiskOnly
4028       else:
4029         fn = self._ExecD8Secondary
4030     else:
4031       raise errors.ProgrammerError("Unhandled disk replacement case")
4032     return fn(feedback_fn)
4033
4034
4035 class LUQueryInstanceData(NoHooksLU):
4036   """Query runtime instance data.
4037
4038   """
4039   _OP_REQP = ["instances"]
4040
4041   def CheckPrereq(self):
4042     """Check prerequisites.
4043
4044     This only checks the optional instance list against the existing names.
4045
4046     """
4047     if not isinstance(self.op.instances, list):
4048       raise errors.OpPrereqError("Invalid argument type 'instances'")
4049     if self.op.instances:
4050       self.wanted_instances = []
4051       names = self.op.instances
4052       for name in names:
4053         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4054         if instance is None:
4055           raise errors.OpPrereqError("No such instance name '%s'" % name)
4056         self.wanted_instances.append(instance)
4057     else:
4058       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4059                                in self.cfg.GetInstanceList()]
4060     return
4061
4062
4063   def _ComputeDiskStatus(self, instance, snode, dev):
4064     """Compute block device status.
4065
4066     """
4067     self.cfg.SetDiskID(dev, instance.primary_node)
4068     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4069     if dev.dev_type in constants.LDS_DRBD:
4070       # we change the snode then (otherwise we use the one passed in)
4071       if dev.logical_id[0] == instance.primary_node:
4072         snode = dev.logical_id[1]
4073       else:
4074         snode = dev.logical_id[0]
4075
4076     if snode:
4077       self.cfg.SetDiskID(dev, snode)
4078       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4079     else:
4080       dev_sstatus = None
4081
4082     if dev.children:
4083       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4084                       for child in dev.children]
4085     else:
4086       dev_children = []
4087
4088     data = {
4089       "iv_name": dev.iv_name,
4090       "dev_type": dev.dev_type,
4091       "logical_id": dev.logical_id,
4092       "physical_id": dev.physical_id,
4093       "pstatus": dev_pstatus,
4094       "sstatus": dev_sstatus,
4095       "children": dev_children,
4096       }
4097
4098     return data
4099
4100   def Exec(self, feedback_fn):
4101     """Gather and return data"""
4102     result = {}
4103     for instance in self.wanted_instances:
4104       remote_info = rpc.call_instance_info(instance.primary_node,
4105                                                 instance.name)
4106       if remote_info and "state" in remote_info:
4107         remote_state = "up"
4108       else:
4109         remote_state = "down"
4110       if instance.status == "down":
4111         config_state = "down"
4112       else:
4113         config_state = "up"
4114
4115       disks = [self._ComputeDiskStatus(instance, None, device)
4116                for device in instance.disks]
4117
4118       idict = {
4119         "name": instance.name,
4120         "config_state": config_state,
4121         "run_state": remote_state,
4122         "pnode": instance.primary_node,
4123         "snodes": instance.secondary_nodes,
4124         "os": instance.os,
4125         "memory": instance.memory,
4126         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4127         "disks": disks,
4128         "network_port": instance.network_port,
4129         "vcpus": instance.vcpus,
4130         "kernel_path": instance.kernel_path,
4131         "initrd_path": instance.initrd_path,
4132         "hvm_boot_order": instance.hvm_boot_order,
4133         }
4134
4135       result[instance.name] = idict
4136
4137     return result
4138
4139
4140 class LUSetInstanceParms(LogicalUnit):
4141   """Modifies an instances's parameters.
4142
4143   """
4144   HPATH = "instance-modify"
4145   HTYPE = constants.HTYPE_INSTANCE
4146   _OP_REQP = ["instance_name"]
4147
4148   def BuildHooksEnv(self):
4149     """Build hooks env.
4150
4151     This runs on the master, primary and secondaries.
4152
4153     """
4154     args = dict()
4155     if self.mem:
4156       args['memory'] = self.mem
4157     if self.vcpus:
4158       args['vcpus'] = self.vcpus
4159     if self.do_ip or self.do_bridge or self.mac:
4160       if self.do_ip:
4161         ip = self.ip
4162       else:
4163         ip = self.instance.nics[0].ip
4164       if self.bridge:
4165         bridge = self.bridge
4166       else:
4167         bridge = self.instance.nics[0].bridge
4168       if self.mac:
4169         mac = self.mac
4170       else:
4171         mac = self.instance.nics[0].mac
4172       args['nics'] = [(ip, bridge, mac)]
4173     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4174     nl = [self.sstore.GetMasterNode(),
4175           self.instance.primary_node] + list(self.instance.secondary_nodes)
4176     return env, nl, nl
4177
4178   def CheckPrereq(self):
4179     """Check prerequisites.
4180
4181     This only checks the instance list against the existing names.
4182
4183     """
4184     self.mem = getattr(self.op, "mem", None)
4185     self.vcpus = getattr(self.op, "vcpus", None)
4186     self.ip = getattr(self.op, "ip", None)
4187     self.mac = getattr(self.op, "mac", None)
4188     self.bridge = getattr(self.op, "bridge", None)
4189     self.kernel_path = getattr(self.op, "kernel_path", None)
4190     self.initrd_path = getattr(self.op, "initrd_path", None)
4191     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4192     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4193                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4194     if all_parms.count(None) == len(all_parms):
4195       raise errors.OpPrereqError("No changes submitted")
4196     if self.mem is not None:
4197       try:
4198         self.mem = int(self.mem)
4199       except ValueError, err:
4200         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4201     if self.vcpus is not None:
4202       try:
4203         self.vcpus = int(self.vcpus)
4204       except ValueError, err:
4205         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4206     if self.ip is not None:
4207       self.do_ip = True
4208       if self.ip.lower() == "none":
4209         self.ip = None
4210       else:
4211         if not utils.IsValidIP(self.ip):
4212           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4213     else:
4214       self.do_ip = False
4215     self.do_bridge = (self.bridge is not None)
4216     if self.mac is not None:
4217       if self.cfg.IsMacInUse(self.mac):
4218         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4219                                    self.mac)
4220       if not utils.IsValidMac(self.mac):
4221         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4222
4223     if self.kernel_path is not None:
4224       self.do_kernel_path = True
4225       if self.kernel_path == constants.VALUE_NONE:
4226         raise errors.OpPrereqError("Can't set instance to no kernel")
4227
4228       if self.kernel_path != constants.VALUE_DEFAULT:
4229         if not os.path.isabs(self.kernel_path):
4230           raise errors.OpPrereqError("The kernel path must be an absolute"
4231                                     " filename")
4232     else:
4233       self.do_kernel_path = False
4234
4235     if self.initrd_path is not None:
4236       self.do_initrd_path = True
4237       if self.initrd_path not in (constants.VALUE_NONE,
4238                                   constants.VALUE_DEFAULT):
4239         if not os.path.isabs(self.initrd_path):
4240           raise errors.OpPrereqError("The initrd path must be an absolute"
4241                                     " filename")
4242     else:
4243       self.do_initrd_path = False
4244
4245     # boot order verification
4246     if self.hvm_boot_order is not None:
4247       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4248         if len(self.hvm_boot_order.strip("acdn")) != 0:
4249           raise errors.OpPrereqError("invalid boot order specified,"
4250                                      " must be one or more of [acdn]"
4251                                      " or 'default'")
4252
4253     instance = self.cfg.GetInstanceInfo(
4254       self.cfg.ExpandInstanceName(self.op.instance_name))
4255     if instance is None:
4256       raise errors.OpPrereqError("No such instance name '%s'" %
4257                                  self.op.instance_name)
4258     self.op.instance_name = instance.name
4259     self.instance = instance
4260     return
4261
4262   def Exec(self, feedback_fn):
4263     """Modifies an instance.
4264
4265     All parameters take effect only at the next restart of the instance.
4266     """
4267     result = []
4268     instance = self.instance
4269     if self.mem:
4270       instance.memory = self.mem
4271       result.append(("mem", self.mem))
4272     if self.vcpus:
4273       instance.vcpus = self.vcpus
4274       result.append(("vcpus",  self.vcpus))
4275     if self.do_ip:
4276       instance.nics[0].ip = self.ip
4277       result.append(("ip", self.ip))
4278     if self.bridge:
4279       instance.nics[0].bridge = self.bridge
4280       result.append(("bridge", self.bridge))
4281     if self.mac:
4282       instance.nics[0].mac = self.mac
4283       result.append(("mac", self.mac))
4284     if self.do_kernel_path:
4285       instance.kernel_path = self.kernel_path
4286       result.append(("kernel_path", self.kernel_path))
4287     if self.do_initrd_path:
4288       instance.initrd_path = self.initrd_path
4289       result.append(("initrd_path", self.initrd_path))
4290     if self.hvm_boot_order:
4291       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4292         instance.hvm_boot_order = None
4293       else:
4294         instance.hvm_boot_order = self.hvm_boot_order
4295       result.append(("hvm_boot_order", self.hvm_boot_order))
4296
4297     self.cfg.AddInstance(instance)
4298
4299     return result
4300
4301
4302 class LUQueryExports(NoHooksLU):
4303   """Query the exports list
4304
4305   """
4306   _OP_REQP = []
4307
4308   def CheckPrereq(self):
4309     """Check that the nodelist contains only existing nodes.
4310
4311     """
4312     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4313
4314   def Exec(self, feedback_fn):
4315     """Compute the list of all the exported system images.
4316
4317     Returns:
4318       a dictionary with the structure node->(export-list)
4319       where export-list is a list of the instances exported on
4320       that node.
4321
4322     """
4323     return rpc.call_export_list(self.nodes)
4324
4325
4326 class LUExportInstance(LogicalUnit):
4327   """Export an instance to an image in the cluster.
4328
4329   """
4330   HPATH = "instance-export"
4331   HTYPE = constants.HTYPE_INSTANCE
4332   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4333
4334   def BuildHooksEnv(self):
4335     """Build hooks env.
4336
4337     This will run on the master, primary node and target node.
4338
4339     """
4340     env = {
4341       "EXPORT_NODE": self.op.target_node,
4342       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4343       }
4344     env.update(_BuildInstanceHookEnvByObject(self.instance))
4345     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4346           self.op.target_node]
4347     return env, nl, nl
4348
4349   def CheckPrereq(self):
4350     """Check prerequisites.
4351
4352     This checks that the instance name is a valid one.
4353
4354     """
4355     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4356     self.instance = self.cfg.GetInstanceInfo(instance_name)
4357     if self.instance is None:
4358       raise errors.OpPrereqError("Instance '%s' not found" %
4359                                  self.op.instance_name)
4360
4361     # node verification
4362     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4363     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4364
4365     if self.dst_node is None:
4366       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4367                                  self.op.target_node)
4368     self.op.target_node = self.dst_node.name
4369
4370   def Exec(self, feedback_fn):
4371     """Export an instance to an image in the cluster.
4372
4373     """
4374     instance = self.instance
4375     dst_node = self.dst_node
4376     src_node = instance.primary_node
4377     if self.op.shutdown:
4378       # shutdown the instance, but not the disks
4379       if not rpc.call_instance_shutdown(src_node, instance):
4380          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4381                                  (instance.name, source_node))
4382
4383     vgname = self.cfg.GetVGName()
4384
4385     snap_disks = []
4386
4387     try:
4388       for disk in instance.disks:
4389         if disk.iv_name == "sda":
4390           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4391           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4392
4393           if not new_dev_name:
4394             logger.Error("could not snapshot block device %s on node %s" %
4395                          (disk.logical_id[1], src_node))
4396           else:
4397             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4398                                       logical_id=(vgname, new_dev_name),
4399                                       physical_id=(vgname, new_dev_name),
4400                                       iv_name=disk.iv_name)
4401             snap_disks.append(new_dev)
4402
4403     finally:
4404       if self.op.shutdown and instance.status == "up":
4405         if not rpc.call_instance_start(src_node, instance, None):
4406           _ShutdownInstanceDisks(instance, self.cfg)
4407           raise errors.OpExecError("Could not start instance")
4408
4409     # TODO: check for size
4410
4411     for dev in snap_disks:
4412       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4413                                            instance):
4414         logger.Error("could not export block device %s from node"
4415                      " %s to node %s" %
4416                      (dev.logical_id[1], src_node, dst_node.name))
4417       if not rpc.call_blockdev_remove(src_node, dev):
4418         logger.Error("could not remove snapshot block device %s from"
4419                      " node %s" % (dev.logical_id[1], src_node))
4420
4421     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4422       logger.Error("could not finalize export for instance %s on node %s" %
4423                    (instance.name, dst_node.name))
4424
4425     nodelist = self.cfg.GetNodeList()
4426     nodelist.remove(dst_node.name)
4427
4428     # on one-node clusters nodelist will be empty after the removal
4429     # if we proceed the backup would be removed because OpQueryExports
4430     # substitutes an empty list with the full cluster node list.
4431     if nodelist:
4432       op = opcodes.OpQueryExports(nodes=nodelist)
4433       exportlist = self.proc.ChainOpCode(op)
4434       for node in exportlist:
4435         if instance.name in exportlist[node]:
4436           if not rpc.call_export_remove(node, instance.name):
4437             logger.Error("could not remove older export for instance %s"
4438                          " on node %s" % (instance.name, node))
4439
4440
4441 class TagsLU(NoHooksLU):
4442   """Generic tags LU.
4443
4444   This is an abstract class which is the parent of all the other tags LUs.
4445
4446   """
4447   def CheckPrereq(self):
4448     """Check prerequisites.
4449
4450     """
4451     if self.op.kind == constants.TAG_CLUSTER:
4452       self.target = self.cfg.GetClusterInfo()
4453     elif self.op.kind == constants.TAG_NODE:
4454       name = self.cfg.ExpandNodeName(self.op.name)
4455       if name is None:
4456         raise errors.OpPrereqError("Invalid node name (%s)" %
4457                                    (self.op.name,))
4458       self.op.name = name
4459       self.target = self.cfg.GetNodeInfo(name)
4460     elif self.op.kind == constants.TAG_INSTANCE:
4461       name = self.cfg.ExpandInstanceName(self.op.name)
4462       if name is None:
4463         raise errors.OpPrereqError("Invalid instance name (%s)" %
4464                                    (self.op.name,))
4465       self.op.name = name
4466       self.target = self.cfg.GetInstanceInfo(name)
4467     else:
4468       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4469                                  str(self.op.kind))
4470
4471
4472 class LUGetTags(TagsLU):
4473   """Returns the tags of a given object.
4474
4475   """
4476   _OP_REQP = ["kind", "name"]
4477
4478   def Exec(self, feedback_fn):
4479     """Returns the tag list.
4480
4481     """
4482     return self.target.GetTags()
4483
4484
4485 class LUSearchTags(NoHooksLU):
4486   """Searches the tags for a given pattern.
4487
4488   """
4489   _OP_REQP = ["pattern"]
4490
4491   def CheckPrereq(self):
4492     """Check prerequisites.
4493
4494     This checks the pattern passed for validity by compiling it.
4495
4496     """
4497     try:
4498       self.re = re.compile(self.op.pattern)
4499     except re.error, err:
4500       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4501                                  (self.op.pattern, err))
4502
4503   def Exec(self, feedback_fn):
4504     """Returns the tag list.
4505
4506     """
4507     cfg = self.cfg
4508     tgts = [("/cluster", cfg.GetClusterInfo())]
4509     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4510     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4511     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4512     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4513     results = []
4514     for path, target in tgts:
4515       for tag in target.GetTags():
4516         if self.re.search(tag):
4517           results.append((path, tag))
4518     return results
4519
4520
4521 class LUAddTags(TagsLU):
4522   """Sets a tag on a given object.
4523
4524   """
4525   _OP_REQP = ["kind", "name", "tags"]
4526
4527   def CheckPrereq(self):
4528     """Check prerequisites.
4529
4530     This checks the type and length of the tag name and value.
4531
4532     """
4533     TagsLU.CheckPrereq(self)
4534     for tag in self.op.tags:
4535       objects.TaggableObject.ValidateTag(tag)
4536
4537   def Exec(self, feedback_fn):
4538     """Sets the tag.
4539
4540     """
4541     try:
4542       for tag in self.op.tags:
4543         self.target.AddTag(tag)
4544     except errors.TagError, err:
4545       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4546     try:
4547       self.cfg.Update(self.target)
4548     except errors.ConfigurationError:
4549       raise errors.OpRetryError("There has been a modification to the"
4550                                 " config file and the operation has been"
4551                                 " aborted. Please retry.")
4552
4553
4554 class LUDelTags(TagsLU):
4555   """Delete a list of tags from a given object.
4556
4557   """
4558   _OP_REQP = ["kind", "name", "tags"]
4559
4560   def CheckPrereq(self):
4561     """Check prerequisites.
4562
4563     This checks that we have the given tag.
4564
4565     """
4566     TagsLU.CheckPrereq(self)
4567     for tag in self.op.tags:
4568       objects.TaggableObject.ValidateTag(tag)
4569     del_tags = frozenset(self.op.tags)
4570     cur_tags = self.target.GetTags()
4571     if not del_tags <= cur_tags:
4572       diff_tags = del_tags - cur_tags
4573       diff_names = ["'%s'" % tag for tag in diff_tags]
4574       diff_names.sort()
4575       raise errors.OpPrereqError("Tag(s) %s not found" %
4576                                  (",".join(diff_names)))
4577
4578   def Exec(self, feedback_fn):
4579     """Remove the tag from the object.
4580
4581     """
4582     for tag in self.op.tags:
4583       self.target.RemoveTag(tag)
4584     try:
4585       self.cfg.Update(self.target)
4586     except errors.ConfigurationError:
4587       raise errors.OpRetryError("There has been a modification to the"
4588                                 " config file and the operation has been"
4589                                 " aborted. Please retry.")
4590
4591 class LUTestDelay(NoHooksLU):
4592   """Sleep for a specified amount of time.
4593
4594   This LU sleeps on the master and/or nodes for a specified amoutn of
4595   time.
4596
4597   """
4598   _OP_REQP = ["duration", "on_master", "on_nodes"]
4599
4600   def CheckPrereq(self):
4601     """Check prerequisites.
4602
4603     This checks that we have a good list of nodes and/or the duration
4604     is valid.
4605
4606     """
4607
4608     if self.op.on_nodes:
4609       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4610
4611   def Exec(self, feedback_fn):
4612     """Do the actual sleep.
4613
4614     """
4615     if self.op.on_master:
4616       if not utils.TestDelay(self.op.duration):
4617         raise errors.OpExecError("Error during master delay test")
4618     if self.op.on_nodes:
4619       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4620       if not result:
4621         raise errors.OpExecError("Complete failure from rpc call")
4622       for node, node_result in result.items():
4623         if not node_result:
4624           raise errors.OpExecError("Failure during rpc call to node %s,"
4625                                    " result: %s" % (node, node_result))
4626
4627
4628 def _AllocatorGetClusterData(cfg, sstore):
4629   """Compute the generic allocator input data.
4630
4631   This is the data that is independent of the actual operation.
4632
4633   """
4634   # cluster data
4635   data = {
4636     "version": 1,
4637     "cluster_name": sstore.GetClusterName(),
4638     "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4639     # we don't have job IDs
4640     }
4641
4642   # node data
4643   node_results = {}
4644   node_list = cfg.GetNodeList()
4645   node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4646   for nname in node_list:
4647     ninfo = cfg.GetNodeInfo(nname)
4648     if nname not in node_data or not isinstance(node_data[nname], dict):
4649       raise errors.OpExecError("Can't get data for node %s" % nname)
4650     remote_info = node_data[nname]
4651     for attr in ['memory_total', 'memory_free',
4652                  'vg_size', 'vg_free']:
4653       if attr not in remote_info:
4654         raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4655                                  (nname, attr))
4656       try:
4657         int(remote_info[attr])
4658       except ValueError, err:
4659         raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4660                                  " %s" % (nname, attr, str(err)))
4661     pnr = {
4662       "tags": list(ninfo.GetTags()),
4663       "total_memory": utils.TryConvert(int, remote_info['memory_total']),
4664       "free_memory": utils.TryConvert(int, remote_info['memory_free']),
4665       "total_disk": utils.TryConvert(int, remote_info['vg_size']),
4666       "free_disk": utils.TryConvert(int, remote_info['vg_free']),
4667       "primary_ip": ninfo.primary_ip,
4668       "secondary_ip": ninfo.secondary_ip,
4669       }
4670     node_results[nname] = pnr
4671   data["nodes"] = node_results
4672
4673   # instance data
4674   instance_data = {}
4675   i_list = cfg.GetInstanceList()
4676   for iname in i_list:
4677     iinfo = cfg.GetInstanceInfo(iname)
4678     nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4679                 for n in iinfo.nics]
4680     pir = {
4681       "tags": list(iinfo.GetTags()),
4682       "should_run": iinfo.status == "up",
4683       "vcpus": iinfo.vcpus,
4684       "memory": iinfo.memory,
4685       "os": iinfo.os,
4686       "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4687       "nics": nic_data,
4688       "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4689       "disk_template": iinfo.disk_template,
4690       }
4691     instance_data[iname] = pir
4692
4693   data["instances"] = instance_data
4694
4695   return data
4696
4697
4698 def _AllocatorAddNewInstance(data, op):
4699   """Add new instance data to allocator structure.
4700
4701   This in combination with _AllocatorGetClusterData will create the
4702   correct structure needed as input for the allocator.
4703
4704   The checks for the completeness of the opcode must have already been
4705   done.
4706
4707   """
4708   request = {
4709     "type": "allocate",
4710     "name": op.name,
4711     "disk_template": op.disk_template,
4712     "tags": op.tags,
4713     "os": op.os,
4714     "vcpus": op.vcpus,
4715     "memory": op.mem_size,
4716     "disks": op.disks,
4717     "nics": op.nics,
4718     }
4719   data["request"] = request
4720
4721
4722 def _AllocatorAddRelocateInstance(data, op):
4723   """Add relocate instance data to allocator structure.
4724
4725   This in combination with _AllocatorGetClusterData will create the
4726   correct structure needed as input for the allocator.
4727
4728   The checks for the completeness of the opcode must have already been
4729   done.
4730
4731   """
4732   request = {
4733     "type": "replace_secondary",
4734     "name": op.name,
4735     }
4736   data["request"] = request
4737
4738
4739 class LUTestAllocator(NoHooksLU):
4740   """Run allocator tests.
4741
4742   This LU runs the allocator tests
4743
4744   """
4745   _OP_REQP = ["direction", "mode", "name"]
4746
4747   def CheckPrereq(self):
4748     """Check prerequisites.
4749
4750     This checks the opcode parameters depending on the director and mode test.
4751
4752     """
4753     if self.op.mode == constants.ALF_MODE_ALLOC:
4754       for attr in ["name", "mem_size", "disks", "disk_template",
4755                    "os", "tags", "nics", "vcpus"]:
4756         if not hasattr(self.op, attr):
4757           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4758                                      attr)
4759       iname = self.cfg.ExpandInstanceName(self.op.name)
4760       if iname is not None:
4761         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4762                                    iname)
4763       if not isinstance(self.op.nics, list):
4764         raise errors.OpPrereqError("Invalid parameter 'nics'")
4765       for row in self.op.nics:
4766         if (not isinstance(row, dict) or
4767             "mac" not in row or
4768             "ip" not in row or
4769             "bridge" not in row):
4770           raise errors.OpPrereqError("Invalid contents of the"
4771                                      " 'nics' parameter")
4772       if not isinstance(self.op.disks, list):
4773         raise errors.OpPrereqError("Invalid parameter 'disks'")
4774       for row in self.op.disks:
4775         if (not isinstance(row, dict) or
4776             "size" not in row or
4777             not isinstance(row["size"], int) or
4778             "mode" not in row or
4779             row["mode"] not in ['r', 'w']):
4780           raise errors.OpPrereqError("Invalid contents of the"
4781                                      " 'disks' parameter")
4782     elif self.op.mode == constants.ALF_MODE_RELOC:
4783       if not hasattr(self.op, "name"):
4784         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4785       fname = self.cfg.ExpandInstanceName(self.op.name)
4786       if fname is None:
4787         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4788                                    self.op.name)
4789       self.op.name = fname
4790     else:
4791       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4792                                  self.op.mode)
4793
4794     if self.op.direction == constants.ALF_DIR_OUT:
4795       if not hasattr(self.op, "allocator"):
4796         raise errors.OpPrereqError("Missing allocator name")
4797       raise errors.OpPrereqError("Allocator out mode not supported yet")
4798     elif self.op.direction != constants.ALF_DIR_IN:
4799       raise errors.OpPrereqError("Wrong allocator test '%s'" %
4800                                  self.op.direction)
4801
4802   def Exec(self, feedback_fn):
4803     """Run the allocator test.
4804
4805     """
4806     data = _AllocatorGetClusterData(self.cfg, self.sstore)
4807     if self.op.mode == constants.ALF_MODE_ALLOC:
4808       _AllocatorAddNewInstance(data, self.op)
4809     else:
4810       _AllocatorAddRelocateInstance(data, self.op)
4811
4812     if _JSON_INDENT is None:
4813       text = simplejson.dumps(data)
4814     else:
4815       text = simplejson.dumps(data, indent=_JSON_INDENT)
4816     return text