Don't send gratuitous ARP if master IP setup fails
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.SPICE_CERT_FILE,
200     constants.SPICE_CACERT_FILE,
201     constants.RAPI_USERS_FILE,
202     constants.CONFD_HMAC_KEY,
203     constants.CLUSTER_DOMAIN_SECRET_FILE,
204     ])
205
206   for hv_name in constants.HYPER_TYPES:
207     hv_class = hypervisor.GetHypervisorClass(hv_name)
208     allowed_files.update(hv_class.GetAncillaryFiles())
209
210   return frozenset(allowed_files)
211
212
213 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
214
215
216 def JobQueuePurge():
217   """Removes job queue files and archived jobs.
218
219   @rtype: tuple
220   @return: True, None
221
222   """
223   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
224   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
225
226
227 def GetMasterInfo():
228   """Returns master information.
229
230   This is an utility function to compute master information, either
231   for consumption here or from the node daemon.
232
233   @rtype: tuple
234   @return: master_netdev, master_ip, master_name, primary_ip_family,
235     master_netmask
236   @raise RPCFail: in case of errors
237
238   """
239   try:
240     cfg = _GetConfig()
241     master_netdev = cfg.GetMasterNetdev()
242     master_ip = cfg.GetMasterIP()
243     master_netmask = cfg.GetMasterNetmask()
244     master_node = cfg.GetMasterNode()
245     primary_ip_family = cfg.GetPrimaryIPFamily()
246   except errors.ConfigurationError, err:
247     _Fail("Cluster configuration incomplete: %s", err, exc=True)
248   return (master_netdev, master_ip, master_node, primary_ip_family,
249       master_netmask)
250
251
252 def ActivateMasterIp():
253   """Activate the IP address of the master daemon.
254
255   """
256   # GetMasterInfo will raise an exception if not able to return data
257   master_netdev, master_ip, _, family, master_netmask = GetMasterInfo()
258
259   err_msg = None
260   if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
261     if netutils.IPAddress.Own(master_ip):
262       # we already have the ip:
263       logging.debug("Master IP already configured, doing nothing")
264     else:
265       err_msg = "Someone else has the master ip, not activating"
266       logging.error(err_msg)
267   else:
268     ipcls = netutils.IP4Address
269     if family == netutils.IP6Address.family:
270       ipcls = netutils.IP6Address
271
272     result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
273                            "%s/%s" % (master_ip, master_netmask),
274                            "dev", master_netdev, "label",
275                            "%s:0" % master_netdev])
276     if result.failed:
277       err_msg = "Can't activate master IP: %s" % result.output
278       logging.error(err_msg)
279
280     else:
281       # we ignore the exit code of the following cmds
282       if ipcls == netutils.IP4Address:
283         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
284                       master_ip, master_ip])
285       elif ipcls == netutils.IP6Address:
286         try:
287           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
288         except errors.OpExecError:
289           # TODO: Better error reporting
290           logging.warning("Can't execute ndisc6, please install if missing")
291
292   if err_msg:
293     _Fail(err_msg)
294
295
296 def StartMasterDaemons(no_voting):
297   """Activate local node as master node.
298
299   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
300
301   @type no_voting: boolean
302   @param no_voting: whether to start ganeti-masterd without a node vote
303       but still non-interactively
304   @rtype: None
305
306   """
307
308   if no_voting:
309     masterd_args = "--no-voting --yes-do-it"
310   else:
311     masterd_args = ""
312
313   env = {
314     "EXTRA_MASTERD_ARGS": masterd_args,
315     }
316
317   result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
318   if result.failed:
319     msg = "Can't start Ganeti master: %s" % result.output
320     logging.error(msg)
321     _Fail(msg)
322
323
324 def DeactivateMasterIp():
325   """Deactivate the master IP on this node.
326
327   """
328   # TODO: log and report back to the caller the error failures; we
329   # need to decide in which case we fail the RPC for this
330
331   # GetMasterInfo will raise an exception if not able to return data
332   master_netdev, master_ip, _, _, master_netmask = GetMasterInfo()
333
334   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
335                          "%s/%s" % (master_ip, master_netmask),
336                          "dev", master_netdev])
337   if result.failed:
338     logging.error("Can't remove the master IP, error: %s", result.output)
339     # but otherwise ignore the failure
340
341
342 def StopMasterDaemons():
343   """Stop the master daemons on this node.
344
345   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
346
347   @rtype: None
348
349   """
350   # TODO: log and report back to the caller the error failures; we
351   # need to decide in which case we fail the RPC for this
352
353   result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
354   if result.failed:
355     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
356                   " and error %s",
357                   result.cmd, result.exit_code, result.output)
358
359
360 def ChangeMasterNetmask(netmask):
361   """Change the netmask of the master IP.
362
363   """
364   master_netdev, master_ip, _, _, old_netmask = GetMasterInfo()
365   if old_netmask == netmask:
366     return
367
368   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
369                          "%s/%s" % (master_ip, netmask),
370                          "dev", master_netdev, "label",
371                          "%s:0" % master_netdev])
372   if result.failed:
373     _Fail("Could not change the master IP netmask")
374
375   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
376                          "%s/%s" % (master_ip, old_netmask),
377                          "dev", master_netdev, "label",
378                          "%s:0" % master_netdev])
379   if result.failed:
380     _Fail("Could not change the master IP netmask")
381
382
383 def EtcHostsModify(mode, host, ip):
384   """Modify a host entry in /etc/hosts.
385
386   @param mode: The mode to operate. Either add or remove entry
387   @param host: The host to operate on
388   @param ip: The ip associated with the entry
389
390   """
391   if mode == constants.ETC_HOSTS_ADD:
392     if not ip:
393       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
394               " present")
395     utils.AddHostToEtcHosts(host, ip)
396   elif mode == constants.ETC_HOSTS_REMOVE:
397     if ip:
398       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
399               " parameter is present")
400     utils.RemoveHostFromEtcHosts(host)
401   else:
402     RPCFail("Mode not supported")
403
404
405 def LeaveCluster(modify_ssh_setup):
406   """Cleans up and remove the current node.
407
408   This function cleans up and prepares the current node to be removed
409   from the cluster.
410
411   If processing is successful, then it raises an
412   L{errors.QuitGanetiException} which is used as a special case to
413   shutdown the node daemon.
414
415   @param modify_ssh_setup: boolean
416
417   """
418   _CleanDirectory(constants.DATA_DIR)
419   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
420   JobQueuePurge()
421
422   if modify_ssh_setup:
423     try:
424       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
425
426       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
427
428       utils.RemoveFile(priv_key)
429       utils.RemoveFile(pub_key)
430     except errors.OpExecError:
431       logging.exception("Error while processing ssh files")
432
433   try:
434     utils.RemoveFile(constants.CONFD_HMAC_KEY)
435     utils.RemoveFile(constants.RAPI_CERT_FILE)
436     utils.RemoveFile(constants.SPICE_CERT_FILE)
437     utils.RemoveFile(constants.SPICE_CACERT_FILE)
438     utils.RemoveFile(constants.NODED_CERT_FILE)
439   except: # pylint: disable=W0702
440     logging.exception("Error while removing cluster secrets")
441
442   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
443   if result.failed:
444     logging.error("Command %s failed with exitcode %s and error %s",
445                   result.cmd, result.exit_code, result.output)
446
447   # Raise a custom exception (handled in ganeti-noded)
448   raise errors.QuitGanetiException(True, "Shutdown scheduled")
449
450
451 def GetNodeInfo(vgname, hypervisor_type):
452   """Gives back a hash with different information about the node.
453
454   @type vgname: C{string}
455   @param vgname: the name of the volume group to ask for disk space information
456   @type hypervisor_type: C{str}
457   @param hypervisor_type: the name of the hypervisor to ask for
458       memory information
459   @rtype: C{dict}
460   @return: dictionary with the following keys:
461       - vg_size is the size of the configured volume group in MiB
462       - vg_free is the free size of the volume group in MiB
463       - memory_dom0 is the memory allocated for domain0 in MiB
464       - memory_free is the currently available (free) ram in MiB
465       - memory_total is the total number of ram in MiB
466       - hv_version: the hypervisor version, if available
467
468   """
469   outputarray = {}
470
471   if vgname is not None:
472     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
473     vg_free = vg_size = None
474     if vginfo:
475       vg_free = int(round(vginfo[0][0], 0))
476       vg_size = int(round(vginfo[0][1], 0))
477     outputarray["vg_size"] = vg_size
478     outputarray["vg_free"] = vg_free
479
480   if hypervisor_type is not None:
481     hyper = hypervisor.GetHypervisor(hypervisor_type)
482     hyp_info = hyper.GetNodeInfo()
483     if hyp_info is not None:
484       outputarray.update(hyp_info)
485
486   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
487
488   return outputarray
489
490
491 def VerifyNode(what, cluster_name):
492   """Verify the status of the local node.
493
494   Based on the input L{what} parameter, various checks are done on the
495   local node.
496
497   If the I{filelist} key is present, this list of
498   files is checksummed and the file/checksum pairs are returned.
499
500   If the I{nodelist} key is present, we check that we have
501   connectivity via ssh with the target nodes (and check the hostname
502   report).
503
504   If the I{node-net-test} key is present, we check that we have
505   connectivity to the given nodes via both primary IP and, if
506   applicable, secondary IPs.
507
508   @type what: C{dict}
509   @param what: a dictionary of things to check:
510       - filelist: list of files for which to compute checksums
511       - nodelist: list of nodes we should check ssh communication with
512       - node-net-test: list of nodes we should check node daemon port
513         connectivity with
514       - hypervisor: list with hypervisors to run the verify for
515   @rtype: dict
516   @return: a dictionary with the same keys as the input dict, and
517       values representing the result of the checks
518
519   """
520   result = {}
521   my_name = netutils.Hostname.GetSysName()
522   port = netutils.GetDaemonPort(constants.NODED)
523   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
524
525   if constants.NV_HYPERVISOR in what and vm_capable:
526     result[constants.NV_HYPERVISOR] = tmp = {}
527     for hv_name in what[constants.NV_HYPERVISOR]:
528       try:
529         val = hypervisor.GetHypervisor(hv_name).Verify()
530       except errors.HypervisorError, err:
531         val = "Error while checking hypervisor: %s" % str(err)
532       tmp[hv_name] = val
533
534   if constants.NV_HVPARAMS in what and vm_capable:
535     result[constants.NV_HVPARAMS] = tmp = []
536     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
537       try:
538         logging.info("Validating hv %s, %s", hv_name, hvparms)
539         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
540       except errors.HypervisorError, err:
541         tmp.append((source, hv_name, str(err)))
542
543   if constants.NV_FILELIST in what:
544     result[constants.NV_FILELIST] = utils.FingerprintFiles(
545       what[constants.NV_FILELIST])
546
547   if constants.NV_NODELIST in what:
548     (nodes, bynode) = what[constants.NV_NODELIST]
549
550     # Add nodes from other groups (different for each node)
551     try:
552       nodes.extend(bynode[my_name])
553     except KeyError:
554       pass
555
556     # Use a random order
557     random.shuffle(nodes)
558
559     # Try to contact all nodes
560     val = {}
561     for node in nodes:
562       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
563       if not success:
564         val[node] = message
565
566     result[constants.NV_NODELIST] = val
567
568   if constants.NV_NODENETTEST in what:
569     result[constants.NV_NODENETTEST] = tmp = {}
570     my_pip = my_sip = None
571     for name, pip, sip in what[constants.NV_NODENETTEST]:
572       if name == my_name:
573         my_pip = pip
574         my_sip = sip
575         break
576     if not my_pip:
577       tmp[my_name] = ("Can't find my own primary/secondary IP"
578                       " in the node list")
579     else:
580       for name, pip, sip in what[constants.NV_NODENETTEST]:
581         fail = []
582         if not netutils.TcpPing(pip, port, source=my_pip):
583           fail.append("primary")
584         if sip != pip:
585           if not netutils.TcpPing(sip, port, source=my_sip):
586             fail.append("secondary")
587         if fail:
588           tmp[name] = ("failure using the %s interface(s)" %
589                        " and ".join(fail))
590
591   if constants.NV_MASTERIP in what:
592     # FIXME: add checks on incoming data structures (here and in the
593     # rest of the function)
594     master_name, master_ip = what[constants.NV_MASTERIP]
595     if master_name == my_name:
596       source = constants.IP4_ADDRESS_LOCALHOST
597     else:
598       source = None
599     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
600                                                   source=source)
601
602   if constants.NV_OOB_PATHS in what:
603     result[constants.NV_OOB_PATHS] = tmp = []
604     for path in what[constants.NV_OOB_PATHS]:
605       try:
606         st = os.stat(path)
607       except OSError, err:
608         tmp.append("error stating out of band helper: %s" % err)
609       else:
610         if stat.S_ISREG(st.st_mode):
611           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
612             tmp.append(None)
613           else:
614             tmp.append("out of band helper %s is not executable" % path)
615         else:
616           tmp.append("out of band helper %s is not a file" % path)
617
618   if constants.NV_LVLIST in what and vm_capable:
619     try:
620       val = GetVolumeList(utils.ListVolumeGroups().keys())
621     except RPCFail, err:
622       val = str(err)
623     result[constants.NV_LVLIST] = val
624
625   if constants.NV_INSTANCELIST in what and vm_capable:
626     # GetInstanceList can fail
627     try:
628       val = GetInstanceList(what[constants.NV_INSTANCELIST])
629     except RPCFail, err:
630       val = str(err)
631     result[constants.NV_INSTANCELIST] = val
632
633   if constants.NV_VGLIST in what and vm_capable:
634     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
635
636   if constants.NV_PVLIST in what and vm_capable:
637     result[constants.NV_PVLIST] = \
638       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
639                                    filter_allocatable=False)
640
641   if constants.NV_VERSION in what:
642     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
643                                     constants.RELEASE_VERSION)
644
645   if constants.NV_HVINFO in what and vm_capable:
646     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
647     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
648
649   if constants.NV_DRBDLIST in what and vm_capable:
650     try:
651       used_minors = bdev.DRBD8.GetUsedDevs().keys()
652     except errors.BlockDeviceError, err:
653       logging.warning("Can't get used minors list", exc_info=True)
654       used_minors = str(err)
655     result[constants.NV_DRBDLIST] = used_minors
656
657   if constants.NV_DRBDHELPER in what and vm_capable:
658     status = True
659     try:
660       payload = bdev.BaseDRBD.GetUsermodeHelper()
661     except errors.BlockDeviceError, err:
662       logging.error("Can't get DRBD usermode helper: %s", str(err))
663       status = False
664       payload = str(err)
665     result[constants.NV_DRBDHELPER] = (status, payload)
666
667   if constants.NV_NODESETUP in what:
668     result[constants.NV_NODESETUP] = tmpr = []
669     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
670       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
671                   " under /sys, missing required directories /sys/block"
672                   " and /sys/class/net")
673     if (not os.path.isdir("/proc/sys") or
674         not os.path.isfile("/proc/sysrq-trigger")):
675       tmpr.append("The procfs filesystem doesn't seem to be mounted"
676                   " under /proc, missing required directory /proc/sys and"
677                   " the file /proc/sysrq-trigger")
678
679   if constants.NV_TIME in what:
680     result[constants.NV_TIME] = utils.SplitTime(time.time())
681
682   if constants.NV_OSLIST in what and vm_capable:
683     result[constants.NV_OSLIST] = DiagnoseOS()
684
685   if constants.NV_BRIDGES in what and vm_capable:
686     result[constants.NV_BRIDGES] = [bridge
687                                     for bridge in what[constants.NV_BRIDGES]
688                                     if not utils.BridgeExists(bridge)]
689   return result
690
691
692 def GetBlockDevSizes(devices):
693   """Return the size of the given block devices
694
695   @type devices: list
696   @param devices: list of block device nodes to query
697   @rtype: dict
698   @return:
699     dictionary of all block devices under /dev (key). The value is their
700     size in MiB.
701
702     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
703
704   """
705   DEV_PREFIX = "/dev/"
706   blockdevs = {}
707
708   for devpath in devices:
709     if not utils.IsBelowDir(DEV_PREFIX, devpath):
710       continue
711
712     try:
713       st = os.stat(devpath)
714     except EnvironmentError, err:
715       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
716       continue
717
718     if stat.S_ISBLK(st.st_mode):
719       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
720       if result.failed:
721         # We don't want to fail, just do not list this device as available
722         logging.warning("Cannot get size for block device %s", devpath)
723         continue
724
725       size = int(result.stdout) / (1024 * 1024)
726       blockdevs[devpath] = size
727   return blockdevs
728
729
730 def GetVolumeList(vg_names):
731   """Compute list of logical volumes and their size.
732
733   @type vg_names: list
734   @param vg_names: the volume groups whose LVs we should list, or
735       empty for all volume groups
736   @rtype: dict
737   @return:
738       dictionary of all partions (key) with value being a tuple of
739       their size (in MiB), inactive and online status::
740
741         {'xenvg/test1': ('20.06', True, True)}
742
743       in case of errors, a string is returned with the error
744       details.
745
746   """
747   lvs = {}
748   sep = "|"
749   if not vg_names:
750     vg_names = []
751   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
752                          "--separator=%s" % sep,
753                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
754   if result.failed:
755     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
756
757   for line in result.stdout.splitlines():
758     line = line.strip()
759     match = _LVSLINE_REGEX.match(line)
760     if not match:
761       logging.error("Invalid line returned from lvs output: '%s'", line)
762       continue
763     vg_name, name, size, attr = match.groups()
764     inactive = attr[4] == "-"
765     online = attr[5] == "o"
766     virtual = attr[0] == "v"
767     if virtual:
768       # we don't want to report such volumes as existing, since they
769       # don't really hold data
770       continue
771     lvs[vg_name + "/" + name] = (size, inactive, online)
772
773   return lvs
774
775
776 def ListVolumeGroups():
777   """List the volume groups and their size.
778
779   @rtype: dict
780   @return: dictionary with keys volume name and values the
781       size of the volume
782
783   """
784   return utils.ListVolumeGroups()
785
786
787 def NodeVolumes():
788   """List all volumes on this node.
789
790   @rtype: list
791   @return:
792     A list of dictionaries, each having four keys:
793       - name: the logical volume name,
794       - size: the size of the logical volume
795       - dev: the physical device on which the LV lives
796       - vg: the volume group to which it belongs
797
798     In case of errors, we return an empty list and log the
799     error.
800
801     Note that since a logical volume can live on multiple physical
802     volumes, the resulting list might include a logical volume
803     multiple times.
804
805   """
806   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
807                          "--separator=|",
808                          "--options=lv_name,lv_size,devices,vg_name"])
809   if result.failed:
810     _Fail("Failed to list logical volumes, lvs output: %s",
811           result.output)
812
813   def parse_dev(dev):
814     return dev.split("(")[0]
815
816   def handle_dev(dev):
817     return [parse_dev(x) for x in dev.split(",")]
818
819   def map_line(line):
820     line = [v.strip() for v in line]
821     return [{"name": line[0], "size": line[1],
822              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
823
824   all_devs = []
825   for line in result.stdout.splitlines():
826     if line.count("|") >= 3:
827       all_devs.extend(map_line(line.split("|")))
828     else:
829       logging.warning("Strange line in the output from lvs: '%s'", line)
830   return all_devs
831
832
833 def BridgesExist(bridges_list):
834   """Check if a list of bridges exist on the current node.
835
836   @rtype: boolean
837   @return: C{True} if all of them exist, C{False} otherwise
838
839   """
840   missing = []
841   for bridge in bridges_list:
842     if not utils.BridgeExists(bridge):
843       missing.append(bridge)
844
845   if missing:
846     _Fail("Missing bridges %s", utils.CommaJoin(missing))
847
848
849 def GetInstanceList(hypervisor_list):
850   """Provides a list of instances.
851
852   @type hypervisor_list: list
853   @param hypervisor_list: the list of hypervisors to query information
854
855   @rtype: list
856   @return: a list of all running instances on the current node
857     - instance1.example.com
858     - instance2.example.com
859
860   """
861   results = []
862   for hname in hypervisor_list:
863     try:
864       names = hypervisor.GetHypervisor(hname).ListInstances()
865       results.extend(names)
866     except errors.HypervisorError, err:
867       _Fail("Error enumerating instances (hypervisor %s): %s",
868             hname, err, exc=True)
869
870   return results
871
872
873 def GetInstanceInfo(instance, hname):
874   """Gives back the information about an instance as a dictionary.
875
876   @type instance: string
877   @param instance: the instance name
878   @type hname: string
879   @param hname: the hypervisor type of the instance
880
881   @rtype: dict
882   @return: dictionary with the following keys:
883       - memory: memory size of instance (int)
884       - state: xen state of instance (string)
885       - time: cpu time of instance (float)
886
887   """
888   output = {}
889
890   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
891   if iinfo is not None:
892     output["memory"] = iinfo[2]
893     output["state"] = iinfo[4]
894     output["time"] = iinfo[5]
895
896   return output
897
898
899 def GetInstanceMigratable(instance):
900   """Gives whether an instance can be migrated.
901
902   @type instance: L{objects.Instance}
903   @param instance: object representing the instance to be checked.
904
905   @rtype: tuple
906   @return: tuple of (result, description) where:
907       - result: whether the instance can be migrated or not
908       - description: a description of the issue, if relevant
909
910   """
911   hyper = hypervisor.GetHypervisor(instance.hypervisor)
912   iname = instance.name
913   if iname not in hyper.ListInstances():
914     _Fail("Instance %s is not running", iname)
915
916   for idx in range(len(instance.disks)):
917     link_name = _GetBlockDevSymlinkPath(iname, idx)
918     if not os.path.islink(link_name):
919       logging.warning("Instance %s is missing symlink %s for disk %d",
920                       iname, link_name, idx)
921
922
923 def GetAllInstancesInfo(hypervisor_list):
924   """Gather data about all instances.
925
926   This is the equivalent of L{GetInstanceInfo}, except that it
927   computes data for all instances at once, thus being faster if one
928   needs data about more than one instance.
929
930   @type hypervisor_list: list
931   @param hypervisor_list: list of hypervisors to query for instance data
932
933   @rtype: dict
934   @return: dictionary of instance: data, with data having the following keys:
935       - memory: memory size of instance (int)
936       - state: xen state of instance (string)
937       - time: cpu time of instance (float)
938       - vcpus: the number of vcpus
939
940   """
941   output = {}
942
943   for hname in hypervisor_list:
944     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
945     if iinfo:
946       for name, _, memory, vcpus, state, times in iinfo:
947         value = {
948           "memory": memory,
949           "vcpus": vcpus,
950           "state": state,
951           "time": times,
952           }
953         if name in output:
954           # we only check static parameters, like memory and vcpus,
955           # and not state and time which can change between the
956           # invocations of the different hypervisors
957           for key in "memory", "vcpus":
958             if value[key] != output[name][key]:
959               _Fail("Instance %s is running twice"
960                     " with different parameters", name)
961         output[name] = value
962
963   return output
964
965
966 def _InstanceLogName(kind, os_name, instance, component):
967   """Compute the OS log filename for a given instance and operation.
968
969   The instance name and os name are passed in as strings since not all
970   operations have these as part of an instance object.
971
972   @type kind: string
973   @param kind: the operation type (e.g. add, import, etc.)
974   @type os_name: string
975   @param os_name: the os name
976   @type instance: string
977   @param instance: the name of the instance being imported/added/etc.
978   @type component: string or None
979   @param component: the name of the component of the instance being
980       transferred
981
982   """
983   # TODO: Use tempfile.mkstemp to create unique filename
984   if component:
985     assert "/" not in component
986     c_msg = "-%s" % component
987   else:
988     c_msg = ""
989   base = ("%s-%s-%s%s-%s.log" %
990           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
991   return utils.PathJoin(constants.LOG_OS_DIR, base)
992
993
994 def InstanceOsAdd(instance, reinstall, debug):
995   """Add an OS to an instance.
996
997   @type instance: L{objects.Instance}
998   @param instance: Instance whose OS is to be installed
999   @type reinstall: boolean
1000   @param reinstall: whether this is an instance reinstall
1001   @type debug: integer
1002   @param debug: debug level, passed to the OS scripts
1003   @rtype: None
1004
1005   """
1006   inst_os = OSFromDisk(instance.os)
1007
1008   create_env = OSEnvironment(instance, inst_os, debug)
1009   if reinstall:
1010     create_env["INSTANCE_REINSTALL"] = "1"
1011
1012   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1013
1014   result = utils.RunCmd([inst_os.create_script], env=create_env,
1015                         cwd=inst_os.path, output=logfile, reset_env=True)
1016   if result.failed:
1017     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1018                   " output: %s", result.cmd, result.fail_reason, logfile,
1019                   result.output)
1020     lines = [utils.SafeEncode(val)
1021              for val in utils.TailFile(logfile, lines=20)]
1022     _Fail("OS create script failed (%s), last lines in the"
1023           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1024
1025
1026 def RunRenameInstance(instance, old_name, debug):
1027   """Run the OS rename script for an instance.
1028
1029   @type instance: L{objects.Instance}
1030   @param instance: Instance whose OS is to be installed
1031   @type old_name: string
1032   @param old_name: previous instance name
1033   @type debug: integer
1034   @param debug: debug level, passed to the OS scripts
1035   @rtype: boolean
1036   @return: the success of the operation
1037
1038   """
1039   inst_os = OSFromDisk(instance.os)
1040
1041   rename_env = OSEnvironment(instance, inst_os, debug)
1042   rename_env["OLD_INSTANCE_NAME"] = old_name
1043
1044   logfile = _InstanceLogName("rename", instance.os,
1045                              "%s-%s" % (old_name, instance.name), None)
1046
1047   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1048                         cwd=inst_os.path, output=logfile, reset_env=True)
1049
1050   if result.failed:
1051     logging.error("os create command '%s' returned error: %s output: %s",
1052                   result.cmd, result.fail_reason, result.output)
1053     lines = [utils.SafeEncode(val)
1054              for val in utils.TailFile(logfile, lines=20)]
1055     _Fail("OS rename script failed (%s), last lines in the"
1056           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1057
1058
1059 def _GetBlockDevSymlinkPath(instance_name, idx):
1060   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
1061                         (instance_name, constants.DISK_SEPARATOR, idx))
1062
1063
1064 def _SymlinkBlockDev(instance_name, device_path, idx):
1065   """Set up symlinks to a instance's block device.
1066
1067   This is an auxiliary function run when an instance is start (on the primary
1068   node) or when an instance is migrated (on the target node).
1069
1070
1071   @param instance_name: the name of the target instance
1072   @param device_path: path of the physical block device, on the node
1073   @param idx: the disk index
1074   @return: absolute path to the disk's symlink
1075
1076   """
1077   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1078   try:
1079     os.symlink(device_path, link_name)
1080   except OSError, err:
1081     if err.errno == errno.EEXIST:
1082       if (not os.path.islink(link_name) or
1083           os.readlink(link_name) != device_path):
1084         os.remove(link_name)
1085         os.symlink(device_path, link_name)
1086     else:
1087       raise
1088
1089   return link_name
1090
1091
1092 def _RemoveBlockDevLinks(instance_name, disks):
1093   """Remove the block device symlinks belonging to the given instance.
1094
1095   """
1096   for idx, _ in enumerate(disks):
1097     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1098     if os.path.islink(link_name):
1099       try:
1100         os.remove(link_name)
1101       except OSError:
1102         logging.exception("Can't remove symlink '%s'", link_name)
1103
1104
1105 def _GatherAndLinkBlockDevs(instance):
1106   """Set up an instance's block device(s).
1107
1108   This is run on the primary node at instance startup. The block
1109   devices must be already assembled.
1110
1111   @type instance: L{objects.Instance}
1112   @param instance: the instance whose disks we shoul assemble
1113   @rtype: list
1114   @return: list of (disk_object, device_path)
1115
1116   """
1117   block_devices = []
1118   for idx, disk in enumerate(instance.disks):
1119     device = _RecursiveFindBD(disk)
1120     if device is None:
1121       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1122                                     str(disk))
1123     device.Open()
1124     try:
1125       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1126     except OSError, e:
1127       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1128                                     e.strerror)
1129
1130     block_devices.append((disk, link_name))
1131
1132   return block_devices
1133
1134
1135 def StartInstance(instance, startup_paused):
1136   """Start an instance.
1137
1138   @type instance: L{objects.Instance}
1139   @param instance: the instance object
1140   @type startup_paused: bool
1141   @param instance: pause instance at startup?
1142   @rtype: None
1143
1144   """
1145   running_instances = GetInstanceList([instance.hypervisor])
1146
1147   if instance.name in running_instances:
1148     logging.info("Instance %s already running, not starting", instance.name)
1149     return
1150
1151   try:
1152     block_devices = _GatherAndLinkBlockDevs(instance)
1153     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1154     hyper.StartInstance(instance, block_devices, startup_paused)
1155   except errors.BlockDeviceError, err:
1156     _Fail("Block device error: %s", err, exc=True)
1157   except errors.HypervisorError, err:
1158     _RemoveBlockDevLinks(instance.name, instance.disks)
1159     _Fail("Hypervisor error: %s", err, exc=True)
1160
1161
1162 def InstanceShutdown(instance, timeout):
1163   """Shut an instance down.
1164
1165   @note: this functions uses polling with a hardcoded timeout.
1166
1167   @type instance: L{objects.Instance}
1168   @param instance: the instance object
1169   @type timeout: integer
1170   @param timeout: maximum timeout for soft shutdown
1171   @rtype: None
1172
1173   """
1174   hv_name = instance.hypervisor
1175   hyper = hypervisor.GetHypervisor(hv_name)
1176   iname = instance.name
1177
1178   if instance.name not in hyper.ListInstances():
1179     logging.info("Instance %s not running, doing nothing", iname)
1180     return
1181
1182   class _TryShutdown:
1183     def __init__(self):
1184       self.tried_once = False
1185
1186     def __call__(self):
1187       if iname not in hyper.ListInstances():
1188         return
1189
1190       try:
1191         hyper.StopInstance(instance, retry=self.tried_once)
1192       except errors.HypervisorError, err:
1193         if iname not in hyper.ListInstances():
1194           # if the instance is no longer existing, consider this a
1195           # success and go to cleanup
1196           return
1197
1198         _Fail("Failed to stop instance %s: %s", iname, err)
1199
1200       self.tried_once = True
1201
1202       raise utils.RetryAgain()
1203
1204   try:
1205     utils.Retry(_TryShutdown(), 5, timeout)
1206   except utils.RetryTimeout:
1207     # the shutdown did not succeed
1208     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1209
1210     try:
1211       hyper.StopInstance(instance, force=True)
1212     except errors.HypervisorError, err:
1213       if iname in hyper.ListInstances():
1214         # only raise an error if the instance still exists, otherwise
1215         # the error could simply be "instance ... unknown"!
1216         _Fail("Failed to force stop instance %s: %s", iname, err)
1217
1218     time.sleep(1)
1219
1220     if iname in hyper.ListInstances():
1221       _Fail("Could not shutdown instance %s even by destroy", iname)
1222
1223   try:
1224     hyper.CleanupInstance(instance.name)
1225   except errors.HypervisorError, err:
1226     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1227
1228   _RemoveBlockDevLinks(iname, instance.disks)
1229
1230
1231 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1232   """Reboot an instance.
1233
1234   @type instance: L{objects.Instance}
1235   @param instance: the instance object to reboot
1236   @type reboot_type: str
1237   @param reboot_type: the type of reboot, one the following
1238     constants:
1239       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1240         instance OS, do not recreate the VM
1241       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1242         restart the VM (at the hypervisor level)
1243       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1244         not accepted here, since that mode is handled differently, in
1245         cmdlib, and translates into full stop and start of the
1246         instance (instead of a call_instance_reboot RPC)
1247   @type shutdown_timeout: integer
1248   @param shutdown_timeout: maximum timeout for soft shutdown
1249   @rtype: None
1250
1251   """
1252   running_instances = GetInstanceList([instance.hypervisor])
1253
1254   if instance.name not in running_instances:
1255     _Fail("Cannot reboot instance %s that is not running", instance.name)
1256
1257   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1258   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1259     try:
1260       hyper.RebootInstance(instance)
1261     except errors.HypervisorError, err:
1262       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1263   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1264     try:
1265       InstanceShutdown(instance, shutdown_timeout)
1266       return StartInstance(instance, False)
1267     except errors.HypervisorError, err:
1268       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1269   else:
1270     _Fail("Invalid reboot_type received: %s", reboot_type)
1271
1272
1273 def MigrationInfo(instance):
1274   """Gather information about an instance to be migrated.
1275
1276   @type instance: L{objects.Instance}
1277   @param instance: the instance definition
1278
1279   """
1280   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1281   try:
1282     info = hyper.MigrationInfo(instance)
1283   except errors.HypervisorError, err:
1284     _Fail("Failed to fetch migration information: %s", err, exc=True)
1285   return info
1286
1287
1288 def AcceptInstance(instance, info, target):
1289   """Prepare the node to accept an instance.
1290
1291   @type instance: L{objects.Instance}
1292   @param instance: the instance definition
1293   @type info: string/data (opaque)
1294   @param info: migration information, from the source node
1295   @type target: string
1296   @param target: target host (usually ip), on this node
1297
1298   """
1299   # TODO: why is this required only for DTS_EXT_MIRROR?
1300   if instance.disk_template in constants.DTS_EXT_MIRROR:
1301     # Create the symlinks, as the disks are not active
1302     # in any way
1303     try:
1304       _GatherAndLinkBlockDevs(instance)
1305     except errors.BlockDeviceError, err:
1306       _Fail("Block device error: %s", err, exc=True)
1307
1308   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1309   try:
1310     hyper.AcceptInstance(instance, info, target)
1311   except errors.HypervisorError, err:
1312     if instance.disk_template in constants.DTS_EXT_MIRROR:
1313       _RemoveBlockDevLinks(instance.name, instance.disks)
1314     _Fail("Failed to accept instance: %s", err, exc=True)
1315
1316
1317 def FinalizeMigrationDst(instance, info, success):
1318   """Finalize any preparation to accept an instance.
1319
1320   @type instance: L{objects.Instance}
1321   @param instance: the instance definition
1322   @type info: string/data (opaque)
1323   @param info: migration information, from the source node
1324   @type success: boolean
1325   @param success: whether the migration was a success or a failure
1326
1327   """
1328   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1329   try:
1330     hyper.FinalizeMigrationDst(instance, info, success)
1331   except errors.HypervisorError, err:
1332     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1333
1334
1335 def MigrateInstance(instance, target, live):
1336   """Migrates an instance to another node.
1337
1338   @type instance: L{objects.Instance}
1339   @param instance: the instance definition
1340   @type target: string
1341   @param target: the target node name
1342   @type live: boolean
1343   @param live: whether the migration should be done live or not (the
1344       interpretation of this parameter is left to the hypervisor)
1345   @raise RPCFail: if migration fails for some reason
1346
1347   """
1348   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1349
1350   try:
1351     hyper.MigrateInstance(instance, target, live)
1352   except errors.HypervisorError, err:
1353     _Fail("Failed to migrate instance: %s", err, exc=True)
1354
1355
1356 def FinalizeMigrationSource(instance, success, live):
1357   """Finalize the instance migration on the source node.
1358
1359   @type instance: L{objects.Instance}
1360   @param instance: the instance definition of the migrated instance
1361   @type success: bool
1362   @param success: whether the migration succeeded or not
1363   @type live: bool
1364   @param live: whether the user requested a live migration or not
1365   @raise RPCFail: If the execution fails for some reason
1366
1367   """
1368   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1369
1370   try:
1371     hyper.FinalizeMigrationSource(instance, success, live)
1372   except Exception, err:  # pylint: disable=W0703
1373     _Fail("Failed to finalize the migration on the source node: %s", err,
1374           exc=True)
1375
1376
1377 def GetMigrationStatus(instance):
1378   """Get the migration status
1379
1380   @type instance: L{objects.Instance}
1381   @param instance: the instance that is being migrated
1382   @rtype: L{objects.MigrationStatus}
1383   @return: the status of the current migration (one of
1384            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1385            progress info that can be retrieved from the hypervisor
1386   @raise RPCFail: If the migration status cannot be retrieved
1387
1388   """
1389   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1390   try:
1391     return hyper.GetMigrationStatus(instance)
1392   except Exception, err:  # pylint: disable=W0703
1393     _Fail("Failed to get migration status: %s", err, exc=True)
1394
1395
1396 def BlockdevCreate(disk, size, owner, on_primary, info):
1397   """Creates a block device for an instance.
1398
1399   @type disk: L{objects.Disk}
1400   @param disk: the object describing the disk we should create
1401   @type size: int
1402   @param size: the size of the physical underlying device, in MiB
1403   @type owner: str
1404   @param owner: the name of the instance for which disk is created,
1405       used for device cache data
1406   @type on_primary: boolean
1407   @param on_primary:  indicates if it is the primary node or not
1408   @type info: string
1409   @param info: string that will be sent to the physical device
1410       creation, used for example to set (LVM) tags on LVs
1411
1412   @return: the new unique_id of the device (this can sometime be
1413       computed only after creation), or None. On secondary nodes,
1414       it's not required to return anything.
1415
1416   """
1417   # TODO: remove the obsolete "size" argument
1418   # pylint: disable=W0613
1419   clist = []
1420   if disk.children:
1421     for child in disk.children:
1422       try:
1423         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1424       except errors.BlockDeviceError, err:
1425         _Fail("Can't assemble device %s: %s", child, err)
1426       if on_primary or disk.AssembleOnSecondary():
1427         # we need the children open in case the device itself has to
1428         # be assembled
1429         try:
1430           # pylint: disable=E1103
1431           crdev.Open()
1432         except errors.BlockDeviceError, err:
1433           _Fail("Can't make child '%s' read-write: %s", child, err)
1434       clist.append(crdev)
1435
1436   try:
1437     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1438   except errors.BlockDeviceError, err:
1439     _Fail("Can't create block device: %s", err)
1440
1441   if on_primary or disk.AssembleOnSecondary():
1442     try:
1443       device.Assemble()
1444     except errors.BlockDeviceError, err:
1445       _Fail("Can't assemble device after creation, unusual event: %s", err)
1446     device.SetSyncSpeed(constants.SYNC_SPEED)
1447     if on_primary or disk.OpenOnSecondary():
1448       try:
1449         device.Open(force=True)
1450       except errors.BlockDeviceError, err:
1451         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1452     DevCacheManager.UpdateCache(device.dev_path, owner,
1453                                 on_primary, disk.iv_name)
1454
1455   device.SetInfo(info)
1456
1457   return device.unique_id
1458
1459
1460 def _WipeDevice(path, offset, size):
1461   """This function actually wipes the device.
1462
1463   @param path: The path to the device to wipe
1464   @param offset: The offset in MiB in the file
1465   @param size: The size in MiB to write
1466
1467   """
1468   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1469          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1470          "count=%d" % size]
1471   result = utils.RunCmd(cmd)
1472
1473   if result.failed:
1474     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1475           result.fail_reason, result.output)
1476
1477
1478 def BlockdevWipe(disk, offset, size):
1479   """Wipes a block device.
1480
1481   @type disk: L{objects.Disk}
1482   @param disk: the disk object we want to wipe
1483   @type offset: int
1484   @param offset: The offset in MiB in the file
1485   @type size: int
1486   @param size: The size in MiB to write
1487
1488   """
1489   try:
1490     rdev = _RecursiveFindBD(disk)
1491   except errors.BlockDeviceError:
1492     rdev = None
1493
1494   if not rdev:
1495     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1496
1497   # Do cross verify some of the parameters
1498   if offset > rdev.size:
1499     _Fail("Offset is bigger than device size")
1500   if (offset + size) > rdev.size:
1501     _Fail("The provided offset and size to wipe is bigger than device size")
1502
1503   _WipeDevice(rdev.dev_path, offset, size)
1504
1505
1506 def BlockdevPauseResumeSync(disks, pause):
1507   """Pause or resume the sync of the block device.
1508
1509   @type disks: list of L{objects.Disk}
1510   @param disks: the disks object we want to pause/resume
1511   @type pause: bool
1512   @param pause: Wheater to pause or resume
1513
1514   """
1515   success = []
1516   for disk in disks:
1517     try:
1518       rdev = _RecursiveFindBD(disk)
1519     except errors.BlockDeviceError:
1520       rdev = None
1521
1522     if not rdev:
1523       success.append((False, ("Cannot change sync for device %s:"
1524                               " device not found" % disk.iv_name)))
1525       continue
1526
1527     result = rdev.PauseResumeSync(pause)
1528
1529     if result:
1530       success.append((result, None))
1531     else:
1532       if pause:
1533         msg = "Pause"
1534       else:
1535         msg = "Resume"
1536       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1537
1538   return success
1539
1540
1541 def BlockdevRemove(disk):
1542   """Remove a block device.
1543
1544   @note: This is intended to be called recursively.
1545
1546   @type disk: L{objects.Disk}
1547   @param disk: the disk object we should remove
1548   @rtype: boolean
1549   @return: the success of the operation
1550
1551   """
1552   msgs = []
1553   try:
1554     rdev = _RecursiveFindBD(disk)
1555   except errors.BlockDeviceError, err:
1556     # probably can't attach
1557     logging.info("Can't attach to device %s in remove", disk)
1558     rdev = None
1559   if rdev is not None:
1560     r_path = rdev.dev_path
1561     try:
1562       rdev.Remove()
1563     except errors.BlockDeviceError, err:
1564       msgs.append(str(err))
1565     if not msgs:
1566       DevCacheManager.RemoveCache(r_path)
1567
1568   if disk.children:
1569     for child in disk.children:
1570       try:
1571         BlockdevRemove(child)
1572       except RPCFail, err:
1573         msgs.append(str(err))
1574
1575   if msgs:
1576     _Fail("; ".join(msgs))
1577
1578
1579 def _RecursiveAssembleBD(disk, owner, as_primary):
1580   """Activate a block device for an instance.
1581
1582   This is run on the primary and secondary nodes for an instance.
1583
1584   @note: this function is called recursively.
1585
1586   @type disk: L{objects.Disk}
1587   @param disk: the disk we try to assemble
1588   @type owner: str
1589   @param owner: the name of the instance which owns the disk
1590   @type as_primary: boolean
1591   @param as_primary: if we should make the block device
1592       read/write
1593
1594   @return: the assembled device or None (in case no device
1595       was assembled)
1596   @raise errors.BlockDeviceError: in case there is an error
1597       during the activation of the children or the device
1598       itself
1599
1600   """
1601   children = []
1602   if disk.children:
1603     mcn = disk.ChildrenNeeded()
1604     if mcn == -1:
1605       mcn = 0 # max number of Nones allowed
1606     else:
1607       mcn = len(disk.children) - mcn # max number of Nones
1608     for chld_disk in disk.children:
1609       try:
1610         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1611       except errors.BlockDeviceError, err:
1612         if children.count(None) >= mcn:
1613           raise
1614         cdev = None
1615         logging.error("Error in child activation (but continuing): %s",
1616                       str(err))
1617       children.append(cdev)
1618
1619   if as_primary or disk.AssembleOnSecondary():
1620     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1621     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1622     result = r_dev
1623     if as_primary or disk.OpenOnSecondary():
1624       r_dev.Open()
1625     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1626                                 as_primary, disk.iv_name)
1627
1628   else:
1629     result = True
1630   return result
1631
1632
1633 def BlockdevAssemble(disk, owner, as_primary, idx):
1634   """Activate a block device for an instance.
1635
1636   This is a wrapper over _RecursiveAssembleBD.
1637
1638   @rtype: str or boolean
1639   @return: a C{/dev/...} path for primary nodes, and
1640       C{True} for secondary nodes
1641
1642   """
1643   try:
1644     result = _RecursiveAssembleBD(disk, owner, as_primary)
1645     if isinstance(result, bdev.BlockDev):
1646       # pylint: disable=E1103
1647       result = result.dev_path
1648       if as_primary:
1649         _SymlinkBlockDev(owner, result, idx)
1650   except errors.BlockDeviceError, err:
1651     _Fail("Error while assembling disk: %s", err, exc=True)
1652   except OSError, err:
1653     _Fail("Error while symlinking disk: %s", err, exc=True)
1654
1655   return result
1656
1657
1658 def BlockdevShutdown(disk):
1659   """Shut down a block device.
1660
1661   First, if the device is assembled (Attach() is successful), then
1662   the device is shutdown. Then the children of the device are
1663   shutdown.
1664
1665   This function is called recursively. Note that we don't cache the
1666   children or such, as oppossed to assemble, shutdown of different
1667   devices doesn't require that the upper device was active.
1668
1669   @type disk: L{objects.Disk}
1670   @param disk: the description of the disk we should
1671       shutdown
1672   @rtype: None
1673
1674   """
1675   msgs = []
1676   r_dev = _RecursiveFindBD(disk)
1677   if r_dev is not None:
1678     r_path = r_dev.dev_path
1679     try:
1680       r_dev.Shutdown()
1681       DevCacheManager.RemoveCache(r_path)
1682     except errors.BlockDeviceError, err:
1683       msgs.append(str(err))
1684
1685   if disk.children:
1686     for child in disk.children:
1687       try:
1688         BlockdevShutdown(child)
1689       except RPCFail, err:
1690         msgs.append(str(err))
1691
1692   if msgs:
1693     _Fail("; ".join(msgs))
1694
1695
1696 def BlockdevAddchildren(parent_cdev, new_cdevs):
1697   """Extend a mirrored block device.
1698
1699   @type parent_cdev: L{objects.Disk}
1700   @param parent_cdev: the disk to which we should add children
1701   @type new_cdevs: list of L{objects.Disk}
1702   @param new_cdevs: the list of children which we should add
1703   @rtype: None
1704
1705   """
1706   parent_bdev = _RecursiveFindBD(parent_cdev)
1707   if parent_bdev is None:
1708     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1709   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1710   if new_bdevs.count(None) > 0:
1711     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1712   parent_bdev.AddChildren(new_bdevs)
1713
1714
1715 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1716   """Shrink a mirrored block device.
1717
1718   @type parent_cdev: L{objects.Disk}
1719   @param parent_cdev: the disk from which we should remove children
1720   @type new_cdevs: list of L{objects.Disk}
1721   @param new_cdevs: the list of children which we should remove
1722   @rtype: None
1723
1724   """
1725   parent_bdev = _RecursiveFindBD(parent_cdev)
1726   if parent_bdev is None:
1727     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1728   devs = []
1729   for disk in new_cdevs:
1730     rpath = disk.StaticDevPath()
1731     if rpath is None:
1732       bd = _RecursiveFindBD(disk)
1733       if bd is None:
1734         _Fail("Can't find device %s while removing children", disk)
1735       else:
1736         devs.append(bd.dev_path)
1737     else:
1738       if not utils.IsNormAbsPath(rpath):
1739         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1740       devs.append(rpath)
1741   parent_bdev.RemoveChildren(devs)
1742
1743
1744 def BlockdevGetmirrorstatus(disks):
1745   """Get the mirroring status of a list of devices.
1746
1747   @type disks: list of L{objects.Disk}
1748   @param disks: the list of disks which we should query
1749   @rtype: disk
1750   @return: List of L{objects.BlockDevStatus}, one for each disk
1751   @raise errors.BlockDeviceError: if any of the disks cannot be
1752       found
1753
1754   """
1755   stats = []
1756   for dsk in disks:
1757     rbd = _RecursiveFindBD(dsk)
1758     if rbd is None:
1759       _Fail("Can't find device %s", dsk)
1760
1761     stats.append(rbd.CombinedSyncStatus())
1762
1763   return stats
1764
1765
1766 def BlockdevGetmirrorstatusMulti(disks):
1767   """Get the mirroring status of a list of devices.
1768
1769   @type disks: list of L{objects.Disk}
1770   @param disks: the list of disks which we should query
1771   @rtype: disk
1772   @return: List of tuples, (bool, status), one for each disk; bool denotes
1773     success/failure, status is L{objects.BlockDevStatus} on success, string
1774     otherwise
1775
1776   """
1777   result = []
1778   for disk in disks:
1779     try:
1780       rbd = _RecursiveFindBD(disk)
1781       if rbd is None:
1782         result.append((False, "Can't find device %s" % disk))
1783         continue
1784
1785       status = rbd.CombinedSyncStatus()
1786     except errors.BlockDeviceError, err:
1787       logging.exception("Error while getting disk status")
1788       result.append((False, str(err)))
1789     else:
1790       result.append((True, status))
1791
1792   assert len(disks) == len(result)
1793
1794   return result
1795
1796
1797 def _RecursiveFindBD(disk):
1798   """Check if a device is activated.
1799
1800   If so, return information about the real device.
1801
1802   @type disk: L{objects.Disk}
1803   @param disk: the disk object we need to find
1804
1805   @return: None if the device can't be found,
1806       otherwise the device instance
1807
1808   """
1809   children = []
1810   if disk.children:
1811     for chdisk in disk.children:
1812       children.append(_RecursiveFindBD(chdisk))
1813
1814   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1815
1816
1817 def _OpenRealBD(disk):
1818   """Opens the underlying block device of a disk.
1819
1820   @type disk: L{objects.Disk}
1821   @param disk: the disk object we want to open
1822
1823   """
1824   real_disk = _RecursiveFindBD(disk)
1825   if real_disk is None:
1826     _Fail("Block device '%s' is not set up", disk)
1827
1828   real_disk.Open()
1829
1830   return real_disk
1831
1832
1833 def BlockdevFind(disk):
1834   """Check if a device is activated.
1835
1836   If it is, return information about the real device.
1837
1838   @type disk: L{objects.Disk}
1839   @param disk: the disk to find
1840   @rtype: None or objects.BlockDevStatus
1841   @return: None if the disk cannot be found, otherwise a the current
1842            information
1843
1844   """
1845   try:
1846     rbd = _RecursiveFindBD(disk)
1847   except errors.BlockDeviceError, err:
1848     _Fail("Failed to find device: %s", err, exc=True)
1849
1850   if rbd is None:
1851     return None
1852
1853   return rbd.GetSyncStatus()
1854
1855
1856 def BlockdevGetsize(disks):
1857   """Computes the size of the given disks.
1858
1859   If a disk is not found, returns None instead.
1860
1861   @type disks: list of L{objects.Disk}
1862   @param disks: the list of disk to compute the size for
1863   @rtype: list
1864   @return: list with elements None if the disk cannot be found,
1865       otherwise the size
1866
1867   """
1868   result = []
1869   for cf in disks:
1870     try:
1871       rbd = _RecursiveFindBD(cf)
1872     except errors.BlockDeviceError:
1873       result.append(None)
1874       continue
1875     if rbd is None:
1876       result.append(None)
1877     else:
1878       result.append(rbd.GetActualSize())
1879   return result
1880
1881
1882 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1883   """Export a block device to a remote node.
1884
1885   @type disk: L{objects.Disk}
1886   @param disk: the description of the disk to export
1887   @type dest_node: str
1888   @param dest_node: the destination node to export to
1889   @type dest_path: str
1890   @param dest_path: the destination path on the target node
1891   @type cluster_name: str
1892   @param cluster_name: the cluster name, needed for SSH hostalias
1893   @rtype: None
1894
1895   """
1896   real_disk = _OpenRealBD(disk)
1897
1898   # the block size on the read dd is 1MiB to match our units
1899   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1900                                "dd if=%s bs=1048576 count=%s",
1901                                real_disk.dev_path, str(disk.size))
1902
1903   # we set here a smaller block size as, due to ssh buffering, more
1904   # than 64-128k will mostly ignored; we use nocreat to fail if the
1905   # device is not already there or we pass a wrong path; we use
1906   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1907   # to not buffer too much memory; this means that at best, we flush
1908   # every 64k, which will not be very fast
1909   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1910                                 " oflag=dsync", dest_path)
1911
1912   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1913                                                    constants.GANETI_RUNAS,
1914                                                    destcmd)
1915
1916   # all commands have been checked, so we're safe to combine them
1917   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1918
1919   result = utils.RunCmd(["bash", "-c", command])
1920
1921   if result.failed:
1922     _Fail("Disk copy command '%s' returned error: %s"
1923           " output: %s", command, result.fail_reason, result.output)
1924
1925
1926 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1927   """Write a file to the filesystem.
1928
1929   This allows the master to overwrite(!) a file. It will only perform
1930   the operation if the file belongs to a list of configuration files.
1931
1932   @type file_name: str
1933   @param file_name: the target file name
1934   @type data: str
1935   @param data: the new contents of the file
1936   @type mode: int
1937   @param mode: the mode to give the file (can be None)
1938   @type uid: string
1939   @param uid: the owner of the file
1940   @type gid: string
1941   @param gid: the group of the file
1942   @type atime: float
1943   @param atime: the atime to set on the file (can be None)
1944   @type mtime: float
1945   @param mtime: the mtime to set on the file (can be None)
1946   @rtype: None
1947
1948   """
1949   if not os.path.isabs(file_name):
1950     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1951
1952   if file_name not in _ALLOWED_UPLOAD_FILES:
1953     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1954           file_name)
1955
1956   raw_data = _Decompress(data)
1957
1958   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
1959     _Fail("Invalid username/groupname type")
1960
1961   getents = runtime.GetEnts()
1962   uid = getents.LookupUser(uid)
1963   gid = getents.LookupGroup(gid)
1964
1965   utils.SafeWriteFile(file_name, None,
1966                       data=raw_data, mode=mode, uid=uid, gid=gid,
1967                       atime=atime, mtime=mtime)
1968
1969
1970 def RunOob(oob_program, command, node, timeout):
1971   """Executes oob_program with given command on given node.
1972
1973   @param oob_program: The path to the executable oob_program
1974   @param command: The command to invoke on oob_program
1975   @param node: The node given as an argument to the program
1976   @param timeout: Timeout after which we kill the oob program
1977
1978   @return: stdout
1979   @raise RPCFail: If execution fails for some reason
1980
1981   """
1982   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1983
1984   if result.failed:
1985     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1986           result.fail_reason, result.output)
1987
1988   return result.stdout
1989
1990
1991 def WriteSsconfFiles(values):
1992   """Update all ssconf files.
1993
1994   Wrapper around the SimpleStore.WriteFiles.
1995
1996   """
1997   ssconf.SimpleStore().WriteFiles(values)
1998
1999
2000 def _ErrnoOrStr(err):
2001   """Format an EnvironmentError exception.
2002
2003   If the L{err} argument has an errno attribute, it will be looked up
2004   and converted into a textual C{E...} description. Otherwise the
2005   string representation of the error will be returned.
2006
2007   @type err: L{EnvironmentError}
2008   @param err: the exception to format
2009
2010   """
2011   if hasattr(err, "errno"):
2012     detail = errno.errorcode[err.errno]
2013   else:
2014     detail = str(err)
2015   return detail
2016
2017
2018 def _OSOndiskAPIVersion(os_dir):
2019   """Compute and return the API version of a given OS.
2020
2021   This function will try to read the API version of the OS residing in
2022   the 'os_dir' directory.
2023
2024   @type os_dir: str
2025   @param os_dir: the directory in which we should look for the OS
2026   @rtype: tuple
2027   @return: tuple (status, data) with status denoting the validity and
2028       data holding either the vaid versions or an error message
2029
2030   """
2031   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2032
2033   try:
2034     st = os.stat(api_file)
2035   except EnvironmentError, err:
2036     return False, ("Required file '%s' not found under path %s: %s" %
2037                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
2038
2039   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2040     return False, ("File '%s' in %s is not a regular file" %
2041                    (constants.OS_API_FILE, os_dir))
2042
2043   try:
2044     api_versions = utils.ReadFile(api_file).splitlines()
2045   except EnvironmentError, err:
2046     return False, ("Error while reading the API version file at %s: %s" %
2047                    (api_file, _ErrnoOrStr(err)))
2048
2049   try:
2050     api_versions = [int(version.strip()) for version in api_versions]
2051   except (TypeError, ValueError), err:
2052     return False, ("API version(s) can't be converted to integer: %s" %
2053                    str(err))
2054
2055   return True, api_versions
2056
2057
2058 def DiagnoseOS(top_dirs=None):
2059   """Compute the validity for all OSes.
2060
2061   @type top_dirs: list
2062   @param top_dirs: the list of directories in which to
2063       search (if not given defaults to
2064       L{constants.OS_SEARCH_PATH})
2065   @rtype: list of L{objects.OS}
2066   @return: a list of tuples (name, path, status, diagnose, variants,
2067       parameters, api_version) for all (potential) OSes under all
2068       search paths, where:
2069           - name is the (potential) OS name
2070           - path is the full path to the OS
2071           - status True/False is the validity of the OS
2072           - diagnose is the error message for an invalid OS, otherwise empty
2073           - variants is a list of supported OS variants, if any
2074           - parameters is a list of (name, help) parameters, if any
2075           - api_version is a list of support OS API versions
2076
2077   """
2078   if top_dirs is None:
2079     top_dirs = constants.OS_SEARCH_PATH
2080
2081   result = []
2082   for dir_name in top_dirs:
2083     if os.path.isdir(dir_name):
2084       try:
2085         f_names = utils.ListVisibleFiles(dir_name)
2086       except EnvironmentError, err:
2087         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2088         break
2089       for name in f_names:
2090         os_path = utils.PathJoin(dir_name, name)
2091         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2092         if status:
2093           diagnose = ""
2094           variants = os_inst.supported_variants
2095           parameters = os_inst.supported_parameters
2096           api_versions = os_inst.api_versions
2097         else:
2098           diagnose = os_inst
2099           variants = parameters = api_versions = []
2100         result.append((name, os_path, status, diagnose, variants,
2101                        parameters, api_versions))
2102
2103   return result
2104
2105
2106 def _TryOSFromDisk(name, base_dir=None):
2107   """Create an OS instance from disk.
2108
2109   This function will return an OS instance if the given name is a
2110   valid OS name.
2111
2112   @type base_dir: string
2113   @keyword base_dir: Base directory containing OS installations.
2114                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2115   @rtype: tuple
2116   @return: success and either the OS instance if we find a valid one,
2117       or error message
2118
2119   """
2120   if base_dir is None:
2121     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
2122   else:
2123     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2124
2125   if os_dir is None:
2126     return False, "Directory for OS %s not found in search path" % name
2127
2128   status, api_versions = _OSOndiskAPIVersion(os_dir)
2129   if not status:
2130     # push the error up
2131     return status, api_versions
2132
2133   if not constants.OS_API_VERSIONS.intersection(api_versions):
2134     return False, ("API version mismatch for path '%s': found %s, want %s." %
2135                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2136
2137   # OS Files dictionary, we will populate it with the absolute path
2138   # names; if the value is True, then it is a required file, otherwise
2139   # an optional one
2140   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2141
2142   if max(api_versions) >= constants.OS_API_V15:
2143     os_files[constants.OS_VARIANTS_FILE] = False
2144
2145   if max(api_versions) >= constants.OS_API_V20:
2146     os_files[constants.OS_PARAMETERS_FILE] = True
2147   else:
2148     del os_files[constants.OS_SCRIPT_VERIFY]
2149
2150   for (filename, required) in os_files.items():
2151     os_files[filename] = utils.PathJoin(os_dir, filename)
2152
2153     try:
2154       st = os.stat(os_files[filename])
2155     except EnvironmentError, err:
2156       if err.errno == errno.ENOENT and not required:
2157         del os_files[filename]
2158         continue
2159       return False, ("File '%s' under path '%s' is missing (%s)" %
2160                      (filename, os_dir, _ErrnoOrStr(err)))
2161
2162     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2163       return False, ("File '%s' under path '%s' is not a regular file" %
2164                      (filename, os_dir))
2165
2166     if filename in constants.OS_SCRIPTS:
2167       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2168         return False, ("File '%s' under path '%s' is not executable" %
2169                        (filename, os_dir))
2170
2171   variants = []
2172   if constants.OS_VARIANTS_FILE in os_files:
2173     variants_file = os_files[constants.OS_VARIANTS_FILE]
2174     try:
2175       variants = utils.ReadFile(variants_file).splitlines()
2176     except EnvironmentError, err:
2177       # we accept missing files, but not other errors
2178       if err.errno != errno.ENOENT:
2179         return False, ("Error while reading the OS variants file at %s: %s" %
2180                        (variants_file, _ErrnoOrStr(err)))
2181
2182   parameters = []
2183   if constants.OS_PARAMETERS_FILE in os_files:
2184     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2185     try:
2186       parameters = utils.ReadFile(parameters_file).splitlines()
2187     except EnvironmentError, err:
2188       return False, ("Error while reading the OS parameters file at %s: %s" %
2189                      (parameters_file, _ErrnoOrStr(err)))
2190     parameters = [v.split(None, 1) for v in parameters]
2191
2192   os_obj = objects.OS(name=name, path=os_dir,
2193                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2194                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2195                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2196                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2197                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2198                                                  None),
2199                       supported_variants=variants,
2200                       supported_parameters=parameters,
2201                       api_versions=api_versions)
2202   return True, os_obj
2203
2204
2205 def OSFromDisk(name, base_dir=None):
2206   """Create an OS instance from disk.
2207
2208   This function will return an OS instance if the given name is a
2209   valid OS name. Otherwise, it will raise an appropriate
2210   L{RPCFail} exception, detailing why this is not a valid OS.
2211
2212   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2213   an exception but returns true/false status data.
2214
2215   @type base_dir: string
2216   @keyword base_dir: Base directory containing OS installations.
2217                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2218   @rtype: L{objects.OS}
2219   @return: the OS instance if we find a valid one
2220   @raise RPCFail: if we don't find a valid OS
2221
2222   """
2223   name_only = objects.OS.GetName(name)
2224   status, payload = _TryOSFromDisk(name_only, base_dir)
2225
2226   if not status:
2227     _Fail(payload)
2228
2229   return payload
2230
2231
2232 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2233   """Calculate the basic environment for an os script.
2234
2235   @type os_name: str
2236   @param os_name: full operating system name (including variant)
2237   @type inst_os: L{objects.OS}
2238   @param inst_os: operating system for which the environment is being built
2239   @type os_params: dict
2240   @param os_params: the OS parameters
2241   @type debug: integer
2242   @param debug: debug level (0 or 1, for OS Api 10)
2243   @rtype: dict
2244   @return: dict of environment variables
2245   @raise errors.BlockDeviceError: if the block device
2246       cannot be found
2247
2248   """
2249   result = {}
2250   api_version = \
2251     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2252   result["OS_API_VERSION"] = "%d" % api_version
2253   result["OS_NAME"] = inst_os.name
2254   result["DEBUG_LEVEL"] = "%d" % debug
2255
2256   # OS variants
2257   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2258     variant = objects.OS.GetVariant(os_name)
2259     if not variant:
2260       variant = inst_os.supported_variants[0]
2261   else:
2262     variant = ""
2263   result["OS_VARIANT"] = variant
2264
2265   # OS params
2266   for pname, pvalue in os_params.items():
2267     result["OSP_%s" % pname.upper()] = pvalue
2268
2269   return result
2270
2271
2272 def OSEnvironment(instance, inst_os, debug=0):
2273   """Calculate the environment for an os script.
2274
2275   @type instance: L{objects.Instance}
2276   @param instance: target instance for the os script run
2277   @type inst_os: L{objects.OS}
2278   @param inst_os: operating system for which the environment is being built
2279   @type debug: integer
2280   @param debug: debug level (0 or 1, for OS Api 10)
2281   @rtype: dict
2282   @return: dict of environment variables
2283   @raise errors.BlockDeviceError: if the block device
2284       cannot be found
2285
2286   """
2287   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2288
2289   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2290     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2291
2292   result["HYPERVISOR"] = instance.hypervisor
2293   result["DISK_COUNT"] = "%d" % len(instance.disks)
2294   result["NIC_COUNT"] = "%d" % len(instance.nics)
2295   result["INSTANCE_SECONDARY_NODES"] = \
2296       ("%s" % " ".join(instance.secondary_nodes))
2297
2298   # Disks
2299   for idx, disk in enumerate(instance.disks):
2300     real_disk = _OpenRealBD(disk)
2301     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2302     result["DISK_%d_ACCESS" % idx] = disk.mode
2303     if constants.HV_DISK_TYPE in instance.hvparams:
2304       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2305         instance.hvparams[constants.HV_DISK_TYPE]
2306     if disk.dev_type in constants.LDS_BLOCK:
2307       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2308     elif disk.dev_type == constants.LD_FILE:
2309       result["DISK_%d_BACKEND_TYPE" % idx] = \
2310         "file:%s" % disk.physical_id[0]
2311
2312   # NICs
2313   for idx, nic in enumerate(instance.nics):
2314     result["NIC_%d_MAC" % idx] = nic.mac
2315     if nic.ip:
2316       result["NIC_%d_IP" % idx] = nic.ip
2317     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2318     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2319       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2320     if nic.nicparams[constants.NIC_LINK]:
2321       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2322     if constants.HV_NIC_TYPE in instance.hvparams:
2323       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2324         instance.hvparams[constants.HV_NIC_TYPE]
2325
2326   # HV/BE params
2327   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2328     for key, value in source.items():
2329       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2330
2331   return result
2332
2333
2334 def BlockdevGrow(disk, amount, dryrun):
2335   """Grow a stack of block devices.
2336
2337   This function is called recursively, with the childrens being the
2338   first ones to resize.
2339
2340   @type disk: L{objects.Disk}
2341   @param disk: the disk to be grown
2342   @type amount: integer
2343   @param amount: the amount (in mebibytes) to grow with
2344   @type dryrun: boolean
2345   @param dryrun: whether to execute the operation in simulation mode
2346       only, without actually increasing the size
2347   @rtype: (status, result)
2348   @return: a tuple with the status of the operation (True/False), and
2349       the errors message if status is False
2350
2351   """
2352   r_dev = _RecursiveFindBD(disk)
2353   if r_dev is None:
2354     _Fail("Cannot find block device %s", disk)
2355
2356   try:
2357     r_dev.Grow(amount, dryrun)
2358   except errors.BlockDeviceError, err:
2359     _Fail("Failed to grow block device: %s", err, exc=True)
2360
2361
2362 def BlockdevSnapshot(disk):
2363   """Create a snapshot copy of a block device.
2364
2365   This function is called recursively, and the snapshot is actually created
2366   just for the leaf lvm backend device.
2367
2368   @type disk: L{objects.Disk}
2369   @param disk: the disk to be snapshotted
2370   @rtype: string
2371   @return: snapshot disk ID as (vg, lv)
2372
2373   """
2374   if disk.dev_type == constants.LD_DRBD8:
2375     if not disk.children:
2376       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2377             disk.unique_id)
2378     return BlockdevSnapshot(disk.children[0])
2379   elif disk.dev_type == constants.LD_LV:
2380     r_dev = _RecursiveFindBD(disk)
2381     if r_dev is not None:
2382       # FIXME: choose a saner value for the snapshot size
2383       # let's stay on the safe side and ask for the full size, for now
2384       return r_dev.Snapshot(disk.size)
2385     else:
2386       _Fail("Cannot find block device %s", disk)
2387   else:
2388     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2389           disk.unique_id, disk.dev_type)
2390
2391
2392 def FinalizeExport(instance, snap_disks):
2393   """Write out the export configuration information.
2394
2395   @type instance: L{objects.Instance}
2396   @param instance: the instance which we export, used for
2397       saving configuration
2398   @type snap_disks: list of L{objects.Disk}
2399   @param snap_disks: list of snapshot block devices, which
2400       will be used to get the actual name of the dump file
2401
2402   @rtype: None
2403
2404   """
2405   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2406   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2407
2408   config = objects.SerializableConfigParser()
2409
2410   config.add_section(constants.INISECT_EXP)
2411   config.set(constants.INISECT_EXP, "version", "0")
2412   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2413   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2414   config.set(constants.INISECT_EXP, "os", instance.os)
2415   config.set(constants.INISECT_EXP, "compression", "none")
2416
2417   config.add_section(constants.INISECT_INS)
2418   config.set(constants.INISECT_INS, "name", instance.name)
2419   config.set(constants.INISECT_INS, "memory", "%d" %
2420              instance.beparams[constants.BE_MEMORY])
2421   config.set(constants.INISECT_INS, "vcpus", "%d" %
2422              instance.beparams[constants.BE_VCPUS])
2423   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2424   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2425   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2426
2427   nic_total = 0
2428   for nic_count, nic in enumerate(instance.nics):
2429     nic_total += 1
2430     config.set(constants.INISECT_INS, "nic%d_mac" %
2431                nic_count, "%s" % nic.mac)
2432     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2433     for param in constants.NICS_PARAMETER_TYPES:
2434       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2435                  "%s" % nic.nicparams.get(param, None))
2436   # TODO: redundant: on load can read nics until it doesn't exist
2437   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2438
2439   disk_total = 0
2440   for disk_count, disk in enumerate(snap_disks):
2441     if disk:
2442       disk_total += 1
2443       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2444                  ("%s" % disk.iv_name))
2445       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2446                  ("%s" % disk.physical_id[1]))
2447       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2448                  ("%d" % disk.size))
2449
2450   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2451
2452   # New-style hypervisor/backend parameters
2453
2454   config.add_section(constants.INISECT_HYP)
2455   for name, value in instance.hvparams.items():
2456     if name not in constants.HVC_GLOBALS:
2457       config.set(constants.INISECT_HYP, name, str(value))
2458
2459   config.add_section(constants.INISECT_BEP)
2460   for name, value in instance.beparams.items():
2461     config.set(constants.INISECT_BEP, name, str(value))
2462
2463   config.add_section(constants.INISECT_OSP)
2464   for name, value in instance.osparams.items():
2465     config.set(constants.INISECT_OSP, name, str(value))
2466
2467   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2468                   data=config.Dumps())
2469   shutil.rmtree(finaldestdir, ignore_errors=True)
2470   shutil.move(destdir, finaldestdir)
2471
2472
2473 def ExportInfo(dest):
2474   """Get export configuration information.
2475
2476   @type dest: str
2477   @param dest: directory containing the export
2478
2479   @rtype: L{objects.SerializableConfigParser}
2480   @return: a serializable config file containing the
2481       export info
2482
2483   """
2484   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2485
2486   config = objects.SerializableConfigParser()
2487   config.read(cff)
2488
2489   if (not config.has_section(constants.INISECT_EXP) or
2490       not config.has_section(constants.INISECT_INS)):
2491     _Fail("Export info file doesn't have the required fields")
2492
2493   return config.Dumps()
2494
2495
2496 def ListExports():
2497   """Return a list of exports currently available on this machine.
2498
2499   @rtype: list
2500   @return: list of the exports
2501
2502   """
2503   if os.path.isdir(constants.EXPORT_DIR):
2504     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2505   else:
2506     _Fail("No exports directory")
2507
2508
2509 def RemoveExport(export):
2510   """Remove an existing export from the node.
2511
2512   @type export: str
2513   @param export: the name of the export to remove
2514   @rtype: None
2515
2516   """
2517   target = utils.PathJoin(constants.EXPORT_DIR, export)
2518
2519   try:
2520     shutil.rmtree(target)
2521   except EnvironmentError, err:
2522     _Fail("Error while removing the export: %s", err, exc=True)
2523
2524
2525 def BlockdevRename(devlist):
2526   """Rename a list of block devices.
2527
2528   @type devlist: list of tuples
2529   @param devlist: list of tuples of the form  (disk,
2530       new_logical_id, new_physical_id); disk is an
2531       L{objects.Disk} object describing the current disk,
2532       and new logical_id/physical_id is the name we
2533       rename it to
2534   @rtype: boolean
2535   @return: True if all renames succeeded, False otherwise
2536
2537   """
2538   msgs = []
2539   result = True
2540   for disk, unique_id in devlist:
2541     dev = _RecursiveFindBD(disk)
2542     if dev is None:
2543       msgs.append("Can't find device %s in rename" % str(disk))
2544       result = False
2545       continue
2546     try:
2547       old_rpath = dev.dev_path
2548       dev.Rename(unique_id)
2549       new_rpath = dev.dev_path
2550       if old_rpath != new_rpath:
2551         DevCacheManager.RemoveCache(old_rpath)
2552         # FIXME: we should add the new cache information here, like:
2553         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2554         # but we don't have the owner here - maybe parse from existing
2555         # cache? for now, we only lose lvm data when we rename, which
2556         # is less critical than DRBD or MD
2557     except errors.BlockDeviceError, err:
2558       msgs.append("Can't rename device '%s' to '%s': %s" %
2559                   (dev, unique_id, err))
2560       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2561       result = False
2562   if not result:
2563     _Fail("; ".join(msgs))
2564
2565
2566 def _TransformFileStorageDir(fs_dir):
2567   """Checks whether given file_storage_dir is valid.
2568
2569   Checks wheter the given fs_dir is within the cluster-wide default
2570   file_storage_dir or the shared_file_storage_dir, which are stored in
2571   SimpleStore. Only paths under those directories are allowed.
2572
2573   @type fs_dir: str
2574   @param fs_dir: the path to check
2575
2576   @return: the normalized path if valid, None otherwise
2577
2578   """
2579   if not constants.ENABLE_FILE_STORAGE:
2580     _Fail("File storage disabled at configure time")
2581   cfg = _GetConfig()
2582   fs_dir = os.path.normpath(fs_dir)
2583   base_fstore = cfg.GetFileStorageDir()
2584   base_shared = cfg.GetSharedFileStorageDir()
2585   if not (utils.IsBelowDir(base_fstore, fs_dir) or
2586           utils.IsBelowDir(base_shared, fs_dir)):
2587     _Fail("File storage directory '%s' is not under base file"
2588           " storage directory '%s' or shared storage directory '%s'",
2589           fs_dir, base_fstore, base_shared)
2590   return fs_dir
2591
2592
2593 def CreateFileStorageDir(file_storage_dir):
2594   """Create file storage directory.
2595
2596   @type file_storage_dir: str
2597   @param file_storage_dir: directory to create
2598
2599   @rtype: tuple
2600   @return: tuple with first element a boolean indicating wheter dir
2601       creation was successful or not
2602
2603   """
2604   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2605   if os.path.exists(file_storage_dir):
2606     if not os.path.isdir(file_storage_dir):
2607       _Fail("Specified storage dir '%s' is not a directory",
2608             file_storage_dir)
2609   else:
2610     try:
2611       os.makedirs(file_storage_dir, 0750)
2612     except OSError, err:
2613       _Fail("Cannot create file storage directory '%s': %s",
2614             file_storage_dir, err, exc=True)
2615
2616
2617 def RemoveFileStorageDir(file_storage_dir):
2618   """Remove file storage directory.
2619
2620   Remove it only if it's empty. If not log an error and return.
2621
2622   @type file_storage_dir: str
2623   @param file_storage_dir: the directory we should cleanup
2624   @rtype: tuple (success,)
2625   @return: tuple of one element, C{success}, denoting
2626       whether the operation was successful
2627
2628   """
2629   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2630   if os.path.exists(file_storage_dir):
2631     if not os.path.isdir(file_storage_dir):
2632       _Fail("Specified Storage directory '%s' is not a directory",
2633             file_storage_dir)
2634     # deletes dir only if empty, otherwise we want to fail the rpc call
2635     try:
2636       os.rmdir(file_storage_dir)
2637     except OSError, err:
2638       _Fail("Cannot remove file storage directory '%s': %s",
2639             file_storage_dir, err)
2640
2641
2642 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2643   """Rename the file storage directory.
2644
2645   @type old_file_storage_dir: str
2646   @param old_file_storage_dir: the current path
2647   @type new_file_storage_dir: str
2648   @param new_file_storage_dir: the name we should rename to
2649   @rtype: tuple (success,)
2650   @return: tuple of one element, C{success}, denoting
2651       whether the operation was successful
2652
2653   """
2654   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2655   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2656   if not os.path.exists(new_file_storage_dir):
2657     if os.path.isdir(old_file_storage_dir):
2658       try:
2659         os.rename(old_file_storage_dir, new_file_storage_dir)
2660       except OSError, err:
2661         _Fail("Cannot rename '%s' to '%s': %s",
2662               old_file_storage_dir, new_file_storage_dir, err)
2663     else:
2664       _Fail("Specified storage dir '%s' is not a directory",
2665             old_file_storage_dir)
2666   else:
2667     if os.path.exists(old_file_storage_dir):
2668       _Fail("Cannot rename '%s' to '%s': both locations exist",
2669             old_file_storage_dir, new_file_storage_dir)
2670
2671
2672 def _EnsureJobQueueFile(file_name):
2673   """Checks whether the given filename is in the queue directory.
2674
2675   @type file_name: str
2676   @param file_name: the file name we should check
2677   @rtype: None
2678   @raises RPCFail: if the file is not valid
2679
2680   """
2681   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2682   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2683
2684   if not result:
2685     _Fail("Passed job queue file '%s' does not belong to"
2686           " the queue directory '%s'", file_name, queue_dir)
2687
2688
2689 def JobQueueUpdate(file_name, content):
2690   """Updates a file in the queue directory.
2691
2692   This is just a wrapper over L{utils.io.WriteFile}, with proper
2693   checking.
2694
2695   @type file_name: str
2696   @param file_name: the job file name
2697   @type content: str
2698   @param content: the new job contents
2699   @rtype: boolean
2700   @return: the success of the operation
2701
2702   """
2703   _EnsureJobQueueFile(file_name)
2704   getents = runtime.GetEnts()
2705
2706   # Write and replace the file atomically
2707   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2708                   gid=getents.masterd_gid)
2709
2710
2711 def JobQueueRename(old, new):
2712   """Renames a job queue file.
2713
2714   This is just a wrapper over os.rename with proper checking.
2715
2716   @type old: str
2717   @param old: the old (actual) file name
2718   @type new: str
2719   @param new: the desired file name
2720   @rtype: tuple
2721   @return: the success of the operation and payload
2722
2723   """
2724   _EnsureJobQueueFile(old)
2725   _EnsureJobQueueFile(new)
2726
2727   utils.RenameFile(old, new, mkdir=True)
2728
2729
2730 def BlockdevClose(instance_name, disks):
2731   """Closes the given block devices.
2732
2733   This means they will be switched to secondary mode (in case of
2734   DRBD).
2735
2736   @param instance_name: if the argument is not empty, the symlinks
2737       of this instance will be removed
2738   @type disks: list of L{objects.Disk}
2739   @param disks: the list of disks to be closed
2740   @rtype: tuple (success, message)
2741   @return: a tuple of success and message, where success
2742       indicates the succes of the operation, and message
2743       which will contain the error details in case we
2744       failed
2745
2746   """
2747   bdevs = []
2748   for cf in disks:
2749     rd = _RecursiveFindBD(cf)
2750     if rd is None:
2751       _Fail("Can't find device %s", cf)
2752     bdevs.append(rd)
2753
2754   msg = []
2755   for rd in bdevs:
2756     try:
2757       rd.Close()
2758     except errors.BlockDeviceError, err:
2759       msg.append(str(err))
2760   if msg:
2761     _Fail("Can't make devices secondary: %s", ",".join(msg))
2762   else:
2763     if instance_name:
2764       _RemoveBlockDevLinks(instance_name, disks)
2765
2766
2767 def ValidateHVParams(hvname, hvparams):
2768   """Validates the given hypervisor parameters.
2769
2770   @type hvname: string
2771   @param hvname: the hypervisor name
2772   @type hvparams: dict
2773   @param hvparams: the hypervisor parameters to be validated
2774   @rtype: None
2775
2776   """
2777   try:
2778     hv_type = hypervisor.GetHypervisor(hvname)
2779     hv_type.ValidateParameters(hvparams)
2780   except errors.HypervisorError, err:
2781     _Fail(str(err), log=False)
2782
2783
2784 def _CheckOSPList(os_obj, parameters):
2785   """Check whether a list of parameters is supported by the OS.
2786
2787   @type os_obj: L{objects.OS}
2788   @param os_obj: OS object to check
2789   @type parameters: list
2790   @param parameters: the list of parameters to check
2791
2792   """
2793   supported = [v[0] for v in os_obj.supported_parameters]
2794   delta = frozenset(parameters).difference(supported)
2795   if delta:
2796     _Fail("The following parameters are not supported"
2797           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2798
2799
2800 def ValidateOS(required, osname, checks, osparams):
2801   """Validate the given OS' parameters.
2802
2803   @type required: boolean
2804   @param required: whether absence of the OS should translate into
2805       failure or not
2806   @type osname: string
2807   @param osname: the OS to be validated
2808   @type checks: list
2809   @param checks: list of the checks to run (currently only 'parameters')
2810   @type osparams: dict
2811   @param osparams: dictionary with OS parameters
2812   @rtype: boolean
2813   @return: True if the validation passed, or False if the OS was not
2814       found and L{required} was false
2815
2816   """
2817   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2818     _Fail("Unknown checks required for OS %s: %s", osname,
2819           set(checks).difference(constants.OS_VALIDATE_CALLS))
2820
2821   name_only = objects.OS.GetName(osname)
2822   status, tbv = _TryOSFromDisk(name_only, None)
2823
2824   if not status:
2825     if required:
2826       _Fail(tbv)
2827     else:
2828       return False
2829
2830   if max(tbv.api_versions) < constants.OS_API_V20:
2831     return True
2832
2833   if constants.OS_VALIDATE_PARAMETERS in checks:
2834     _CheckOSPList(tbv, osparams.keys())
2835
2836   validate_env = OSCoreEnv(osname, tbv, osparams)
2837   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2838                         cwd=tbv.path, reset_env=True)
2839   if result.failed:
2840     logging.error("os validate command '%s' returned error: %s output: %s",
2841                   result.cmd, result.fail_reason, result.output)
2842     _Fail("OS validation script failed (%s), output: %s",
2843           result.fail_reason, result.output, log=False)
2844
2845   return True
2846
2847
2848 def DemoteFromMC():
2849   """Demotes the current node from master candidate role.
2850
2851   """
2852   # try to ensure we're not the master by mistake
2853   master, myself = ssconf.GetMasterAndMyself()
2854   if master == myself:
2855     _Fail("ssconf status shows I'm the master node, will not demote")
2856
2857   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2858   if not result.failed:
2859     _Fail("The master daemon is running, will not demote")
2860
2861   try:
2862     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2863       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2864   except EnvironmentError, err:
2865     if err.errno != errno.ENOENT:
2866       _Fail("Error while backing up cluster file: %s", err, exc=True)
2867
2868   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2869
2870
2871 def _GetX509Filenames(cryptodir, name):
2872   """Returns the full paths for the private key and certificate.
2873
2874   """
2875   return (utils.PathJoin(cryptodir, name),
2876           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2877           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2878
2879
2880 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2881   """Creates a new X509 certificate for SSL/TLS.
2882
2883   @type validity: int
2884   @param validity: Validity in seconds
2885   @rtype: tuple; (string, string)
2886   @return: Certificate name and public part
2887
2888   """
2889   (key_pem, cert_pem) = \
2890     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2891                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2892
2893   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2894                               prefix="x509-%s-" % utils.TimestampForFilename())
2895   try:
2896     name = os.path.basename(cert_dir)
2897     assert len(name) > 5
2898
2899     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2900
2901     utils.WriteFile(key_file, mode=0400, data=key_pem)
2902     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2903
2904     # Never return private key as it shouldn't leave the node
2905     return (name, cert_pem)
2906   except Exception:
2907     shutil.rmtree(cert_dir, ignore_errors=True)
2908     raise
2909
2910
2911 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2912   """Removes a X509 certificate.
2913
2914   @type name: string
2915   @param name: Certificate name
2916
2917   """
2918   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2919
2920   utils.RemoveFile(key_file)
2921   utils.RemoveFile(cert_file)
2922
2923   try:
2924     os.rmdir(cert_dir)
2925   except EnvironmentError, err:
2926     _Fail("Cannot remove certificate directory '%s': %s",
2927           cert_dir, err)
2928
2929
2930 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2931   """Returns the command for the requested input/output.
2932
2933   @type instance: L{objects.Instance}
2934   @param instance: The instance object
2935   @param mode: Import/export mode
2936   @param ieio: Input/output type
2937   @param ieargs: Input/output arguments
2938
2939   """
2940   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2941
2942   env = None
2943   prefix = None
2944   suffix = None
2945   exp_size = None
2946
2947   if ieio == constants.IEIO_FILE:
2948     (filename, ) = ieargs
2949
2950     if not utils.IsNormAbsPath(filename):
2951       _Fail("Path '%s' is not normalized or absolute", filename)
2952
2953     real_filename = os.path.realpath(filename)
2954     directory = os.path.dirname(real_filename)
2955
2956     if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
2957       _Fail("File '%s' is not under exports directory '%s': %s",
2958             filename, constants.EXPORT_DIR, real_filename)
2959
2960     # Create directory
2961     utils.Makedirs(directory, mode=0750)
2962
2963     quoted_filename = utils.ShellQuote(filename)
2964
2965     if mode == constants.IEM_IMPORT:
2966       suffix = "> %s" % quoted_filename
2967     elif mode == constants.IEM_EXPORT:
2968       suffix = "< %s" % quoted_filename
2969
2970       # Retrieve file size
2971       try:
2972         st = os.stat(filename)
2973       except EnvironmentError, err:
2974         logging.error("Can't stat(2) %s: %s", filename, err)
2975       else:
2976         exp_size = utils.BytesToMebibyte(st.st_size)
2977
2978   elif ieio == constants.IEIO_RAW_DISK:
2979     (disk, ) = ieargs
2980
2981     real_disk = _OpenRealBD(disk)
2982
2983     if mode == constants.IEM_IMPORT:
2984       # we set here a smaller block size as, due to transport buffering, more
2985       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2986       # is not already there or we pass a wrong path; we use notrunc to no
2987       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2988       # much memory; this means that at best, we flush every 64k, which will
2989       # not be very fast
2990       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2991                                     " bs=%s oflag=dsync"),
2992                                     real_disk.dev_path,
2993                                     str(64 * 1024))
2994
2995     elif mode == constants.IEM_EXPORT:
2996       # the block size on the read dd is 1MiB to match our units
2997       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2998                                    real_disk.dev_path,
2999                                    str(1024 * 1024), # 1 MB
3000                                    str(disk.size))
3001       exp_size = disk.size
3002
3003   elif ieio == constants.IEIO_SCRIPT:
3004     (disk, disk_index, ) = ieargs
3005
3006     assert isinstance(disk_index, (int, long))
3007
3008     real_disk = _OpenRealBD(disk)
3009
3010     inst_os = OSFromDisk(instance.os)
3011     env = OSEnvironment(instance, inst_os)
3012
3013     if mode == constants.IEM_IMPORT:
3014       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3015       env["IMPORT_INDEX"] = str(disk_index)
3016       script = inst_os.import_script
3017
3018     elif mode == constants.IEM_EXPORT:
3019       env["EXPORT_DEVICE"] = real_disk.dev_path
3020       env["EXPORT_INDEX"] = str(disk_index)
3021       script = inst_os.export_script
3022
3023     # TODO: Pass special environment only to script
3024     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3025
3026     if mode == constants.IEM_IMPORT:
3027       suffix = "| %s" % script_cmd
3028
3029     elif mode == constants.IEM_EXPORT:
3030       prefix = "%s |" % script_cmd
3031
3032     # Let script predict size
3033     exp_size = constants.IE_CUSTOM_SIZE
3034
3035   else:
3036     _Fail("Invalid %s I/O mode %r", mode, ieio)
3037
3038   return (env, prefix, suffix, exp_size)
3039
3040
3041 def _CreateImportExportStatusDir(prefix):
3042   """Creates status directory for import/export.
3043
3044   """
3045   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
3046                           prefix=("%s-%s-" %
3047                                   (prefix, utils.TimestampForFilename())))
3048
3049
3050 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3051                             ieio, ieioargs):
3052   """Starts an import or export daemon.
3053
3054   @param mode: Import/output mode
3055   @type opts: L{objects.ImportExportOptions}
3056   @param opts: Daemon options
3057   @type host: string
3058   @param host: Remote host for export (None for import)
3059   @type port: int
3060   @param port: Remote port for export (None for import)
3061   @type instance: L{objects.Instance}
3062   @param instance: Instance object
3063   @type component: string
3064   @param component: which part of the instance is transferred now,
3065       e.g. 'disk/0'
3066   @param ieio: Input/output type
3067   @param ieioargs: Input/output arguments
3068
3069   """
3070   if mode == constants.IEM_IMPORT:
3071     prefix = "import"
3072
3073     if not (host is None and port is None):
3074       _Fail("Can not specify host or port on import")
3075
3076   elif mode == constants.IEM_EXPORT:
3077     prefix = "export"
3078
3079     if host is None or port is None:
3080       _Fail("Host and port must be specified for an export")
3081
3082   else:
3083     _Fail("Invalid mode %r", mode)
3084
3085   if (opts.key_name is None) ^ (opts.ca_pem is None):
3086     _Fail("Cluster certificate can only be used for both key and CA")
3087
3088   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3089     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3090
3091   if opts.key_name is None:
3092     # Use server.pem
3093     key_path = constants.NODED_CERT_FILE
3094     cert_path = constants.NODED_CERT_FILE
3095     assert opts.ca_pem is None
3096   else:
3097     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3098                                                  opts.key_name)
3099     assert opts.ca_pem is not None
3100
3101   for i in [key_path, cert_path]:
3102     if not os.path.exists(i):
3103       _Fail("File '%s' does not exist" % i)
3104
3105   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3106   try:
3107     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3108     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3109     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3110
3111     if opts.ca_pem is None:
3112       # Use server.pem
3113       ca = utils.ReadFile(constants.NODED_CERT_FILE)
3114     else:
3115       ca = opts.ca_pem
3116
3117     # Write CA file
3118     utils.WriteFile(ca_file, data=ca, mode=0400)
3119
3120     cmd = [
3121       constants.IMPORT_EXPORT_DAEMON,
3122       status_file, mode,
3123       "--key=%s" % key_path,
3124       "--cert=%s" % cert_path,
3125       "--ca=%s" % ca_file,
3126       ]
3127
3128     if host:
3129       cmd.append("--host=%s" % host)
3130
3131     if port:
3132       cmd.append("--port=%s" % port)
3133
3134     if opts.ipv6:
3135       cmd.append("--ipv6")
3136     else:
3137       cmd.append("--ipv4")
3138
3139     if opts.compress:
3140       cmd.append("--compress=%s" % opts.compress)
3141
3142     if opts.magic:
3143       cmd.append("--magic=%s" % opts.magic)
3144
3145     if exp_size is not None:
3146       cmd.append("--expected-size=%s" % exp_size)
3147
3148     if cmd_prefix:
3149       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3150
3151     if cmd_suffix:
3152       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3153
3154     if mode == constants.IEM_EXPORT:
3155       # Retry connection a few times when connecting to remote peer
3156       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3157       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3158     elif opts.connect_timeout is not None:
3159       assert mode == constants.IEM_IMPORT
3160       # Overall timeout for establishing connection while listening
3161       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3162
3163     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3164
3165     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3166     # support for receiving a file descriptor for output
3167     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3168                       output=logfile)
3169
3170     # The import/export name is simply the status directory name
3171     return os.path.basename(status_dir)
3172
3173   except Exception:
3174     shutil.rmtree(status_dir, ignore_errors=True)
3175     raise
3176
3177
3178 def GetImportExportStatus(names):
3179   """Returns import/export daemon status.
3180
3181   @type names: sequence
3182   @param names: List of names
3183   @rtype: List of dicts
3184   @return: Returns a list of the state of each named import/export or None if a
3185            status couldn't be read
3186
3187   """
3188   result = []
3189
3190   for name in names:
3191     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3192                                  _IES_STATUS_FILE)
3193
3194     try:
3195       data = utils.ReadFile(status_file)
3196     except EnvironmentError, err:
3197       if err.errno != errno.ENOENT:
3198         raise
3199       data = None
3200
3201     if not data:
3202       result.append(None)
3203       continue
3204
3205     result.append(serializer.LoadJson(data))
3206
3207   return result
3208
3209
3210 def AbortImportExport(name):
3211   """Sends SIGTERM to a running import/export daemon.
3212
3213   """
3214   logging.info("Abort import/export %s", name)
3215
3216   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3217   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3218
3219   if pid:
3220     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3221                  name, pid)
3222     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3223
3224
3225 def CleanupImportExport(name):
3226   """Cleanup after an import or export.
3227
3228   If the import/export daemon is still running it's killed. Afterwards the
3229   whole status directory is removed.
3230
3231   """
3232   logging.info("Finalizing import/export %s", name)
3233
3234   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3235
3236   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3237
3238   if pid:
3239     logging.info("Import/export %s is still running with PID %s",
3240                  name, pid)
3241     utils.KillProcess(pid, waitpid=False)
3242
3243   shutil.rmtree(status_dir, ignore_errors=True)
3244
3245
3246 def _FindDisks(nodes_ip, disks):
3247   """Sets the physical ID on disks and returns the block devices.
3248
3249   """
3250   # set the correct physical ID
3251   my_name = netutils.Hostname.GetSysName()
3252   for cf in disks:
3253     cf.SetPhysicalID(my_name, nodes_ip)
3254
3255   bdevs = []
3256
3257   for cf in disks:
3258     rd = _RecursiveFindBD(cf)
3259     if rd is None:
3260       _Fail("Can't find device %s", cf)
3261     bdevs.append(rd)
3262   return bdevs
3263
3264
3265 def DrbdDisconnectNet(nodes_ip, disks):
3266   """Disconnects the network on a list of drbd devices.
3267
3268   """
3269   bdevs = _FindDisks(nodes_ip, disks)
3270
3271   # disconnect disks
3272   for rd in bdevs:
3273     try:
3274       rd.DisconnectNet()
3275     except errors.BlockDeviceError, err:
3276       _Fail("Can't change network configuration to standalone mode: %s",
3277             err, exc=True)
3278
3279
3280 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3281   """Attaches the network on a list of drbd devices.
3282
3283   """
3284   bdevs = _FindDisks(nodes_ip, disks)
3285
3286   if multimaster:
3287     for idx, rd in enumerate(bdevs):
3288       try:
3289         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3290       except EnvironmentError, err:
3291         _Fail("Can't create symlink: %s", err)
3292   # reconnect disks, switch to new master configuration and if
3293   # needed primary mode
3294   for rd in bdevs:
3295     try:
3296       rd.AttachNet(multimaster)
3297     except errors.BlockDeviceError, err:
3298       _Fail("Can't change network configuration: %s", err)
3299
3300   # wait until the disks are connected; we need to retry the re-attach
3301   # if the device becomes standalone, as this might happen if the one
3302   # node disconnects and reconnects in a different mode before the
3303   # other node reconnects; in this case, one or both of the nodes will
3304   # decide it has wrong configuration and switch to standalone
3305
3306   def _Attach():
3307     all_connected = True
3308
3309     for rd in bdevs:
3310       stats = rd.GetProcStatus()
3311
3312       all_connected = (all_connected and
3313                        (stats.is_connected or stats.is_in_resync))
3314
3315       if stats.is_standalone:
3316         # peer had different config info and this node became
3317         # standalone, even though this should not happen with the
3318         # new staged way of changing disk configs
3319         try:
3320           rd.AttachNet(multimaster)
3321         except errors.BlockDeviceError, err:
3322           _Fail("Can't change network configuration: %s", err)
3323
3324     if not all_connected:
3325       raise utils.RetryAgain()
3326
3327   try:
3328     # Start with a delay of 100 miliseconds and go up to 5 seconds
3329     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3330   except utils.RetryTimeout:
3331     _Fail("Timeout in disk reconnecting")
3332
3333   if multimaster:
3334     # change to primary mode
3335     for rd in bdevs:
3336       try:
3337         rd.Open()
3338       except errors.BlockDeviceError, err:
3339         _Fail("Can't change to primary mode: %s", err)
3340
3341
3342 def DrbdWaitSync(nodes_ip, disks):
3343   """Wait until DRBDs have synchronized.
3344
3345   """
3346   def _helper(rd):
3347     stats = rd.GetProcStatus()
3348     if not (stats.is_connected or stats.is_in_resync):
3349       raise utils.RetryAgain()
3350     return stats
3351
3352   bdevs = _FindDisks(nodes_ip, disks)
3353
3354   min_resync = 100
3355   alldone = True
3356   for rd in bdevs:
3357     try:
3358       # poll each second for 15 seconds
3359       stats = utils.Retry(_helper, 1, 15, args=[rd])
3360     except utils.RetryTimeout:
3361       stats = rd.GetProcStatus()
3362       # last check
3363       if not (stats.is_connected or stats.is_in_resync):
3364         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3365     alldone = alldone and (not stats.is_in_resync)
3366     if stats.sync_percent is not None:
3367       min_resync = min(min_resync, stats.sync_percent)
3368
3369   return (alldone, min_resync)
3370
3371
3372 def GetDrbdUsermodeHelper():
3373   """Returns DRBD usermode helper currently configured.
3374
3375   """
3376   try:
3377     return bdev.BaseDRBD.GetUsermodeHelper()
3378   except errors.BlockDeviceError, err:
3379     _Fail(str(err))
3380
3381
3382 def PowercycleNode(hypervisor_type):
3383   """Hard-powercycle the node.
3384
3385   Because we need to return first, and schedule the powercycle in the
3386   background, we won't be able to report failures nicely.
3387
3388   """
3389   hyper = hypervisor.GetHypervisor(hypervisor_type)
3390   try:
3391     pid = os.fork()
3392   except OSError:
3393     # if we can't fork, we'll pretend that we're in the child process
3394     pid = 0
3395   if pid > 0:
3396     return "Reboot scheduled in 5 seconds"
3397   # ensure the child is running on ram
3398   try:
3399     utils.Mlockall()
3400   except Exception: # pylint: disable=W0703
3401     pass
3402   time.sleep(5)
3403   hyper.PowercycleNode()
3404
3405
3406 class HooksRunner(object):
3407   """Hook runner.
3408
3409   This class is instantiated on the node side (ganeti-noded) and not
3410   on the master side.
3411
3412   """
3413   def __init__(self, hooks_base_dir=None):
3414     """Constructor for hooks runner.
3415
3416     @type hooks_base_dir: str or None
3417     @param hooks_base_dir: if not None, this overrides the
3418         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3419
3420     """
3421     if hooks_base_dir is None:
3422       hooks_base_dir = constants.HOOKS_BASE_DIR
3423     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3424     # constant
3425     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3426
3427   def RunHooks(self, hpath, phase, env):
3428     """Run the scripts in the hooks directory.
3429
3430     @type hpath: str
3431     @param hpath: the path to the hooks directory which
3432         holds the scripts
3433     @type phase: str
3434     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3435         L{constants.HOOKS_PHASE_POST}
3436     @type env: dict
3437     @param env: dictionary with the environment for the hook
3438     @rtype: list
3439     @return: list of 3-element tuples:
3440       - script path
3441       - script result, either L{constants.HKR_SUCCESS} or
3442         L{constants.HKR_FAIL}
3443       - output of the script
3444
3445     @raise errors.ProgrammerError: for invalid input
3446         parameters
3447
3448     """
3449     if phase == constants.HOOKS_PHASE_PRE:
3450       suffix = "pre"
3451     elif phase == constants.HOOKS_PHASE_POST:
3452       suffix = "post"
3453     else:
3454       _Fail("Unknown hooks phase '%s'", phase)
3455
3456     subdir = "%s-%s.d" % (hpath, suffix)
3457     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3458
3459     results = []
3460
3461     if not os.path.isdir(dir_name):
3462       # for non-existing/non-dirs, we simply exit instead of logging a
3463       # warning at every operation
3464       return results
3465
3466     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3467
3468     for (relname, relstatus, runresult)  in runparts_results:
3469       if relstatus == constants.RUNPARTS_SKIP:
3470         rrval = constants.HKR_SKIP
3471         output = ""
3472       elif relstatus == constants.RUNPARTS_ERR:
3473         rrval = constants.HKR_FAIL
3474         output = "Hook script execution error: %s" % runresult
3475       elif relstatus == constants.RUNPARTS_RUN:
3476         if runresult.failed:
3477           rrval = constants.HKR_FAIL
3478         else:
3479           rrval = constants.HKR_SUCCESS
3480         output = utils.SafeEncode(runresult.output.strip())
3481       results.append(("%s/%s" % (subdir, relname), rrval, output))
3482
3483     return results
3484
3485
3486 class IAllocatorRunner(object):
3487   """IAllocator runner.
3488
3489   This class is instantiated on the node side (ganeti-noded) and not on
3490   the master side.
3491
3492   """
3493   @staticmethod
3494   def Run(name, idata):
3495     """Run an iallocator script.
3496
3497     @type name: str
3498     @param name: the iallocator script name
3499     @type idata: str
3500     @param idata: the allocator input data
3501
3502     @rtype: tuple
3503     @return: two element tuple of:
3504        - status
3505        - either error message or stdout of allocator (for success)
3506
3507     """
3508     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3509                                   os.path.isfile)
3510     if alloc_script is None:
3511       _Fail("iallocator module '%s' not found in the search path", name)
3512
3513     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3514     try:
3515       os.write(fd, idata)
3516       os.close(fd)
3517       result = utils.RunCmd([alloc_script, fin_name])
3518       if result.failed:
3519         _Fail("iallocator module '%s' failed: %s, output '%s'",
3520               name, result.fail_reason, result.output)
3521     finally:
3522       os.unlink(fin_name)
3523
3524     return result.stdout
3525
3526
3527 class DevCacheManager(object):
3528   """Simple class for managing a cache of block device information.
3529
3530   """
3531   _DEV_PREFIX = "/dev/"
3532   _ROOT_DIR = constants.BDEV_CACHE_DIR
3533
3534   @classmethod
3535   def _ConvertPath(cls, dev_path):
3536     """Converts a /dev/name path to the cache file name.
3537
3538     This replaces slashes with underscores and strips the /dev
3539     prefix. It then returns the full path to the cache file.
3540
3541     @type dev_path: str
3542     @param dev_path: the C{/dev/} path name
3543     @rtype: str
3544     @return: the converted path name
3545
3546     """
3547     if dev_path.startswith(cls._DEV_PREFIX):
3548       dev_path = dev_path[len(cls._DEV_PREFIX):]
3549     dev_path = dev_path.replace("/", "_")
3550     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3551     return fpath
3552
3553   @classmethod
3554   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3555     """Updates the cache information for a given device.
3556
3557     @type dev_path: str
3558     @param dev_path: the pathname of the device
3559     @type owner: str
3560     @param owner: the owner (instance name) of the device
3561     @type on_primary: bool
3562     @param on_primary: whether this is the primary
3563         node nor not
3564     @type iv_name: str
3565     @param iv_name: the instance-visible name of the
3566         device, as in objects.Disk.iv_name
3567
3568     @rtype: None
3569
3570     """
3571     if dev_path is None:
3572       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3573       return
3574     fpath = cls._ConvertPath(dev_path)
3575     if on_primary:
3576       state = "primary"
3577     else:
3578       state = "secondary"
3579     if iv_name is None:
3580       iv_name = "not_visible"
3581     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3582     try:
3583       utils.WriteFile(fpath, data=fdata)
3584     except EnvironmentError, err:
3585       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3586
3587   @classmethod
3588   def RemoveCache(cls, dev_path):
3589     """Remove data for a dev_path.
3590
3591     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3592     path name and logging.
3593
3594     @type dev_path: str
3595     @param dev_path: the pathname of the device
3596
3597     @rtype: None
3598
3599     """
3600     if dev_path is None:
3601       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3602       return
3603     fpath = cls._ConvertPath(dev_path)
3604     try:
3605       utils.RemoveFile(fpath)
3606     except EnvironmentError, err:
3607       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)