Merge branch 'stable-2.5' into devel-2.5
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.SPICE_CERT_FILE,
200     constants.SPICE_CACERT_FILE,
201     constants.RAPI_USERS_FILE,
202     constants.CONFD_HMAC_KEY,
203     constants.CLUSTER_DOMAIN_SECRET_FILE,
204     ])
205
206   for hv_name in constants.HYPER_TYPES:
207     hv_class = hypervisor.GetHypervisorClass(hv_name)
208     allowed_files.update(hv_class.GetAncillaryFiles())
209
210   return frozenset(allowed_files)
211
212
213 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
214
215
216 def JobQueuePurge():
217   """Removes job queue files and archived jobs.
218
219   @rtype: tuple
220   @return: True, None
221
222   """
223   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
224   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
225
226
227 def GetMasterInfo():
228   """Returns master information.
229
230   This is an utility function to compute master information, either
231   for consumption here or from the node daemon.
232
233   @rtype: tuple
234   @return: master_netdev, master_ip, master_name, primary_ip_family
235   @raise RPCFail: in case of errors
236
237   """
238   try:
239     cfg = _GetConfig()
240     master_netdev = cfg.GetMasterNetdev()
241     master_ip = cfg.GetMasterIP()
242     master_node = cfg.GetMasterNode()
243     primary_ip_family = cfg.GetPrimaryIPFamily()
244   except errors.ConfigurationError, err:
245     _Fail("Cluster configuration incomplete: %s", err, exc=True)
246   return (master_netdev, master_ip, master_node, primary_ip_family)
247
248
249 def ActivateMasterIp():
250   """Activate the IP address of the master daemon.
251
252   """
253   # GetMasterInfo will raise an exception if not able to return data
254   master_netdev, master_ip, _, family = GetMasterInfo()
255
256   err_msg = None
257   if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
258     if netutils.IPAddress.Own(master_ip):
259       # we already have the ip:
260       logging.debug("Master IP already configured, doing nothing")
261     else:
262       err_msg = "Someone else has the master ip, not activating"
263       logging.error(err_msg)
264   else:
265     ipcls = netutils.IP4Address
266     if family == netutils.IP6Address.family:
267       ipcls = netutils.IP6Address
268
269     result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
270                            "%s/%d" % (master_ip, ipcls.iplen),
271                            "dev", master_netdev, "label",
272                            "%s:0" % master_netdev])
273     if result.failed:
274       err_msg = "Can't activate master IP: %s" % result.output
275       logging.error(err_msg)
276
277     # we ignore the exit code of the following cmds
278     if ipcls == netutils.IP4Address:
279       utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
280                     master_ip, master_ip])
281     elif ipcls == netutils.IP6Address:
282       try:
283         utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
284       except errors.OpExecError:
285         # TODO: Better error reporting
286         logging.warning("Can't execute ndisc6, please install if missing")
287
288   if err_msg:
289     _Fail(err_msg)
290
291
292 def StartMasterDaemons(no_voting):
293   """Activate local node as master node.
294
295   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
296
297   @type no_voting: boolean
298   @param no_voting: whether to start ganeti-masterd without a node vote
299       but still non-interactively
300   @rtype: None
301
302   """
303
304   if no_voting:
305     masterd_args = "--no-voting --yes-do-it"
306   else:
307     masterd_args = ""
308
309   env = {
310     "EXTRA_MASTERD_ARGS": masterd_args,
311     }
312
313   result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
314   if result.failed:
315     msg = "Can't start Ganeti master: %s" % result.output
316     logging.error(msg)
317     _Fail(msg)
318
319
320 def DeactivateMasterIp():
321   """Deactivate the master IP on this node.
322
323   """
324   # TODO: log and report back to the caller the error failures; we
325   # need to decide in which case we fail the RPC for this
326
327   # GetMasterInfo will raise an exception if not able to return data
328   master_netdev, master_ip, _, family = GetMasterInfo()
329
330   ipcls = netutils.IP4Address
331   if family == netutils.IP6Address.family:
332     ipcls = netutils.IP6Address
333
334   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
335                          "%s/%d" % (master_ip, ipcls.iplen),
336                          "dev", master_netdev])
337   if result.failed:
338     logging.error("Can't remove the master IP, error: %s", result.output)
339     # but otherwise ignore the failure
340
341
342 def StopMasterDaemons():
343   """Stop the master daemons on this node.
344
345   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
346
347   @rtype: None
348
349   """
350   # TODO: log and report back to the caller the error failures; we
351   # need to decide in which case we fail the RPC for this
352
353   result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
354   if result.failed:
355     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
356                   " and error %s",
357                   result.cmd, result.exit_code, result.output)
358
359
360 def EtcHostsModify(mode, host, ip):
361   """Modify a host entry in /etc/hosts.
362
363   @param mode: The mode to operate. Either add or remove entry
364   @param host: The host to operate on
365   @param ip: The ip associated with the entry
366
367   """
368   if mode == constants.ETC_HOSTS_ADD:
369     if not ip:
370       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
371               " present")
372     utils.AddHostToEtcHosts(host, ip)
373   elif mode == constants.ETC_HOSTS_REMOVE:
374     if ip:
375       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
376               " parameter is present")
377     utils.RemoveHostFromEtcHosts(host)
378   else:
379     RPCFail("Mode not supported")
380
381
382 def LeaveCluster(modify_ssh_setup):
383   """Cleans up and remove the current node.
384
385   This function cleans up and prepares the current node to be removed
386   from the cluster.
387
388   If processing is successful, then it raises an
389   L{errors.QuitGanetiException} which is used as a special case to
390   shutdown the node daemon.
391
392   @param modify_ssh_setup: boolean
393
394   """
395   _CleanDirectory(constants.DATA_DIR)
396   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
397   JobQueuePurge()
398
399   if modify_ssh_setup:
400     try:
401       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
402
403       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
404
405       utils.RemoveFile(priv_key)
406       utils.RemoveFile(pub_key)
407     except errors.OpExecError:
408       logging.exception("Error while processing ssh files")
409
410   try:
411     utils.RemoveFile(constants.CONFD_HMAC_KEY)
412     utils.RemoveFile(constants.RAPI_CERT_FILE)
413     utils.RemoveFile(constants.SPICE_CERT_FILE)
414     utils.RemoveFile(constants.SPICE_CACERT_FILE)
415     utils.RemoveFile(constants.NODED_CERT_FILE)
416   except: # pylint: disable=W0702
417     logging.exception("Error while removing cluster secrets")
418
419   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
420   if result.failed:
421     logging.error("Command %s failed with exitcode %s and error %s",
422                   result.cmd, result.exit_code, result.output)
423
424   # Raise a custom exception (handled in ganeti-noded)
425   raise errors.QuitGanetiException(True, "Shutdown scheduled")
426
427
428 def GetNodeInfo(vgname, hypervisor_type):
429   """Gives back a hash with different information about the node.
430
431   @type vgname: C{string}
432   @param vgname: the name of the volume group to ask for disk space information
433   @type hypervisor_type: C{str}
434   @param hypervisor_type: the name of the hypervisor to ask for
435       memory information
436   @rtype: C{dict}
437   @return: dictionary with the following keys:
438       - vg_size is the size of the configured volume group in MiB
439       - vg_free is the free size of the volume group in MiB
440       - memory_dom0 is the memory allocated for domain0 in MiB
441       - memory_free is the currently available (free) ram in MiB
442       - memory_total is the total number of ram in MiB
443       - hv_version: the hypervisor version, if available
444
445   """
446   outputarray = {}
447
448   if vgname is not None:
449     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
450     vg_free = vg_size = None
451     if vginfo:
452       vg_free = int(round(vginfo[0][0], 0))
453       vg_size = int(round(vginfo[0][1], 0))
454     outputarray["vg_size"] = vg_size
455     outputarray["vg_free"] = vg_free
456
457   if hypervisor_type is not None:
458     hyper = hypervisor.GetHypervisor(hypervisor_type)
459     hyp_info = hyper.GetNodeInfo()
460     if hyp_info is not None:
461       outputarray.update(hyp_info)
462
463   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
464
465   return outputarray
466
467
468 def VerifyNode(what, cluster_name):
469   """Verify the status of the local node.
470
471   Based on the input L{what} parameter, various checks are done on the
472   local node.
473
474   If the I{filelist} key is present, this list of
475   files is checksummed and the file/checksum pairs are returned.
476
477   If the I{nodelist} key is present, we check that we have
478   connectivity via ssh with the target nodes (and check the hostname
479   report).
480
481   If the I{node-net-test} key is present, we check that we have
482   connectivity to the given nodes via both primary IP and, if
483   applicable, secondary IPs.
484
485   @type what: C{dict}
486   @param what: a dictionary of things to check:
487       - filelist: list of files for which to compute checksums
488       - nodelist: list of nodes we should check ssh communication with
489       - node-net-test: list of nodes we should check node daemon port
490         connectivity with
491       - hypervisor: list with hypervisors to run the verify for
492   @rtype: dict
493   @return: a dictionary with the same keys as the input dict, and
494       values representing the result of the checks
495
496   """
497   result = {}
498   my_name = netutils.Hostname.GetSysName()
499   port = netutils.GetDaemonPort(constants.NODED)
500   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
501
502   if constants.NV_HYPERVISOR in what and vm_capable:
503     result[constants.NV_HYPERVISOR] = tmp = {}
504     for hv_name in what[constants.NV_HYPERVISOR]:
505       try:
506         val = hypervisor.GetHypervisor(hv_name).Verify()
507       except errors.HypervisorError, err:
508         val = "Error while checking hypervisor: %s" % str(err)
509       tmp[hv_name] = val
510
511   if constants.NV_HVPARAMS in what and vm_capable:
512     result[constants.NV_HVPARAMS] = tmp = []
513     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
514       try:
515         logging.info("Validating hv %s, %s", hv_name, hvparms)
516         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
517       except errors.HypervisorError, err:
518         tmp.append((source, hv_name, str(err)))
519
520   if constants.NV_FILELIST in what:
521     result[constants.NV_FILELIST] = utils.FingerprintFiles(
522       what[constants.NV_FILELIST])
523
524   if constants.NV_NODELIST in what:
525     (nodes, bynode) = what[constants.NV_NODELIST]
526
527     # Add nodes from other groups (different for each node)
528     try:
529       nodes.extend(bynode[my_name])
530     except KeyError:
531       pass
532
533     # Use a random order
534     random.shuffle(nodes)
535
536     # Try to contact all nodes
537     val = {}
538     for node in nodes:
539       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
540       if not success:
541         val[node] = message
542
543     result[constants.NV_NODELIST] = val
544
545   if constants.NV_NODENETTEST in what:
546     result[constants.NV_NODENETTEST] = tmp = {}
547     my_pip = my_sip = None
548     for name, pip, sip in what[constants.NV_NODENETTEST]:
549       if name == my_name:
550         my_pip = pip
551         my_sip = sip
552         break
553     if not my_pip:
554       tmp[my_name] = ("Can't find my own primary/secondary IP"
555                       " in the node list")
556     else:
557       for name, pip, sip in what[constants.NV_NODENETTEST]:
558         fail = []
559         if not netutils.TcpPing(pip, port, source=my_pip):
560           fail.append("primary")
561         if sip != pip:
562           if not netutils.TcpPing(sip, port, source=my_sip):
563             fail.append("secondary")
564         if fail:
565           tmp[name] = ("failure using the %s interface(s)" %
566                        " and ".join(fail))
567
568   if constants.NV_MASTERIP in what:
569     # FIXME: add checks on incoming data structures (here and in the
570     # rest of the function)
571     master_name, master_ip = what[constants.NV_MASTERIP]
572     if master_name == my_name:
573       source = constants.IP4_ADDRESS_LOCALHOST
574     else:
575       source = None
576     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
577                                                   source=source)
578
579   if constants.NV_OOB_PATHS in what:
580     result[constants.NV_OOB_PATHS] = tmp = []
581     for path in what[constants.NV_OOB_PATHS]:
582       try:
583         st = os.stat(path)
584       except OSError, err:
585         tmp.append("error stating out of band helper: %s" % err)
586       else:
587         if stat.S_ISREG(st.st_mode):
588           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
589             tmp.append(None)
590           else:
591             tmp.append("out of band helper %s is not executable" % path)
592         else:
593           tmp.append("out of band helper %s is not a file" % path)
594
595   if constants.NV_LVLIST in what and vm_capable:
596     try:
597       val = GetVolumeList(utils.ListVolumeGroups().keys())
598     except RPCFail, err:
599       val = str(err)
600     result[constants.NV_LVLIST] = val
601
602   if constants.NV_INSTANCELIST in what and vm_capable:
603     # GetInstanceList can fail
604     try:
605       val = GetInstanceList(what[constants.NV_INSTANCELIST])
606     except RPCFail, err:
607       val = str(err)
608     result[constants.NV_INSTANCELIST] = val
609
610   if constants.NV_VGLIST in what and vm_capable:
611     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
612
613   if constants.NV_PVLIST in what and vm_capable:
614     result[constants.NV_PVLIST] = \
615       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
616                                    filter_allocatable=False)
617
618   if constants.NV_VERSION in what:
619     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
620                                     constants.RELEASE_VERSION)
621
622   if constants.NV_HVINFO in what and vm_capable:
623     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
624     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
625
626   if constants.NV_DRBDLIST in what and vm_capable:
627     try:
628       used_minors = bdev.DRBD8.GetUsedDevs().keys()
629     except errors.BlockDeviceError, err:
630       logging.warning("Can't get used minors list", exc_info=True)
631       used_minors = str(err)
632     result[constants.NV_DRBDLIST] = used_minors
633
634   if constants.NV_DRBDHELPER in what and vm_capable:
635     status = True
636     try:
637       payload = bdev.BaseDRBD.GetUsermodeHelper()
638     except errors.BlockDeviceError, err:
639       logging.error("Can't get DRBD usermode helper: %s", str(err))
640       status = False
641       payload = str(err)
642     result[constants.NV_DRBDHELPER] = (status, payload)
643
644   if constants.NV_NODESETUP in what:
645     result[constants.NV_NODESETUP] = tmpr = []
646     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
647       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
648                   " under /sys, missing required directories /sys/block"
649                   " and /sys/class/net")
650     if (not os.path.isdir("/proc/sys") or
651         not os.path.isfile("/proc/sysrq-trigger")):
652       tmpr.append("The procfs filesystem doesn't seem to be mounted"
653                   " under /proc, missing required directory /proc/sys and"
654                   " the file /proc/sysrq-trigger")
655
656   if constants.NV_TIME in what:
657     result[constants.NV_TIME] = utils.SplitTime(time.time())
658
659   if constants.NV_OSLIST in what and vm_capable:
660     result[constants.NV_OSLIST] = DiagnoseOS()
661
662   if constants.NV_BRIDGES in what and vm_capable:
663     result[constants.NV_BRIDGES] = [bridge
664                                     for bridge in what[constants.NV_BRIDGES]
665                                     if not utils.BridgeExists(bridge)]
666   return result
667
668
669 def GetBlockDevSizes(devices):
670   """Return the size of the given block devices
671
672   @type devices: list
673   @param devices: list of block device nodes to query
674   @rtype: dict
675   @return:
676     dictionary of all block devices under /dev (key). The value is their
677     size in MiB.
678
679     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
680
681   """
682   DEV_PREFIX = "/dev/"
683   blockdevs = {}
684
685   for devpath in devices:
686     if not utils.IsBelowDir(DEV_PREFIX, devpath):
687       continue
688
689     try:
690       st = os.stat(devpath)
691     except EnvironmentError, err:
692       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
693       continue
694
695     if stat.S_ISBLK(st.st_mode):
696       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
697       if result.failed:
698         # We don't want to fail, just do not list this device as available
699         logging.warning("Cannot get size for block device %s", devpath)
700         continue
701
702       size = int(result.stdout) / (1024 * 1024)
703       blockdevs[devpath] = size
704   return blockdevs
705
706
707 def GetVolumeList(vg_names):
708   """Compute list of logical volumes and their size.
709
710   @type vg_names: list
711   @param vg_names: the volume groups whose LVs we should list, or
712       empty for all volume groups
713   @rtype: dict
714   @return:
715       dictionary of all partions (key) with value being a tuple of
716       their size (in MiB), inactive and online status::
717
718         {'xenvg/test1': ('20.06', True, True)}
719
720       in case of errors, a string is returned with the error
721       details.
722
723   """
724   lvs = {}
725   sep = "|"
726   if not vg_names:
727     vg_names = []
728   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
729                          "--separator=%s" % sep,
730                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
731   if result.failed:
732     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
733
734   for line in result.stdout.splitlines():
735     line = line.strip()
736     match = _LVSLINE_REGEX.match(line)
737     if not match:
738       logging.error("Invalid line returned from lvs output: '%s'", line)
739       continue
740     vg_name, name, size, attr = match.groups()
741     inactive = attr[4] == "-"
742     online = attr[5] == "o"
743     virtual = attr[0] == "v"
744     if virtual:
745       # we don't want to report such volumes as existing, since they
746       # don't really hold data
747       continue
748     lvs[vg_name + "/" + name] = (size, inactive, online)
749
750   return lvs
751
752
753 def ListVolumeGroups():
754   """List the volume groups and their size.
755
756   @rtype: dict
757   @return: dictionary with keys volume name and values the
758       size of the volume
759
760   """
761   return utils.ListVolumeGroups()
762
763
764 def NodeVolumes():
765   """List all volumes on this node.
766
767   @rtype: list
768   @return:
769     A list of dictionaries, each having four keys:
770       - name: the logical volume name,
771       - size: the size of the logical volume
772       - dev: the physical device on which the LV lives
773       - vg: the volume group to which it belongs
774
775     In case of errors, we return an empty list and log the
776     error.
777
778     Note that since a logical volume can live on multiple physical
779     volumes, the resulting list might include a logical volume
780     multiple times.
781
782   """
783   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
784                          "--separator=|",
785                          "--options=lv_name,lv_size,devices,vg_name"])
786   if result.failed:
787     _Fail("Failed to list logical volumes, lvs output: %s",
788           result.output)
789
790   def parse_dev(dev):
791     return dev.split("(")[0]
792
793   def handle_dev(dev):
794     return [parse_dev(x) for x in dev.split(",")]
795
796   def map_line(line):
797     line = [v.strip() for v in line]
798     return [{"name": line[0], "size": line[1],
799              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
800
801   all_devs = []
802   for line in result.stdout.splitlines():
803     if line.count("|") >= 3:
804       all_devs.extend(map_line(line.split("|")))
805     else:
806       logging.warning("Strange line in the output from lvs: '%s'", line)
807   return all_devs
808
809
810 def BridgesExist(bridges_list):
811   """Check if a list of bridges exist on the current node.
812
813   @rtype: boolean
814   @return: C{True} if all of them exist, C{False} otherwise
815
816   """
817   missing = []
818   for bridge in bridges_list:
819     if not utils.BridgeExists(bridge):
820       missing.append(bridge)
821
822   if missing:
823     _Fail("Missing bridges %s", utils.CommaJoin(missing))
824
825
826 def GetInstanceList(hypervisor_list):
827   """Provides a list of instances.
828
829   @type hypervisor_list: list
830   @param hypervisor_list: the list of hypervisors to query information
831
832   @rtype: list
833   @return: a list of all running instances on the current node
834     - instance1.example.com
835     - instance2.example.com
836
837   """
838   results = []
839   for hname in hypervisor_list:
840     try:
841       names = hypervisor.GetHypervisor(hname).ListInstances()
842       results.extend(names)
843     except errors.HypervisorError, err:
844       _Fail("Error enumerating instances (hypervisor %s): %s",
845             hname, err, exc=True)
846
847   return results
848
849
850 def GetInstanceInfo(instance, hname):
851   """Gives back the information about an instance as a dictionary.
852
853   @type instance: string
854   @param instance: the instance name
855   @type hname: string
856   @param hname: the hypervisor type of the instance
857
858   @rtype: dict
859   @return: dictionary with the following keys:
860       - memory: memory size of instance (int)
861       - state: xen state of instance (string)
862       - time: cpu time of instance (float)
863
864   """
865   output = {}
866
867   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
868   if iinfo is not None:
869     output["memory"] = iinfo[2]
870     output["state"] = iinfo[4]
871     output["time"] = iinfo[5]
872
873   return output
874
875
876 def GetInstanceMigratable(instance):
877   """Gives whether an instance can be migrated.
878
879   @type instance: L{objects.Instance}
880   @param instance: object representing the instance to be checked.
881
882   @rtype: tuple
883   @return: tuple of (result, description) where:
884       - result: whether the instance can be migrated or not
885       - description: a description of the issue, if relevant
886
887   """
888   hyper = hypervisor.GetHypervisor(instance.hypervisor)
889   iname = instance.name
890   if iname not in hyper.ListInstances():
891     _Fail("Instance %s is not running", iname)
892
893   for idx in range(len(instance.disks)):
894     link_name = _GetBlockDevSymlinkPath(iname, idx)
895     if not os.path.islink(link_name):
896       logging.warning("Instance %s is missing symlink %s for disk %d",
897                       iname, link_name, idx)
898
899
900 def GetAllInstancesInfo(hypervisor_list):
901   """Gather data about all instances.
902
903   This is the equivalent of L{GetInstanceInfo}, except that it
904   computes data for all instances at once, thus being faster if one
905   needs data about more than one instance.
906
907   @type hypervisor_list: list
908   @param hypervisor_list: list of hypervisors to query for instance data
909
910   @rtype: dict
911   @return: dictionary of instance: data, with data having the following keys:
912       - memory: memory size of instance (int)
913       - state: xen state of instance (string)
914       - time: cpu time of instance (float)
915       - vcpus: the number of vcpus
916
917   """
918   output = {}
919
920   for hname in hypervisor_list:
921     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
922     if iinfo:
923       for name, _, memory, vcpus, state, times in iinfo:
924         value = {
925           "memory": memory,
926           "vcpus": vcpus,
927           "state": state,
928           "time": times,
929           }
930         if name in output:
931           # we only check static parameters, like memory and vcpus,
932           # and not state and time which can change between the
933           # invocations of the different hypervisors
934           for key in "memory", "vcpus":
935             if value[key] != output[name][key]:
936               _Fail("Instance %s is running twice"
937                     " with different parameters", name)
938         output[name] = value
939
940   return output
941
942
943 def _InstanceLogName(kind, os_name, instance, component):
944   """Compute the OS log filename for a given instance and operation.
945
946   The instance name and os name are passed in as strings since not all
947   operations have these as part of an instance object.
948
949   @type kind: string
950   @param kind: the operation type (e.g. add, import, etc.)
951   @type os_name: string
952   @param os_name: the os name
953   @type instance: string
954   @param instance: the name of the instance being imported/added/etc.
955   @type component: string or None
956   @param component: the name of the component of the instance being
957       transferred
958
959   """
960   # TODO: Use tempfile.mkstemp to create unique filename
961   if component:
962     assert "/" not in component
963     c_msg = "-%s" % component
964   else:
965     c_msg = ""
966   base = ("%s-%s-%s%s-%s.log" %
967           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
968   return utils.PathJoin(constants.LOG_OS_DIR, base)
969
970
971 def InstanceOsAdd(instance, reinstall, debug):
972   """Add an OS to an instance.
973
974   @type instance: L{objects.Instance}
975   @param instance: Instance whose OS is to be installed
976   @type reinstall: boolean
977   @param reinstall: whether this is an instance reinstall
978   @type debug: integer
979   @param debug: debug level, passed to the OS scripts
980   @rtype: None
981
982   """
983   inst_os = OSFromDisk(instance.os)
984
985   create_env = OSEnvironment(instance, inst_os, debug)
986   if reinstall:
987     create_env["INSTANCE_REINSTALL"] = "1"
988
989   logfile = _InstanceLogName("add", instance.os, instance.name, None)
990
991   result = utils.RunCmd([inst_os.create_script], env=create_env,
992                         cwd=inst_os.path, output=logfile, reset_env=True)
993   if result.failed:
994     logging.error("os create command '%s' returned error: %s, logfile: %s,"
995                   " output: %s", result.cmd, result.fail_reason, logfile,
996                   result.output)
997     lines = [utils.SafeEncode(val)
998              for val in utils.TailFile(logfile, lines=20)]
999     _Fail("OS create script failed (%s), last lines in the"
1000           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1001
1002
1003 def RunRenameInstance(instance, old_name, debug):
1004   """Run the OS rename script for an instance.
1005
1006   @type instance: L{objects.Instance}
1007   @param instance: Instance whose OS is to be installed
1008   @type old_name: string
1009   @param old_name: previous instance name
1010   @type debug: integer
1011   @param debug: debug level, passed to the OS scripts
1012   @rtype: boolean
1013   @return: the success of the operation
1014
1015   """
1016   inst_os = OSFromDisk(instance.os)
1017
1018   rename_env = OSEnvironment(instance, inst_os, debug)
1019   rename_env["OLD_INSTANCE_NAME"] = old_name
1020
1021   logfile = _InstanceLogName("rename", instance.os,
1022                              "%s-%s" % (old_name, instance.name), None)
1023
1024   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1025                         cwd=inst_os.path, output=logfile, reset_env=True)
1026
1027   if result.failed:
1028     logging.error("os create command '%s' returned error: %s output: %s",
1029                   result.cmd, result.fail_reason, result.output)
1030     lines = [utils.SafeEncode(val)
1031              for val in utils.TailFile(logfile, lines=20)]
1032     _Fail("OS rename script failed (%s), last lines in the"
1033           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1034
1035
1036 def _GetBlockDevSymlinkPath(instance_name, idx):
1037   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
1038                         (instance_name, constants.DISK_SEPARATOR, idx))
1039
1040
1041 def _SymlinkBlockDev(instance_name, device_path, idx):
1042   """Set up symlinks to a instance's block device.
1043
1044   This is an auxiliary function run when an instance is start (on the primary
1045   node) or when an instance is migrated (on the target node).
1046
1047
1048   @param instance_name: the name of the target instance
1049   @param device_path: path of the physical block device, on the node
1050   @param idx: the disk index
1051   @return: absolute path to the disk's symlink
1052
1053   """
1054   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1055   try:
1056     os.symlink(device_path, link_name)
1057   except OSError, err:
1058     if err.errno == errno.EEXIST:
1059       if (not os.path.islink(link_name) or
1060           os.readlink(link_name) != device_path):
1061         os.remove(link_name)
1062         os.symlink(device_path, link_name)
1063     else:
1064       raise
1065
1066   return link_name
1067
1068
1069 def _RemoveBlockDevLinks(instance_name, disks):
1070   """Remove the block device symlinks belonging to the given instance.
1071
1072   """
1073   for idx, _ in enumerate(disks):
1074     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1075     if os.path.islink(link_name):
1076       try:
1077         os.remove(link_name)
1078       except OSError:
1079         logging.exception("Can't remove symlink '%s'", link_name)
1080
1081
1082 def _GatherAndLinkBlockDevs(instance):
1083   """Set up an instance's block device(s).
1084
1085   This is run on the primary node at instance startup. The block
1086   devices must be already assembled.
1087
1088   @type instance: L{objects.Instance}
1089   @param instance: the instance whose disks we shoul assemble
1090   @rtype: list
1091   @return: list of (disk_object, device_path)
1092
1093   """
1094   block_devices = []
1095   for idx, disk in enumerate(instance.disks):
1096     device = _RecursiveFindBD(disk)
1097     if device is None:
1098       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1099                                     str(disk))
1100     device.Open()
1101     try:
1102       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1103     except OSError, e:
1104       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1105                                     e.strerror)
1106
1107     block_devices.append((disk, link_name))
1108
1109   return block_devices
1110
1111
1112 def StartInstance(instance, startup_paused):
1113   """Start an instance.
1114
1115   @type instance: L{objects.Instance}
1116   @param instance: the instance object
1117   @type startup_paused: bool
1118   @param instance: pause instance at startup?
1119   @rtype: None
1120
1121   """
1122   running_instances = GetInstanceList([instance.hypervisor])
1123
1124   if instance.name in running_instances:
1125     logging.info("Instance %s already running, not starting", instance.name)
1126     return
1127
1128   try:
1129     block_devices = _GatherAndLinkBlockDevs(instance)
1130     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1131     hyper.StartInstance(instance, block_devices, startup_paused)
1132   except errors.BlockDeviceError, err:
1133     _Fail("Block device error: %s", err, exc=True)
1134   except errors.HypervisorError, err:
1135     _RemoveBlockDevLinks(instance.name, instance.disks)
1136     _Fail("Hypervisor error: %s", err, exc=True)
1137
1138
1139 def InstanceShutdown(instance, timeout):
1140   """Shut an instance down.
1141
1142   @note: this functions uses polling with a hardcoded timeout.
1143
1144   @type instance: L{objects.Instance}
1145   @param instance: the instance object
1146   @type timeout: integer
1147   @param timeout: maximum timeout for soft shutdown
1148   @rtype: None
1149
1150   """
1151   hv_name = instance.hypervisor
1152   hyper = hypervisor.GetHypervisor(hv_name)
1153   iname = instance.name
1154
1155   if instance.name not in hyper.ListInstances():
1156     logging.info("Instance %s not running, doing nothing", iname)
1157     return
1158
1159   class _TryShutdown:
1160     def __init__(self):
1161       self.tried_once = False
1162
1163     def __call__(self):
1164       if iname not in hyper.ListInstances():
1165         return
1166
1167       try:
1168         hyper.StopInstance(instance, retry=self.tried_once)
1169       except errors.HypervisorError, err:
1170         if iname not in hyper.ListInstances():
1171           # if the instance is no longer existing, consider this a
1172           # success and go to cleanup
1173           return
1174
1175         _Fail("Failed to stop instance %s: %s", iname, err)
1176
1177       self.tried_once = True
1178
1179       raise utils.RetryAgain()
1180
1181   try:
1182     utils.Retry(_TryShutdown(), 5, timeout)
1183   except utils.RetryTimeout:
1184     # the shutdown did not succeed
1185     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1186
1187     try:
1188       hyper.StopInstance(instance, force=True)
1189     except errors.HypervisorError, err:
1190       if iname in hyper.ListInstances():
1191         # only raise an error if the instance still exists, otherwise
1192         # the error could simply be "instance ... unknown"!
1193         _Fail("Failed to force stop instance %s: %s", iname, err)
1194
1195     time.sleep(1)
1196
1197     if iname in hyper.ListInstances():
1198       _Fail("Could not shutdown instance %s even by destroy", iname)
1199
1200   try:
1201     hyper.CleanupInstance(instance.name)
1202   except errors.HypervisorError, err:
1203     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1204
1205   _RemoveBlockDevLinks(iname, instance.disks)
1206
1207
1208 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1209   """Reboot an instance.
1210
1211   @type instance: L{objects.Instance}
1212   @param instance: the instance object to reboot
1213   @type reboot_type: str
1214   @param reboot_type: the type of reboot, one the following
1215     constants:
1216       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1217         instance OS, do not recreate the VM
1218       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1219         restart the VM (at the hypervisor level)
1220       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1221         not accepted here, since that mode is handled differently, in
1222         cmdlib, and translates into full stop and start of the
1223         instance (instead of a call_instance_reboot RPC)
1224   @type shutdown_timeout: integer
1225   @param shutdown_timeout: maximum timeout for soft shutdown
1226   @rtype: None
1227
1228   """
1229   running_instances = GetInstanceList([instance.hypervisor])
1230
1231   if instance.name not in running_instances:
1232     _Fail("Cannot reboot instance %s that is not running", instance.name)
1233
1234   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1235   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1236     try:
1237       hyper.RebootInstance(instance)
1238     except errors.HypervisorError, err:
1239       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1240   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1241     try:
1242       InstanceShutdown(instance, shutdown_timeout)
1243       return StartInstance(instance, False)
1244     except errors.HypervisorError, err:
1245       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1246   else:
1247     _Fail("Invalid reboot_type received: %s", reboot_type)
1248
1249
1250 def MigrationInfo(instance):
1251   """Gather information about an instance to be migrated.
1252
1253   @type instance: L{objects.Instance}
1254   @param instance: the instance definition
1255
1256   """
1257   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1258   try:
1259     info = hyper.MigrationInfo(instance)
1260   except errors.HypervisorError, err:
1261     _Fail("Failed to fetch migration information: %s", err, exc=True)
1262   return info
1263
1264
1265 def AcceptInstance(instance, info, target):
1266   """Prepare the node to accept an instance.
1267
1268   @type instance: L{objects.Instance}
1269   @param instance: the instance definition
1270   @type info: string/data (opaque)
1271   @param info: migration information, from the source node
1272   @type target: string
1273   @param target: target host (usually ip), on this node
1274
1275   """
1276   # TODO: why is this required only for DTS_EXT_MIRROR?
1277   if instance.disk_template in constants.DTS_EXT_MIRROR:
1278     # Create the symlinks, as the disks are not active
1279     # in any way
1280     try:
1281       _GatherAndLinkBlockDevs(instance)
1282     except errors.BlockDeviceError, err:
1283       _Fail("Block device error: %s", err, exc=True)
1284
1285   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1286   try:
1287     hyper.AcceptInstance(instance, info, target)
1288   except errors.HypervisorError, err:
1289     if instance.disk_template in constants.DTS_EXT_MIRROR:
1290       _RemoveBlockDevLinks(instance.name, instance.disks)
1291     _Fail("Failed to accept instance: %s", err, exc=True)
1292
1293
1294 def FinalizeMigration(instance, info, success):
1295   """Finalize any preparation to accept an instance.
1296
1297   @type instance: L{objects.Instance}
1298   @param instance: the instance definition
1299   @type info: string/data (opaque)
1300   @param info: migration information, from the source node
1301   @type success: boolean
1302   @param success: whether the migration was a success or a failure
1303
1304   """
1305   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1306   try:
1307     hyper.FinalizeMigration(instance, info, success)
1308   except errors.HypervisorError, err:
1309     _Fail("Failed to finalize migration: %s", err, exc=True)
1310
1311
1312 def MigrateInstance(instance, target, live):
1313   """Migrates an instance to another node.
1314
1315   @type instance: L{objects.Instance}
1316   @param instance: the instance definition
1317   @type target: string
1318   @param target: the target node name
1319   @type live: boolean
1320   @param live: whether the migration should be done live or not (the
1321       interpretation of this parameter is left to the hypervisor)
1322   @rtype: tuple
1323   @return: a tuple of (success, msg) where:
1324       - succes is a boolean denoting the success/failure of the operation
1325       - msg is a string with details in case of failure
1326
1327   """
1328   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1329
1330   try:
1331     hyper.MigrateInstance(instance, target, live)
1332   except errors.HypervisorError, err:
1333     _Fail("Failed to migrate instance: %s", err, exc=True)
1334
1335
1336 def BlockdevCreate(disk, size, owner, on_primary, info):
1337   """Creates a block device for an instance.
1338
1339   @type disk: L{objects.Disk}
1340   @param disk: the object describing the disk we should create
1341   @type size: int
1342   @param size: the size of the physical underlying device, in MiB
1343   @type owner: str
1344   @param owner: the name of the instance for which disk is created,
1345       used for device cache data
1346   @type on_primary: boolean
1347   @param on_primary:  indicates if it is the primary node or not
1348   @type info: string
1349   @param info: string that will be sent to the physical device
1350       creation, used for example to set (LVM) tags on LVs
1351
1352   @return: the new unique_id of the device (this can sometime be
1353       computed only after creation), or None. On secondary nodes,
1354       it's not required to return anything.
1355
1356   """
1357   # TODO: remove the obsolete "size" argument
1358   # pylint: disable=W0613
1359   clist = []
1360   if disk.children:
1361     for child in disk.children:
1362       try:
1363         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1364       except errors.BlockDeviceError, err:
1365         _Fail("Can't assemble device %s: %s", child, err)
1366       if on_primary or disk.AssembleOnSecondary():
1367         # we need the children open in case the device itself has to
1368         # be assembled
1369         try:
1370           # pylint: disable=E1103
1371           crdev.Open()
1372         except errors.BlockDeviceError, err:
1373           _Fail("Can't make child '%s' read-write: %s", child, err)
1374       clist.append(crdev)
1375
1376   try:
1377     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1378   except errors.BlockDeviceError, err:
1379     _Fail("Can't create block device: %s", err)
1380
1381   if on_primary or disk.AssembleOnSecondary():
1382     try:
1383       device.Assemble()
1384     except errors.BlockDeviceError, err:
1385       _Fail("Can't assemble device after creation, unusual event: %s", err)
1386     device.SetSyncSpeed(constants.SYNC_SPEED)
1387     if on_primary or disk.OpenOnSecondary():
1388       try:
1389         device.Open(force=True)
1390       except errors.BlockDeviceError, err:
1391         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1392     DevCacheManager.UpdateCache(device.dev_path, owner,
1393                                 on_primary, disk.iv_name)
1394
1395   device.SetInfo(info)
1396
1397   return device.unique_id
1398
1399
1400 def _WipeDevice(path, offset, size):
1401   """This function actually wipes the device.
1402
1403   @param path: The path to the device to wipe
1404   @param offset: The offset in MiB in the file
1405   @param size: The size in MiB to write
1406
1407   """
1408   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1409          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1410          "count=%d" % size]
1411   result = utils.RunCmd(cmd)
1412
1413   if result.failed:
1414     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1415           result.fail_reason, result.output)
1416
1417
1418 def BlockdevWipe(disk, offset, size):
1419   """Wipes a block device.
1420
1421   @type disk: L{objects.Disk}
1422   @param disk: the disk object we want to wipe
1423   @type offset: int
1424   @param offset: The offset in MiB in the file
1425   @type size: int
1426   @param size: The size in MiB to write
1427
1428   """
1429   try:
1430     rdev = _RecursiveFindBD(disk)
1431   except errors.BlockDeviceError:
1432     rdev = None
1433
1434   if not rdev:
1435     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1436
1437   # Do cross verify some of the parameters
1438   if offset > rdev.size:
1439     _Fail("Offset is bigger than device size")
1440   if (offset + size) > rdev.size:
1441     _Fail("The provided offset and size to wipe is bigger than device size")
1442
1443   _WipeDevice(rdev.dev_path, offset, size)
1444
1445
1446 def BlockdevPauseResumeSync(disks, pause):
1447   """Pause or resume the sync of the block device.
1448
1449   @type disks: list of L{objects.Disk}
1450   @param disks: the disks object we want to pause/resume
1451   @type pause: bool
1452   @param pause: Wheater to pause or resume
1453
1454   """
1455   success = []
1456   for disk in disks:
1457     try:
1458       rdev = _RecursiveFindBD(disk)
1459     except errors.BlockDeviceError:
1460       rdev = None
1461
1462     if not rdev:
1463       success.append((False, ("Cannot change sync for device %s:"
1464                               " device not found" % disk.iv_name)))
1465       continue
1466
1467     result = rdev.PauseResumeSync(pause)
1468
1469     if result:
1470       success.append((result, None))
1471     else:
1472       if pause:
1473         msg = "Pause"
1474       else:
1475         msg = "Resume"
1476       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1477
1478   return success
1479
1480
1481 def BlockdevRemove(disk):
1482   """Remove a block device.
1483
1484   @note: This is intended to be called recursively.
1485
1486   @type disk: L{objects.Disk}
1487   @param disk: the disk object we should remove
1488   @rtype: boolean
1489   @return: the success of the operation
1490
1491   """
1492   msgs = []
1493   try:
1494     rdev = _RecursiveFindBD(disk)
1495   except errors.BlockDeviceError, err:
1496     # probably can't attach
1497     logging.info("Can't attach to device %s in remove", disk)
1498     rdev = None
1499   if rdev is not None:
1500     r_path = rdev.dev_path
1501     try:
1502       rdev.Remove()
1503     except errors.BlockDeviceError, err:
1504       msgs.append(str(err))
1505     if not msgs:
1506       DevCacheManager.RemoveCache(r_path)
1507
1508   if disk.children:
1509     for child in disk.children:
1510       try:
1511         BlockdevRemove(child)
1512       except RPCFail, err:
1513         msgs.append(str(err))
1514
1515   if msgs:
1516     _Fail("; ".join(msgs))
1517
1518
1519 def _RecursiveAssembleBD(disk, owner, as_primary):
1520   """Activate a block device for an instance.
1521
1522   This is run on the primary and secondary nodes for an instance.
1523
1524   @note: this function is called recursively.
1525
1526   @type disk: L{objects.Disk}
1527   @param disk: the disk we try to assemble
1528   @type owner: str
1529   @param owner: the name of the instance which owns the disk
1530   @type as_primary: boolean
1531   @param as_primary: if we should make the block device
1532       read/write
1533
1534   @return: the assembled device or None (in case no device
1535       was assembled)
1536   @raise errors.BlockDeviceError: in case there is an error
1537       during the activation of the children or the device
1538       itself
1539
1540   """
1541   children = []
1542   if disk.children:
1543     mcn = disk.ChildrenNeeded()
1544     if mcn == -1:
1545       mcn = 0 # max number of Nones allowed
1546     else:
1547       mcn = len(disk.children) - mcn # max number of Nones
1548     for chld_disk in disk.children:
1549       try:
1550         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1551       except errors.BlockDeviceError, err:
1552         if children.count(None) >= mcn:
1553           raise
1554         cdev = None
1555         logging.error("Error in child activation (but continuing): %s",
1556                       str(err))
1557       children.append(cdev)
1558
1559   if as_primary or disk.AssembleOnSecondary():
1560     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1561     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1562     result = r_dev
1563     if as_primary or disk.OpenOnSecondary():
1564       r_dev.Open()
1565     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1566                                 as_primary, disk.iv_name)
1567
1568   else:
1569     result = True
1570   return result
1571
1572
1573 def BlockdevAssemble(disk, owner, as_primary, idx):
1574   """Activate a block device for an instance.
1575
1576   This is a wrapper over _RecursiveAssembleBD.
1577
1578   @rtype: str or boolean
1579   @return: a C{/dev/...} path for primary nodes, and
1580       C{True} for secondary nodes
1581
1582   """
1583   try:
1584     result = _RecursiveAssembleBD(disk, owner, as_primary)
1585     if isinstance(result, bdev.BlockDev):
1586       # pylint: disable=E1103
1587       result = result.dev_path
1588       if as_primary:
1589         _SymlinkBlockDev(owner, result, idx)
1590   except errors.BlockDeviceError, err:
1591     _Fail("Error while assembling disk: %s", err, exc=True)
1592   except OSError, err:
1593     _Fail("Error while symlinking disk: %s", err, exc=True)
1594
1595   return result
1596
1597
1598 def BlockdevShutdown(disk):
1599   """Shut down a block device.
1600
1601   First, if the device is assembled (Attach() is successful), then
1602   the device is shutdown. Then the children of the device are
1603   shutdown.
1604
1605   This function is called recursively. Note that we don't cache the
1606   children or such, as oppossed to assemble, shutdown of different
1607   devices doesn't require that the upper device was active.
1608
1609   @type disk: L{objects.Disk}
1610   @param disk: the description of the disk we should
1611       shutdown
1612   @rtype: None
1613
1614   """
1615   msgs = []
1616   r_dev = _RecursiveFindBD(disk)
1617   if r_dev is not None:
1618     r_path = r_dev.dev_path
1619     try:
1620       r_dev.Shutdown()
1621       DevCacheManager.RemoveCache(r_path)
1622     except errors.BlockDeviceError, err:
1623       msgs.append(str(err))
1624
1625   if disk.children:
1626     for child in disk.children:
1627       try:
1628         BlockdevShutdown(child)
1629       except RPCFail, err:
1630         msgs.append(str(err))
1631
1632   if msgs:
1633     _Fail("; ".join(msgs))
1634
1635
1636 def BlockdevAddchildren(parent_cdev, new_cdevs):
1637   """Extend a mirrored block device.
1638
1639   @type parent_cdev: L{objects.Disk}
1640   @param parent_cdev: the disk to which we should add children
1641   @type new_cdevs: list of L{objects.Disk}
1642   @param new_cdevs: the list of children which we should add
1643   @rtype: None
1644
1645   """
1646   parent_bdev = _RecursiveFindBD(parent_cdev)
1647   if parent_bdev is None:
1648     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1649   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1650   if new_bdevs.count(None) > 0:
1651     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1652   parent_bdev.AddChildren(new_bdevs)
1653
1654
1655 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1656   """Shrink a mirrored block device.
1657
1658   @type parent_cdev: L{objects.Disk}
1659   @param parent_cdev: the disk from which we should remove children
1660   @type new_cdevs: list of L{objects.Disk}
1661   @param new_cdevs: the list of children which we should remove
1662   @rtype: None
1663
1664   """
1665   parent_bdev = _RecursiveFindBD(parent_cdev)
1666   if parent_bdev is None:
1667     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1668   devs = []
1669   for disk in new_cdevs:
1670     rpath = disk.StaticDevPath()
1671     if rpath is None:
1672       bd = _RecursiveFindBD(disk)
1673       if bd is None:
1674         _Fail("Can't find device %s while removing children", disk)
1675       else:
1676         devs.append(bd.dev_path)
1677     else:
1678       if not utils.IsNormAbsPath(rpath):
1679         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1680       devs.append(rpath)
1681   parent_bdev.RemoveChildren(devs)
1682
1683
1684 def BlockdevGetmirrorstatus(disks):
1685   """Get the mirroring status of a list of devices.
1686
1687   @type disks: list of L{objects.Disk}
1688   @param disks: the list of disks which we should query
1689   @rtype: disk
1690   @return: List of L{objects.BlockDevStatus}, one for each disk
1691   @raise errors.BlockDeviceError: if any of the disks cannot be
1692       found
1693
1694   """
1695   stats = []
1696   for dsk in disks:
1697     rbd = _RecursiveFindBD(dsk)
1698     if rbd is None:
1699       _Fail("Can't find device %s", dsk)
1700
1701     stats.append(rbd.CombinedSyncStatus())
1702
1703   return stats
1704
1705
1706 def BlockdevGetmirrorstatusMulti(disks):
1707   """Get the mirroring status of a list of devices.
1708
1709   @type disks: list of L{objects.Disk}
1710   @param disks: the list of disks which we should query
1711   @rtype: disk
1712   @return: List of tuples, (bool, status), one for each disk; bool denotes
1713     success/failure, status is L{objects.BlockDevStatus} on success, string
1714     otherwise
1715
1716   """
1717   result = []
1718   for disk in disks:
1719     try:
1720       rbd = _RecursiveFindBD(disk)
1721       if rbd is None:
1722         result.append((False, "Can't find device %s" % disk))
1723         continue
1724
1725       status = rbd.CombinedSyncStatus()
1726     except errors.BlockDeviceError, err:
1727       logging.exception("Error while getting disk status")
1728       result.append((False, str(err)))
1729     else:
1730       result.append((True, status))
1731
1732   assert len(disks) == len(result)
1733
1734   return result
1735
1736
1737 def _RecursiveFindBD(disk):
1738   """Check if a device is activated.
1739
1740   If so, return information about the real device.
1741
1742   @type disk: L{objects.Disk}
1743   @param disk: the disk object we need to find
1744
1745   @return: None if the device can't be found,
1746       otherwise the device instance
1747
1748   """
1749   children = []
1750   if disk.children:
1751     for chdisk in disk.children:
1752       children.append(_RecursiveFindBD(chdisk))
1753
1754   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1755
1756
1757 def _OpenRealBD(disk):
1758   """Opens the underlying block device of a disk.
1759
1760   @type disk: L{objects.Disk}
1761   @param disk: the disk object we want to open
1762
1763   """
1764   real_disk = _RecursiveFindBD(disk)
1765   if real_disk is None:
1766     _Fail("Block device '%s' is not set up", disk)
1767
1768   real_disk.Open()
1769
1770   return real_disk
1771
1772
1773 def BlockdevFind(disk):
1774   """Check if a device is activated.
1775
1776   If it is, return information about the real device.
1777
1778   @type disk: L{objects.Disk}
1779   @param disk: the disk to find
1780   @rtype: None or objects.BlockDevStatus
1781   @return: None if the disk cannot be found, otherwise a the current
1782            information
1783
1784   """
1785   try:
1786     rbd = _RecursiveFindBD(disk)
1787   except errors.BlockDeviceError, err:
1788     _Fail("Failed to find device: %s", err, exc=True)
1789
1790   if rbd is None:
1791     return None
1792
1793   return rbd.GetSyncStatus()
1794
1795
1796 def BlockdevGetsize(disks):
1797   """Computes the size of the given disks.
1798
1799   If a disk is not found, returns None instead.
1800
1801   @type disks: list of L{objects.Disk}
1802   @param disks: the list of disk to compute the size for
1803   @rtype: list
1804   @return: list with elements None if the disk cannot be found,
1805       otherwise the size
1806
1807   """
1808   result = []
1809   for cf in disks:
1810     try:
1811       rbd = _RecursiveFindBD(cf)
1812     except errors.BlockDeviceError:
1813       result.append(None)
1814       continue
1815     if rbd is None:
1816       result.append(None)
1817     else:
1818       result.append(rbd.GetActualSize())
1819   return result
1820
1821
1822 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1823   """Export a block device to a remote node.
1824
1825   @type disk: L{objects.Disk}
1826   @param disk: the description of the disk to export
1827   @type dest_node: str
1828   @param dest_node: the destination node to export to
1829   @type dest_path: str
1830   @param dest_path: the destination path on the target node
1831   @type cluster_name: str
1832   @param cluster_name: the cluster name, needed for SSH hostalias
1833   @rtype: None
1834
1835   """
1836   real_disk = _OpenRealBD(disk)
1837
1838   # the block size on the read dd is 1MiB to match our units
1839   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1840                                "dd if=%s bs=1048576 count=%s",
1841                                real_disk.dev_path, str(disk.size))
1842
1843   # we set here a smaller block size as, due to ssh buffering, more
1844   # than 64-128k will mostly ignored; we use nocreat to fail if the
1845   # device is not already there or we pass a wrong path; we use
1846   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1847   # to not buffer too much memory; this means that at best, we flush
1848   # every 64k, which will not be very fast
1849   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1850                                 " oflag=dsync", dest_path)
1851
1852   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1853                                                    constants.GANETI_RUNAS,
1854                                                    destcmd)
1855
1856   # all commands have been checked, so we're safe to combine them
1857   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1858
1859   result = utils.RunCmd(["bash", "-c", command])
1860
1861   if result.failed:
1862     _Fail("Disk copy command '%s' returned error: %s"
1863           " output: %s", command, result.fail_reason, result.output)
1864
1865
1866 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1867   """Write a file to the filesystem.
1868
1869   This allows the master to overwrite(!) a file. It will only perform
1870   the operation if the file belongs to a list of configuration files.
1871
1872   @type file_name: str
1873   @param file_name: the target file name
1874   @type data: str
1875   @param data: the new contents of the file
1876   @type mode: int
1877   @param mode: the mode to give the file (can be None)
1878   @type uid: string
1879   @param uid: the owner of the file
1880   @type gid: string
1881   @param gid: the group of the file
1882   @type atime: float
1883   @param atime: the atime to set on the file (can be None)
1884   @type mtime: float
1885   @param mtime: the mtime to set on the file (can be None)
1886   @rtype: None
1887
1888   """
1889   if not os.path.isabs(file_name):
1890     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1891
1892   if file_name not in _ALLOWED_UPLOAD_FILES:
1893     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1894           file_name)
1895
1896   raw_data = _Decompress(data)
1897
1898   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
1899     _Fail("Invalid username/groupname type")
1900
1901   getents = runtime.GetEnts()
1902   uid = getents.LookupUser(uid)
1903   gid = getents.LookupGroup(gid)
1904
1905   utils.SafeWriteFile(file_name, None,
1906                       data=raw_data, mode=mode, uid=uid, gid=gid,
1907                       atime=atime, mtime=mtime)
1908
1909
1910 def RunOob(oob_program, command, node, timeout):
1911   """Executes oob_program with given command on given node.
1912
1913   @param oob_program: The path to the executable oob_program
1914   @param command: The command to invoke on oob_program
1915   @param node: The node given as an argument to the program
1916   @param timeout: Timeout after which we kill the oob program
1917
1918   @return: stdout
1919   @raise RPCFail: If execution fails for some reason
1920
1921   """
1922   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1923
1924   if result.failed:
1925     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1926           result.fail_reason, result.output)
1927
1928   return result.stdout
1929
1930
1931 def WriteSsconfFiles(values):
1932   """Update all ssconf files.
1933
1934   Wrapper around the SimpleStore.WriteFiles.
1935
1936   """
1937   ssconf.SimpleStore().WriteFiles(values)
1938
1939
1940 def _ErrnoOrStr(err):
1941   """Format an EnvironmentError exception.
1942
1943   If the L{err} argument has an errno attribute, it will be looked up
1944   and converted into a textual C{E...} description. Otherwise the
1945   string representation of the error will be returned.
1946
1947   @type err: L{EnvironmentError}
1948   @param err: the exception to format
1949
1950   """
1951   if hasattr(err, "errno"):
1952     detail = errno.errorcode[err.errno]
1953   else:
1954     detail = str(err)
1955   return detail
1956
1957
1958 def _OSOndiskAPIVersion(os_dir):
1959   """Compute and return the API version of a given OS.
1960
1961   This function will try to read the API version of the OS residing in
1962   the 'os_dir' directory.
1963
1964   @type os_dir: str
1965   @param os_dir: the directory in which we should look for the OS
1966   @rtype: tuple
1967   @return: tuple (status, data) with status denoting the validity and
1968       data holding either the vaid versions or an error message
1969
1970   """
1971   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1972
1973   try:
1974     st = os.stat(api_file)
1975   except EnvironmentError, err:
1976     return False, ("Required file '%s' not found under path %s: %s" %
1977                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1978
1979   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1980     return False, ("File '%s' in %s is not a regular file" %
1981                    (constants.OS_API_FILE, os_dir))
1982
1983   try:
1984     api_versions = utils.ReadFile(api_file).splitlines()
1985   except EnvironmentError, err:
1986     return False, ("Error while reading the API version file at %s: %s" %
1987                    (api_file, _ErrnoOrStr(err)))
1988
1989   try:
1990     api_versions = [int(version.strip()) for version in api_versions]
1991   except (TypeError, ValueError), err:
1992     return False, ("API version(s) can't be converted to integer: %s" %
1993                    str(err))
1994
1995   return True, api_versions
1996
1997
1998 def DiagnoseOS(top_dirs=None):
1999   """Compute the validity for all OSes.
2000
2001   @type top_dirs: list
2002   @param top_dirs: the list of directories in which to
2003       search (if not given defaults to
2004       L{constants.OS_SEARCH_PATH})
2005   @rtype: list of L{objects.OS}
2006   @return: a list of tuples (name, path, status, diagnose, variants,
2007       parameters, api_version) for all (potential) OSes under all
2008       search paths, where:
2009           - name is the (potential) OS name
2010           - path is the full path to the OS
2011           - status True/False is the validity of the OS
2012           - diagnose is the error message for an invalid OS, otherwise empty
2013           - variants is a list of supported OS variants, if any
2014           - parameters is a list of (name, help) parameters, if any
2015           - api_version is a list of support OS API versions
2016
2017   """
2018   if top_dirs is None:
2019     top_dirs = constants.OS_SEARCH_PATH
2020
2021   result = []
2022   for dir_name in top_dirs:
2023     if os.path.isdir(dir_name):
2024       try:
2025         f_names = utils.ListVisibleFiles(dir_name)
2026       except EnvironmentError, err:
2027         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2028         break
2029       for name in f_names:
2030         os_path = utils.PathJoin(dir_name, name)
2031         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2032         if status:
2033           diagnose = ""
2034           variants = os_inst.supported_variants
2035           parameters = os_inst.supported_parameters
2036           api_versions = os_inst.api_versions
2037         else:
2038           diagnose = os_inst
2039           variants = parameters = api_versions = []
2040         result.append((name, os_path, status, diagnose, variants,
2041                        parameters, api_versions))
2042
2043   return result
2044
2045
2046 def _TryOSFromDisk(name, base_dir=None):
2047   """Create an OS instance from disk.
2048
2049   This function will return an OS instance if the given name is a
2050   valid OS name.
2051
2052   @type base_dir: string
2053   @keyword base_dir: Base directory containing OS installations.
2054                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2055   @rtype: tuple
2056   @return: success and either the OS instance if we find a valid one,
2057       or error message
2058
2059   """
2060   if base_dir is None:
2061     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
2062   else:
2063     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2064
2065   if os_dir is None:
2066     return False, "Directory for OS %s not found in search path" % name
2067
2068   status, api_versions = _OSOndiskAPIVersion(os_dir)
2069   if not status:
2070     # push the error up
2071     return status, api_versions
2072
2073   if not constants.OS_API_VERSIONS.intersection(api_versions):
2074     return False, ("API version mismatch for path '%s': found %s, want %s." %
2075                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2076
2077   # OS Files dictionary, we will populate it with the absolute path
2078   # names; if the value is True, then it is a required file, otherwise
2079   # an optional one
2080   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2081
2082   if max(api_versions) >= constants.OS_API_V15:
2083     os_files[constants.OS_VARIANTS_FILE] = False
2084
2085   if max(api_versions) >= constants.OS_API_V20:
2086     os_files[constants.OS_PARAMETERS_FILE] = True
2087   else:
2088     del os_files[constants.OS_SCRIPT_VERIFY]
2089
2090   for (filename, required) in os_files.items():
2091     os_files[filename] = utils.PathJoin(os_dir, filename)
2092
2093     try:
2094       st = os.stat(os_files[filename])
2095     except EnvironmentError, err:
2096       if err.errno == errno.ENOENT and not required:
2097         del os_files[filename]
2098         continue
2099       return False, ("File '%s' under path '%s' is missing (%s)" %
2100                      (filename, os_dir, _ErrnoOrStr(err)))
2101
2102     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2103       return False, ("File '%s' under path '%s' is not a regular file" %
2104                      (filename, os_dir))
2105
2106     if filename in constants.OS_SCRIPTS:
2107       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2108         return False, ("File '%s' under path '%s' is not executable" %
2109                        (filename, os_dir))
2110
2111   variants = []
2112   if constants.OS_VARIANTS_FILE in os_files:
2113     variants_file = os_files[constants.OS_VARIANTS_FILE]
2114     try:
2115       variants = utils.ReadFile(variants_file).splitlines()
2116     except EnvironmentError, err:
2117       # we accept missing files, but not other errors
2118       if err.errno != errno.ENOENT:
2119         return False, ("Error while reading the OS variants file at %s: %s" %
2120                        (variants_file, _ErrnoOrStr(err)))
2121
2122   parameters = []
2123   if constants.OS_PARAMETERS_FILE in os_files:
2124     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2125     try:
2126       parameters = utils.ReadFile(parameters_file).splitlines()
2127     except EnvironmentError, err:
2128       return False, ("Error while reading the OS parameters file at %s: %s" %
2129                      (parameters_file, _ErrnoOrStr(err)))
2130     parameters = [v.split(None, 1) for v in parameters]
2131
2132   os_obj = objects.OS(name=name, path=os_dir,
2133                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2134                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2135                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2136                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2137                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2138                                                  None),
2139                       supported_variants=variants,
2140                       supported_parameters=parameters,
2141                       api_versions=api_versions)
2142   return True, os_obj
2143
2144
2145 def OSFromDisk(name, base_dir=None):
2146   """Create an OS instance from disk.
2147
2148   This function will return an OS instance if the given name is a
2149   valid OS name. Otherwise, it will raise an appropriate
2150   L{RPCFail} exception, detailing why this is not a valid OS.
2151
2152   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2153   an exception but returns true/false status data.
2154
2155   @type base_dir: string
2156   @keyword base_dir: Base directory containing OS installations.
2157                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2158   @rtype: L{objects.OS}
2159   @return: the OS instance if we find a valid one
2160   @raise RPCFail: if we don't find a valid OS
2161
2162   """
2163   name_only = objects.OS.GetName(name)
2164   status, payload = _TryOSFromDisk(name_only, base_dir)
2165
2166   if not status:
2167     _Fail(payload)
2168
2169   return payload
2170
2171
2172 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2173   """Calculate the basic environment for an os script.
2174
2175   @type os_name: str
2176   @param os_name: full operating system name (including variant)
2177   @type inst_os: L{objects.OS}
2178   @param inst_os: operating system for which the environment is being built
2179   @type os_params: dict
2180   @param os_params: the OS parameters
2181   @type debug: integer
2182   @param debug: debug level (0 or 1, for OS Api 10)
2183   @rtype: dict
2184   @return: dict of environment variables
2185   @raise errors.BlockDeviceError: if the block device
2186       cannot be found
2187
2188   """
2189   result = {}
2190   api_version = \
2191     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2192   result["OS_API_VERSION"] = "%d" % api_version
2193   result["OS_NAME"] = inst_os.name
2194   result["DEBUG_LEVEL"] = "%d" % debug
2195
2196   # OS variants
2197   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2198     variant = objects.OS.GetVariant(os_name)
2199     if not variant:
2200       variant = inst_os.supported_variants[0]
2201   else:
2202     variant = ""
2203   result["OS_VARIANT"] = variant
2204
2205   # OS params
2206   for pname, pvalue in os_params.items():
2207     result["OSP_%s" % pname.upper()] = pvalue
2208
2209   return result
2210
2211
2212 def OSEnvironment(instance, inst_os, debug=0):
2213   """Calculate the environment for an os script.
2214
2215   @type instance: L{objects.Instance}
2216   @param instance: target instance for the os script run
2217   @type inst_os: L{objects.OS}
2218   @param inst_os: operating system for which the environment is being built
2219   @type debug: integer
2220   @param debug: debug level (0 or 1, for OS Api 10)
2221   @rtype: dict
2222   @return: dict of environment variables
2223   @raise errors.BlockDeviceError: if the block device
2224       cannot be found
2225
2226   """
2227   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2228
2229   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2230     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2231
2232   result["HYPERVISOR"] = instance.hypervisor
2233   result["DISK_COUNT"] = "%d" % len(instance.disks)
2234   result["NIC_COUNT"] = "%d" % len(instance.nics)
2235   result["INSTANCE_SECONDARY_NODES"] = \
2236       ("%s" % " ".join(instance.secondary_nodes))
2237
2238   # Disks
2239   for idx, disk in enumerate(instance.disks):
2240     real_disk = _OpenRealBD(disk)
2241     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2242     result["DISK_%d_ACCESS" % idx] = disk.mode
2243     if constants.HV_DISK_TYPE in instance.hvparams:
2244       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2245         instance.hvparams[constants.HV_DISK_TYPE]
2246     if disk.dev_type in constants.LDS_BLOCK:
2247       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2248     elif disk.dev_type == constants.LD_FILE:
2249       result["DISK_%d_BACKEND_TYPE" % idx] = \
2250         "file:%s" % disk.physical_id[0]
2251
2252   # NICs
2253   for idx, nic in enumerate(instance.nics):
2254     result["NIC_%d_MAC" % idx] = nic.mac
2255     if nic.ip:
2256       result["NIC_%d_IP" % idx] = nic.ip
2257     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2258     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2259       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2260     if nic.nicparams[constants.NIC_LINK]:
2261       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2262     if constants.HV_NIC_TYPE in instance.hvparams:
2263       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2264         instance.hvparams[constants.HV_NIC_TYPE]
2265
2266   # HV/BE params
2267   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2268     for key, value in source.items():
2269       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2270
2271   return result
2272
2273
2274 def BlockdevGrow(disk, amount, dryrun):
2275   """Grow a stack of block devices.
2276
2277   This function is called recursively, with the childrens being the
2278   first ones to resize.
2279
2280   @type disk: L{objects.Disk}
2281   @param disk: the disk to be grown
2282   @type amount: integer
2283   @param amount: the amount (in mebibytes) to grow with
2284   @type dryrun: boolean
2285   @param dryrun: whether to execute the operation in simulation mode
2286       only, without actually increasing the size
2287   @rtype: (status, result)
2288   @return: a tuple with the status of the operation (True/False), and
2289       the errors message if status is False
2290
2291   """
2292   r_dev = _RecursiveFindBD(disk)
2293   if r_dev is None:
2294     _Fail("Cannot find block device %s", disk)
2295
2296   try:
2297     r_dev.Grow(amount, dryrun)
2298   except errors.BlockDeviceError, err:
2299     _Fail("Failed to grow block device: %s", err, exc=True)
2300
2301
2302 def BlockdevSnapshot(disk):
2303   """Create a snapshot copy of a block device.
2304
2305   This function is called recursively, and the snapshot is actually created
2306   just for the leaf lvm backend device.
2307
2308   @type disk: L{objects.Disk}
2309   @param disk: the disk to be snapshotted
2310   @rtype: string
2311   @return: snapshot disk ID as (vg, lv)
2312
2313   """
2314   if disk.dev_type == constants.LD_DRBD8:
2315     if not disk.children:
2316       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2317             disk.unique_id)
2318     return BlockdevSnapshot(disk.children[0])
2319   elif disk.dev_type == constants.LD_LV:
2320     r_dev = _RecursiveFindBD(disk)
2321     if r_dev is not None:
2322       # FIXME: choose a saner value for the snapshot size
2323       # let's stay on the safe side and ask for the full size, for now
2324       return r_dev.Snapshot(disk.size)
2325     else:
2326       _Fail("Cannot find block device %s", disk)
2327   else:
2328     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2329           disk.unique_id, disk.dev_type)
2330
2331
2332 def FinalizeExport(instance, snap_disks):
2333   """Write out the export configuration information.
2334
2335   @type instance: L{objects.Instance}
2336   @param instance: the instance which we export, used for
2337       saving configuration
2338   @type snap_disks: list of L{objects.Disk}
2339   @param snap_disks: list of snapshot block devices, which
2340       will be used to get the actual name of the dump file
2341
2342   @rtype: None
2343
2344   """
2345   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2346   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2347
2348   config = objects.SerializableConfigParser()
2349
2350   config.add_section(constants.INISECT_EXP)
2351   config.set(constants.INISECT_EXP, "version", "0")
2352   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2353   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2354   config.set(constants.INISECT_EXP, "os", instance.os)
2355   config.set(constants.INISECT_EXP, "compression", "none")
2356
2357   config.add_section(constants.INISECT_INS)
2358   config.set(constants.INISECT_INS, "name", instance.name)
2359   config.set(constants.INISECT_INS, "memory", "%d" %
2360              instance.beparams[constants.BE_MEMORY])
2361   config.set(constants.INISECT_INS, "vcpus", "%d" %
2362              instance.beparams[constants.BE_VCPUS])
2363   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2364   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2365   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2366
2367   nic_total = 0
2368   for nic_count, nic in enumerate(instance.nics):
2369     nic_total += 1
2370     config.set(constants.INISECT_INS, "nic%d_mac" %
2371                nic_count, "%s" % nic.mac)
2372     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2373     for param in constants.NICS_PARAMETER_TYPES:
2374       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2375                  "%s" % nic.nicparams.get(param, None))
2376   # TODO: redundant: on load can read nics until it doesn't exist
2377   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2378
2379   disk_total = 0
2380   for disk_count, disk in enumerate(snap_disks):
2381     if disk:
2382       disk_total += 1
2383       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2384                  ("%s" % disk.iv_name))
2385       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2386                  ("%s" % disk.physical_id[1]))
2387       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2388                  ("%d" % disk.size))
2389
2390   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2391
2392   # New-style hypervisor/backend parameters
2393
2394   config.add_section(constants.INISECT_HYP)
2395   for name, value in instance.hvparams.items():
2396     if name not in constants.HVC_GLOBALS:
2397       config.set(constants.INISECT_HYP, name, str(value))
2398
2399   config.add_section(constants.INISECT_BEP)
2400   for name, value in instance.beparams.items():
2401     config.set(constants.INISECT_BEP, name, str(value))
2402
2403   config.add_section(constants.INISECT_OSP)
2404   for name, value in instance.osparams.items():
2405     config.set(constants.INISECT_OSP, name, str(value))
2406
2407   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2408                   data=config.Dumps())
2409   shutil.rmtree(finaldestdir, ignore_errors=True)
2410   shutil.move(destdir, finaldestdir)
2411
2412
2413 def ExportInfo(dest):
2414   """Get export configuration information.
2415
2416   @type dest: str
2417   @param dest: directory containing the export
2418
2419   @rtype: L{objects.SerializableConfigParser}
2420   @return: a serializable config file containing the
2421       export info
2422
2423   """
2424   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2425
2426   config = objects.SerializableConfigParser()
2427   config.read(cff)
2428
2429   if (not config.has_section(constants.INISECT_EXP) or
2430       not config.has_section(constants.INISECT_INS)):
2431     _Fail("Export info file doesn't have the required fields")
2432
2433   return config.Dumps()
2434
2435
2436 def ListExports():
2437   """Return a list of exports currently available on this machine.
2438
2439   @rtype: list
2440   @return: list of the exports
2441
2442   """
2443   if os.path.isdir(constants.EXPORT_DIR):
2444     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2445   else:
2446     _Fail("No exports directory")
2447
2448
2449 def RemoveExport(export):
2450   """Remove an existing export from the node.
2451
2452   @type export: str
2453   @param export: the name of the export to remove
2454   @rtype: None
2455
2456   """
2457   target = utils.PathJoin(constants.EXPORT_DIR, export)
2458
2459   try:
2460     shutil.rmtree(target)
2461   except EnvironmentError, err:
2462     _Fail("Error while removing the export: %s", err, exc=True)
2463
2464
2465 def BlockdevRename(devlist):
2466   """Rename a list of block devices.
2467
2468   @type devlist: list of tuples
2469   @param devlist: list of tuples of the form  (disk,
2470       new_logical_id, new_physical_id); disk is an
2471       L{objects.Disk} object describing the current disk,
2472       and new logical_id/physical_id is the name we
2473       rename it to
2474   @rtype: boolean
2475   @return: True if all renames succeeded, False otherwise
2476
2477   """
2478   msgs = []
2479   result = True
2480   for disk, unique_id in devlist:
2481     dev = _RecursiveFindBD(disk)
2482     if dev is None:
2483       msgs.append("Can't find device %s in rename" % str(disk))
2484       result = False
2485       continue
2486     try:
2487       old_rpath = dev.dev_path
2488       dev.Rename(unique_id)
2489       new_rpath = dev.dev_path
2490       if old_rpath != new_rpath:
2491         DevCacheManager.RemoveCache(old_rpath)
2492         # FIXME: we should add the new cache information here, like:
2493         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2494         # but we don't have the owner here - maybe parse from existing
2495         # cache? for now, we only lose lvm data when we rename, which
2496         # is less critical than DRBD or MD
2497     except errors.BlockDeviceError, err:
2498       msgs.append("Can't rename device '%s' to '%s': %s" %
2499                   (dev, unique_id, err))
2500       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2501       result = False
2502   if not result:
2503     _Fail("; ".join(msgs))
2504
2505
2506 def _TransformFileStorageDir(fs_dir):
2507   """Checks whether given file_storage_dir is valid.
2508
2509   Checks wheter the given fs_dir is within the cluster-wide default
2510   file_storage_dir or the shared_file_storage_dir, which are stored in
2511   SimpleStore. Only paths under those directories are allowed.
2512
2513   @type fs_dir: str
2514   @param fs_dir: the path to check
2515
2516   @return: the normalized path if valid, None otherwise
2517
2518   """
2519   if not constants.ENABLE_FILE_STORAGE:
2520     _Fail("File storage disabled at configure time")
2521   cfg = _GetConfig()
2522   fs_dir = os.path.normpath(fs_dir)
2523   base_fstore = cfg.GetFileStorageDir()
2524   base_shared = cfg.GetSharedFileStorageDir()
2525   if not (utils.IsBelowDir(base_fstore, fs_dir) or
2526           utils.IsBelowDir(base_shared, fs_dir)):
2527     _Fail("File storage directory '%s' is not under base file"
2528           " storage directory '%s' or shared storage directory '%s'",
2529           fs_dir, base_fstore, base_shared)
2530   return fs_dir
2531
2532
2533 def CreateFileStorageDir(file_storage_dir):
2534   """Create file storage directory.
2535
2536   @type file_storage_dir: str
2537   @param file_storage_dir: directory to create
2538
2539   @rtype: tuple
2540   @return: tuple with first element a boolean indicating wheter dir
2541       creation was successful or not
2542
2543   """
2544   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2545   if os.path.exists(file_storage_dir):
2546     if not os.path.isdir(file_storage_dir):
2547       _Fail("Specified storage dir '%s' is not a directory",
2548             file_storage_dir)
2549   else:
2550     try:
2551       os.makedirs(file_storage_dir, 0750)
2552     except OSError, err:
2553       _Fail("Cannot create file storage directory '%s': %s",
2554             file_storage_dir, err, exc=True)
2555
2556
2557 def RemoveFileStorageDir(file_storage_dir):
2558   """Remove file storage directory.
2559
2560   Remove it only if it's empty. If not log an error and return.
2561
2562   @type file_storage_dir: str
2563   @param file_storage_dir: the directory we should cleanup
2564   @rtype: tuple (success,)
2565   @return: tuple of one element, C{success}, denoting
2566       whether the operation was successful
2567
2568   """
2569   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2570   if os.path.exists(file_storage_dir):
2571     if not os.path.isdir(file_storage_dir):
2572       _Fail("Specified Storage directory '%s' is not a directory",
2573             file_storage_dir)
2574     # deletes dir only if empty, otherwise we want to fail the rpc call
2575     try:
2576       os.rmdir(file_storage_dir)
2577     except OSError, err:
2578       _Fail("Cannot remove file storage directory '%s': %s",
2579             file_storage_dir, err)
2580
2581
2582 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2583   """Rename the file storage directory.
2584
2585   @type old_file_storage_dir: str
2586   @param old_file_storage_dir: the current path
2587   @type new_file_storage_dir: str
2588   @param new_file_storage_dir: the name we should rename to
2589   @rtype: tuple (success,)
2590   @return: tuple of one element, C{success}, denoting
2591       whether the operation was successful
2592
2593   """
2594   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2595   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2596   if not os.path.exists(new_file_storage_dir):
2597     if os.path.isdir(old_file_storage_dir):
2598       try:
2599         os.rename(old_file_storage_dir, new_file_storage_dir)
2600       except OSError, err:
2601         _Fail("Cannot rename '%s' to '%s': %s",
2602               old_file_storage_dir, new_file_storage_dir, err)
2603     else:
2604       _Fail("Specified storage dir '%s' is not a directory",
2605             old_file_storage_dir)
2606   else:
2607     if os.path.exists(old_file_storage_dir):
2608       _Fail("Cannot rename '%s' to '%s': both locations exist",
2609             old_file_storage_dir, new_file_storage_dir)
2610
2611
2612 def _EnsureJobQueueFile(file_name):
2613   """Checks whether the given filename is in the queue directory.
2614
2615   @type file_name: str
2616   @param file_name: the file name we should check
2617   @rtype: None
2618   @raises RPCFail: if the file is not valid
2619
2620   """
2621   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2622   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2623
2624   if not result:
2625     _Fail("Passed job queue file '%s' does not belong to"
2626           " the queue directory '%s'", file_name, queue_dir)
2627
2628
2629 def JobQueueUpdate(file_name, content):
2630   """Updates a file in the queue directory.
2631
2632   This is just a wrapper over L{utils.io.WriteFile}, with proper
2633   checking.
2634
2635   @type file_name: str
2636   @param file_name: the job file name
2637   @type content: str
2638   @param content: the new job contents
2639   @rtype: boolean
2640   @return: the success of the operation
2641
2642   """
2643   _EnsureJobQueueFile(file_name)
2644   getents = runtime.GetEnts()
2645
2646   # Write and replace the file atomically
2647   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2648                   gid=getents.masterd_gid)
2649
2650
2651 def JobQueueRename(old, new):
2652   """Renames a job queue file.
2653
2654   This is just a wrapper over os.rename with proper checking.
2655
2656   @type old: str
2657   @param old: the old (actual) file name
2658   @type new: str
2659   @param new: the desired file name
2660   @rtype: tuple
2661   @return: the success of the operation and payload
2662
2663   """
2664   _EnsureJobQueueFile(old)
2665   _EnsureJobQueueFile(new)
2666
2667   utils.RenameFile(old, new, mkdir=True)
2668
2669
2670 def BlockdevClose(instance_name, disks):
2671   """Closes the given block devices.
2672
2673   This means they will be switched to secondary mode (in case of
2674   DRBD).
2675
2676   @param instance_name: if the argument is not empty, the symlinks
2677       of this instance will be removed
2678   @type disks: list of L{objects.Disk}
2679   @param disks: the list of disks to be closed
2680   @rtype: tuple (success, message)
2681   @return: a tuple of success and message, where success
2682       indicates the succes of the operation, and message
2683       which will contain the error details in case we
2684       failed
2685
2686   """
2687   bdevs = []
2688   for cf in disks:
2689     rd = _RecursiveFindBD(cf)
2690     if rd is None:
2691       _Fail("Can't find device %s", cf)
2692     bdevs.append(rd)
2693
2694   msg = []
2695   for rd in bdevs:
2696     try:
2697       rd.Close()
2698     except errors.BlockDeviceError, err:
2699       msg.append(str(err))
2700   if msg:
2701     _Fail("Can't make devices secondary: %s", ",".join(msg))
2702   else:
2703     if instance_name:
2704       _RemoveBlockDevLinks(instance_name, disks)
2705
2706
2707 def ValidateHVParams(hvname, hvparams):
2708   """Validates the given hypervisor parameters.
2709
2710   @type hvname: string
2711   @param hvname: the hypervisor name
2712   @type hvparams: dict
2713   @param hvparams: the hypervisor parameters to be validated
2714   @rtype: None
2715
2716   """
2717   try:
2718     hv_type = hypervisor.GetHypervisor(hvname)
2719     hv_type.ValidateParameters(hvparams)
2720   except errors.HypervisorError, err:
2721     _Fail(str(err), log=False)
2722
2723
2724 def _CheckOSPList(os_obj, parameters):
2725   """Check whether a list of parameters is supported by the OS.
2726
2727   @type os_obj: L{objects.OS}
2728   @param os_obj: OS object to check
2729   @type parameters: list
2730   @param parameters: the list of parameters to check
2731
2732   """
2733   supported = [v[0] for v in os_obj.supported_parameters]
2734   delta = frozenset(parameters).difference(supported)
2735   if delta:
2736     _Fail("The following parameters are not supported"
2737           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2738
2739
2740 def ValidateOS(required, osname, checks, osparams):
2741   """Validate the given OS' parameters.
2742
2743   @type required: boolean
2744   @param required: whether absence of the OS should translate into
2745       failure or not
2746   @type osname: string
2747   @param osname: the OS to be validated
2748   @type checks: list
2749   @param checks: list of the checks to run (currently only 'parameters')
2750   @type osparams: dict
2751   @param osparams: dictionary with OS parameters
2752   @rtype: boolean
2753   @return: True if the validation passed, or False if the OS was not
2754       found and L{required} was false
2755
2756   """
2757   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2758     _Fail("Unknown checks required for OS %s: %s", osname,
2759           set(checks).difference(constants.OS_VALIDATE_CALLS))
2760
2761   name_only = objects.OS.GetName(osname)
2762   status, tbv = _TryOSFromDisk(name_only, None)
2763
2764   if not status:
2765     if required:
2766       _Fail(tbv)
2767     else:
2768       return False
2769
2770   if max(tbv.api_versions) < constants.OS_API_V20:
2771     return True
2772
2773   if constants.OS_VALIDATE_PARAMETERS in checks:
2774     _CheckOSPList(tbv, osparams.keys())
2775
2776   validate_env = OSCoreEnv(osname, tbv, osparams)
2777   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2778                         cwd=tbv.path, reset_env=True)
2779   if result.failed:
2780     logging.error("os validate command '%s' returned error: %s output: %s",
2781                   result.cmd, result.fail_reason, result.output)
2782     _Fail("OS validation script failed (%s), output: %s",
2783           result.fail_reason, result.output, log=False)
2784
2785   return True
2786
2787
2788 def DemoteFromMC():
2789   """Demotes the current node from master candidate role.
2790
2791   """
2792   # try to ensure we're not the master by mistake
2793   master, myself = ssconf.GetMasterAndMyself()
2794   if master == myself:
2795     _Fail("ssconf status shows I'm the master node, will not demote")
2796
2797   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2798   if not result.failed:
2799     _Fail("The master daemon is running, will not demote")
2800
2801   try:
2802     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2803       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2804   except EnvironmentError, err:
2805     if err.errno != errno.ENOENT:
2806       _Fail("Error while backing up cluster file: %s", err, exc=True)
2807
2808   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2809
2810
2811 def _GetX509Filenames(cryptodir, name):
2812   """Returns the full paths for the private key and certificate.
2813
2814   """
2815   return (utils.PathJoin(cryptodir, name),
2816           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2817           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2818
2819
2820 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2821   """Creates a new X509 certificate for SSL/TLS.
2822
2823   @type validity: int
2824   @param validity: Validity in seconds
2825   @rtype: tuple; (string, string)
2826   @return: Certificate name and public part
2827
2828   """
2829   (key_pem, cert_pem) = \
2830     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2831                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2832
2833   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2834                               prefix="x509-%s-" % utils.TimestampForFilename())
2835   try:
2836     name = os.path.basename(cert_dir)
2837     assert len(name) > 5
2838
2839     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2840
2841     utils.WriteFile(key_file, mode=0400, data=key_pem)
2842     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2843
2844     # Never return private key as it shouldn't leave the node
2845     return (name, cert_pem)
2846   except Exception:
2847     shutil.rmtree(cert_dir, ignore_errors=True)
2848     raise
2849
2850
2851 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2852   """Removes a X509 certificate.
2853
2854   @type name: string
2855   @param name: Certificate name
2856
2857   """
2858   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2859
2860   utils.RemoveFile(key_file)
2861   utils.RemoveFile(cert_file)
2862
2863   try:
2864     os.rmdir(cert_dir)
2865   except EnvironmentError, err:
2866     _Fail("Cannot remove certificate directory '%s': %s",
2867           cert_dir, err)
2868
2869
2870 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2871   """Returns the command for the requested input/output.
2872
2873   @type instance: L{objects.Instance}
2874   @param instance: The instance object
2875   @param mode: Import/export mode
2876   @param ieio: Input/output type
2877   @param ieargs: Input/output arguments
2878
2879   """
2880   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2881
2882   env = None
2883   prefix = None
2884   suffix = None
2885   exp_size = None
2886
2887   if ieio == constants.IEIO_FILE:
2888     (filename, ) = ieargs
2889
2890     if not utils.IsNormAbsPath(filename):
2891       _Fail("Path '%s' is not normalized or absolute", filename)
2892
2893     real_filename = os.path.realpath(filename)
2894     directory = os.path.dirname(real_filename)
2895
2896     if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
2897       _Fail("File '%s' is not under exports directory '%s': %s",
2898             filename, constants.EXPORT_DIR, real_filename)
2899
2900     # Create directory
2901     utils.Makedirs(directory, mode=0750)
2902
2903     quoted_filename = utils.ShellQuote(filename)
2904
2905     if mode == constants.IEM_IMPORT:
2906       suffix = "> %s" % quoted_filename
2907     elif mode == constants.IEM_EXPORT:
2908       suffix = "< %s" % quoted_filename
2909
2910       # Retrieve file size
2911       try:
2912         st = os.stat(filename)
2913       except EnvironmentError, err:
2914         logging.error("Can't stat(2) %s: %s", filename, err)
2915       else:
2916         exp_size = utils.BytesToMebibyte(st.st_size)
2917
2918   elif ieio == constants.IEIO_RAW_DISK:
2919     (disk, ) = ieargs
2920
2921     real_disk = _OpenRealBD(disk)
2922
2923     if mode == constants.IEM_IMPORT:
2924       # we set here a smaller block size as, due to transport buffering, more
2925       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2926       # is not already there or we pass a wrong path; we use notrunc to no
2927       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2928       # much memory; this means that at best, we flush every 64k, which will
2929       # not be very fast
2930       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2931                                     " bs=%s oflag=dsync"),
2932                                     real_disk.dev_path,
2933                                     str(64 * 1024))
2934
2935     elif mode == constants.IEM_EXPORT:
2936       # the block size on the read dd is 1MiB to match our units
2937       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2938                                    real_disk.dev_path,
2939                                    str(1024 * 1024), # 1 MB
2940                                    str(disk.size))
2941       exp_size = disk.size
2942
2943   elif ieio == constants.IEIO_SCRIPT:
2944     (disk, disk_index, ) = ieargs
2945
2946     assert isinstance(disk_index, (int, long))
2947
2948     real_disk = _OpenRealBD(disk)
2949
2950     inst_os = OSFromDisk(instance.os)
2951     env = OSEnvironment(instance, inst_os)
2952
2953     if mode == constants.IEM_IMPORT:
2954       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2955       env["IMPORT_INDEX"] = str(disk_index)
2956       script = inst_os.import_script
2957
2958     elif mode == constants.IEM_EXPORT:
2959       env["EXPORT_DEVICE"] = real_disk.dev_path
2960       env["EXPORT_INDEX"] = str(disk_index)
2961       script = inst_os.export_script
2962
2963     # TODO: Pass special environment only to script
2964     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2965
2966     if mode == constants.IEM_IMPORT:
2967       suffix = "| %s" % script_cmd
2968
2969     elif mode == constants.IEM_EXPORT:
2970       prefix = "%s |" % script_cmd
2971
2972     # Let script predict size
2973     exp_size = constants.IE_CUSTOM_SIZE
2974
2975   else:
2976     _Fail("Invalid %s I/O mode %r", mode, ieio)
2977
2978   return (env, prefix, suffix, exp_size)
2979
2980
2981 def _CreateImportExportStatusDir(prefix):
2982   """Creates status directory for import/export.
2983
2984   """
2985   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2986                           prefix=("%s-%s-" %
2987                                   (prefix, utils.TimestampForFilename())))
2988
2989
2990 def StartImportExportDaemon(mode, opts, host, port, instance, component,
2991                             ieio, ieioargs):
2992   """Starts an import or export daemon.
2993
2994   @param mode: Import/output mode
2995   @type opts: L{objects.ImportExportOptions}
2996   @param opts: Daemon options
2997   @type host: string
2998   @param host: Remote host for export (None for import)
2999   @type port: int
3000   @param port: Remote port for export (None for import)
3001   @type instance: L{objects.Instance}
3002   @param instance: Instance object
3003   @type component: string
3004   @param component: which part of the instance is transferred now,
3005       e.g. 'disk/0'
3006   @param ieio: Input/output type
3007   @param ieioargs: Input/output arguments
3008
3009   """
3010   if mode == constants.IEM_IMPORT:
3011     prefix = "import"
3012
3013     if not (host is None and port is None):
3014       _Fail("Can not specify host or port on import")
3015
3016   elif mode == constants.IEM_EXPORT:
3017     prefix = "export"
3018
3019     if host is None or port is None:
3020       _Fail("Host and port must be specified for an export")
3021
3022   else:
3023     _Fail("Invalid mode %r", mode)
3024
3025   if (opts.key_name is None) ^ (opts.ca_pem is None):
3026     _Fail("Cluster certificate can only be used for both key and CA")
3027
3028   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3029     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3030
3031   if opts.key_name is None:
3032     # Use server.pem
3033     key_path = constants.NODED_CERT_FILE
3034     cert_path = constants.NODED_CERT_FILE
3035     assert opts.ca_pem is None
3036   else:
3037     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3038                                                  opts.key_name)
3039     assert opts.ca_pem is not None
3040
3041   for i in [key_path, cert_path]:
3042     if not os.path.exists(i):
3043       _Fail("File '%s' does not exist" % i)
3044
3045   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3046   try:
3047     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3048     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3049     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3050
3051     if opts.ca_pem is None:
3052       # Use server.pem
3053       ca = utils.ReadFile(constants.NODED_CERT_FILE)
3054     else:
3055       ca = opts.ca_pem
3056
3057     # Write CA file
3058     utils.WriteFile(ca_file, data=ca, mode=0400)
3059
3060     cmd = [
3061       constants.IMPORT_EXPORT_DAEMON,
3062       status_file, mode,
3063       "--key=%s" % key_path,
3064       "--cert=%s" % cert_path,
3065       "--ca=%s" % ca_file,
3066       ]
3067
3068     if host:
3069       cmd.append("--host=%s" % host)
3070
3071     if port:
3072       cmd.append("--port=%s" % port)
3073
3074     if opts.ipv6:
3075       cmd.append("--ipv6")
3076     else:
3077       cmd.append("--ipv4")
3078
3079     if opts.compress:
3080       cmd.append("--compress=%s" % opts.compress)
3081
3082     if opts.magic:
3083       cmd.append("--magic=%s" % opts.magic)
3084
3085     if exp_size is not None:
3086       cmd.append("--expected-size=%s" % exp_size)
3087
3088     if cmd_prefix:
3089       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3090
3091     if cmd_suffix:
3092       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3093
3094     if mode == constants.IEM_EXPORT:
3095       # Retry connection a few times when connecting to remote peer
3096       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3097       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3098     elif opts.connect_timeout is not None:
3099       assert mode == constants.IEM_IMPORT
3100       # Overall timeout for establishing connection while listening
3101       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3102
3103     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3104
3105     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3106     # support for receiving a file descriptor for output
3107     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3108                       output=logfile)
3109
3110     # The import/export name is simply the status directory name
3111     return os.path.basename(status_dir)
3112
3113   except Exception:
3114     shutil.rmtree(status_dir, ignore_errors=True)
3115     raise
3116
3117
3118 def GetImportExportStatus(names):
3119   """Returns import/export daemon status.
3120
3121   @type names: sequence
3122   @param names: List of names
3123   @rtype: List of dicts
3124   @return: Returns a list of the state of each named import/export or None if a
3125            status couldn't be read
3126
3127   """
3128   result = []
3129
3130   for name in names:
3131     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3132                                  _IES_STATUS_FILE)
3133
3134     try:
3135       data = utils.ReadFile(status_file)
3136     except EnvironmentError, err:
3137       if err.errno != errno.ENOENT:
3138         raise
3139       data = None
3140
3141     if not data:
3142       result.append(None)
3143       continue
3144
3145     result.append(serializer.LoadJson(data))
3146
3147   return result
3148
3149
3150 def AbortImportExport(name):
3151   """Sends SIGTERM to a running import/export daemon.
3152
3153   """
3154   logging.info("Abort import/export %s", name)
3155
3156   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3157   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3158
3159   if pid:
3160     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3161                  name, pid)
3162     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3163
3164
3165 def CleanupImportExport(name):
3166   """Cleanup after an import or export.
3167
3168   If the import/export daemon is still running it's killed. Afterwards the
3169   whole status directory is removed.
3170
3171   """
3172   logging.info("Finalizing import/export %s", name)
3173
3174   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3175
3176   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3177
3178   if pid:
3179     logging.info("Import/export %s is still running with PID %s",
3180                  name, pid)
3181     utils.KillProcess(pid, waitpid=False)
3182
3183   shutil.rmtree(status_dir, ignore_errors=True)
3184
3185
3186 def _FindDisks(nodes_ip, disks):
3187   """Sets the physical ID on disks and returns the block devices.
3188
3189   """
3190   # set the correct physical ID
3191   my_name = netutils.Hostname.GetSysName()
3192   for cf in disks:
3193     cf.SetPhysicalID(my_name, nodes_ip)
3194
3195   bdevs = []
3196
3197   for cf in disks:
3198     rd = _RecursiveFindBD(cf)
3199     if rd is None:
3200       _Fail("Can't find device %s", cf)
3201     bdevs.append(rd)
3202   return bdevs
3203
3204
3205 def DrbdDisconnectNet(nodes_ip, disks):
3206   """Disconnects the network on a list of drbd devices.
3207
3208   """
3209   bdevs = _FindDisks(nodes_ip, disks)
3210
3211   # disconnect disks
3212   for rd in bdevs:
3213     try:
3214       rd.DisconnectNet()
3215     except errors.BlockDeviceError, err:
3216       _Fail("Can't change network configuration to standalone mode: %s",
3217             err, exc=True)
3218
3219
3220 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3221   """Attaches the network on a list of drbd devices.
3222
3223   """
3224   bdevs = _FindDisks(nodes_ip, disks)
3225
3226   if multimaster:
3227     for idx, rd in enumerate(bdevs):
3228       try:
3229         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3230       except EnvironmentError, err:
3231         _Fail("Can't create symlink: %s", err)
3232   # reconnect disks, switch to new master configuration and if
3233   # needed primary mode
3234   for rd in bdevs:
3235     try:
3236       rd.AttachNet(multimaster)
3237     except errors.BlockDeviceError, err:
3238       _Fail("Can't change network configuration: %s", err)
3239
3240   # wait until the disks are connected; we need to retry the re-attach
3241   # if the device becomes standalone, as this might happen if the one
3242   # node disconnects and reconnects in a different mode before the
3243   # other node reconnects; in this case, one or both of the nodes will
3244   # decide it has wrong configuration and switch to standalone
3245
3246   def _Attach():
3247     all_connected = True
3248
3249     for rd in bdevs:
3250       stats = rd.GetProcStatus()
3251
3252       all_connected = (all_connected and
3253                        (stats.is_connected or stats.is_in_resync))
3254
3255       if stats.is_standalone:
3256         # peer had different config info and this node became
3257         # standalone, even though this should not happen with the
3258         # new staged way of changing disk configs
3259         try:
3260           rd.AttachNet(multimaster)
3261         except errors.BlockDeviceError, err:
3262           _Fail("Can't change network configuration: %s", err)
3263
3264     if not all_connected:
3265       raise utils.RetryAgain()
3266
3267   try:
3268     # Start with a delay of 100 miliseconds and go up to 5 seconds
3269     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3270   except utils.RetryTimeout:
3271     _Fail("Timeout in disk reconnecting")
3272
3273   if multimaster:
3274     # change to primary mode
3275     for rd in bdevs:
3276       try:
3277         rd.Open()
3278       except errors.BlockDeviceError, err:
3279         _Fail("Can't change to primary mode: %s", err)
3280
3281
3282 def DrbdWaitSync(nodes_ip, disks):
3283   """Wait until DRBDs have synchronized.
3284
3285   """
3286   def _helper(rd):
3287     stats = rd.GetProcStatus()
3288     if not (stats.is_connected or stats.is_in_resync):
3289       raise utils.RetryAgain()
3290     return stats
3291
3292   bdevs = _FindDisks(nodes_ip, disks)
3293
3294   min_resync = 100
3295   alldone = True
3296   for rd in bdevs:
3297     try:
3298       # poll each second for 15 seconds
3299       stats = utils.Retry(_helper, 1, 15, args=[rd])
3300     except utils.RetryTimeout:
3301       stats = rd.GetProcStatus()
3302       # last check
3303       if not (stats.is_connected or stats.is_in_resync):
3304         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3305     alldone = alldone and (not stats.is_in_resync)
3306     if stats.sync_percent is not None:
3307       min_resync = min(min_resync, stats.sync_percent)
3308
3309   return (alldone, min_resync)
3310
3311
3312 def GetDrbdUsermodeHelper():
3313   """Returns DRBD usermode helper currently configured.
3314
3315   """
3316   try:
3317     return bdev.BaseDRBD.GetUsermodeHelper()
3318   except errors.BlockDeviceError, err:
3319     _Fail(str(err))
3320
3321
3322 def PowercycleNode(hypervisor_type):
3323   """Hard-powercycle the node.
3324
3325   Because we need to return first, and schedule the powercycle in the
3326   background, we won't be able to report failures nicely.
3327
3328   """
3329   hyper = hypervisor.GetHypervisor(hypervisor_type)
3330   try:
3331     pid = os.fork()
3332   except OSError:
3333     # if we can't fork, we'll pretend that we're in the child process
3334     pid = 0
3335   if pid > 0:
3336     return "Reboot scheduled in 5 seconds"
3337   # ensure the child is running on ram
3338   try:
3339     utils.Mlockall()
3340   except Exception: # pylint: disable=W0703
3341     pass
3342   time.sleep(5)
3343   hyper.PowercycleNode()
3344
3345
3346 class HooksRunner(object):
3347   """Hook runner.
3348
3349   This class is instantiated on the node side (ganeti-noded) and not
3350   on the master side.
3351
3352   """
3353   def __init__(self, hooks_base_dir=None):
3354     """Constructor for hooks runner.
3355
3356     @type hooks_base_dir: str or None
3357     @param hooks_base_dir: if not None, this overrides the
3358         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3359
3360     """
3361     if hooks_base_dir is None:
3362       hooks_base_dir = constants.HOOKS_BASE_DIR
3363     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3364     # constant
3365     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3366
3367   def RunHooks(self, hpath, phase, env):
3368     """Run the scripts in the hooks directory.
3369
3370     @type hpath: str
3371     @param hpath: the path to the hooks directory which
3372         holds the scripts
3373     @type phase: str
3374     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3375         L{constants.HOOKS_PHASE_POST}
3376     @type env: dict
3377     @param env: dictionary with the environment for the hook
3378     @rtype: list
3379     @return: list of 3-element tuples:
3380       - script path
3381       - script result, either L{constants.HKR_SUCCESS} or
3382         L{constants.HKR_FAIL}
3383       - output of the script
3384
3385     @raise errors.ProgrammerError: for invalid input
3386         parameters
3387
3388     """
3389     if phase == constants.HOOKS_PHASE_PRE:
3390       suffix = "pre"
3391     elif phase == constants.HOOKS_PHASE_POST:
3392       suffix = "post"
3393     else:
3394       _Fail("Unknown hooks phase '%s'", phase)
3395
3396     subdir = "%s-%s.d" % (hpath, suffix)
3397     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3398
3399     results = []
3400
3401     if not os.path.isdir(dir_name):
3402       # for non-existing/non-dirs, we simply exit instead of logging a
3403       # warning at every operation
3404       return results
3405
3406     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3407
3408     for (relname, relstatus, runresult)  in runparts_results:
3409       if relstatus == constants.RUNPARTS_SKIP:
3410         rrval = constants.HKR_SKIP
3411         output = ""
3412       elif relstatus == constants.RUNPARTS_ERR:
3413         rrval = constants.HKR_FAIL
3414         output = "Hook script execution error: %s" % runresult
3415       elif relstatus == constants.RUNPARTS_RUN:
3416         if runresult.failed:
3417           rrval = constants.HKR_FAIL
3418         else:
3419           rrval = constants.HKR_SUCCESS
3420         output = utils.SafeEncode(runresult.output.strip())
3421       results.append(("%s/%s" % (subdir, relname), rrval, output))
3422
3423     return results
3424
3425
3426 class IAllocatorRunner(object):
3427   """IAllocator runner.
3428
3429   This class is instantiated on the node side (ganeti-noded) and not on
3430   the master side.
3431
3432   """
3433   @staticmethod
3434   def Run(name, idata):
3435     """Run an iallocator script.
3436
3437     @type name: str
3438     @param name: the iallocator script name
3439     @type idata: str
3440     @param idata: the allocator input data
3441
3442     @rtype: tuple
3443     @return: two element tuple of:
3444        - status
3445        - either error message or stdout of allocator (for success)
3446
3447     """
3448     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3449                                   os.path.isfile)
3450     if alloc_script is None:
3451       _Fail("iallocator module '%s' not found in the search path", name)
3452
3453     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3454     try:
3455       os.write(fd, idata)
3456       os.close(fd)
3457       result = utils.RunCmd([alloc_script, fin_name])
3458       if result.failed:
3459         _Fail("iallocator module '%s' failed: %s, output '%s'",
3460               name, result.fail_reason, result.output)
3461     finally:
3462       os.unlink(fin_name)
3463
3464     return result.stdout
3465
3466
3467 class DevCacheManager(object):
3468   """Simple class for managing a cache of block device information.
3469
3470   """
3471   _DEV_PREFIX = "/dev/"
3472   _ROOT_DIR = constants.BDEV_CACHE_DIR
3473
3474   @classmethod
3475   def _ConvertPath(cls, dev_path):
3476     """Converts a /dev/name path to the cache file name.
3477
3478     This replaces slashes with underscores and strips the /dev
3479     prefix. It then returns the full path to the cache file.
3480
3481     @type dev_path: str
3482     @param dev_path: the C{/dev/} path name
3483     @rtype: str
3484     @return: the converted path name
3485
3486     """
3487     if dev_path.startswith(cls._DEV_PREFIX):
3488       dev_path = dev_path[len(cls._DEV_PREFIX):]
3489     dev_path = dev_path.replace("/", "_")
3490     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3491     return fpath
3492
3493   @classmethod
3494   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3495     """Updates the cache information for a given device.
3496
3497     @type dev_path: str
3498     @param dev_path: the pathname of the device
3499     @type owner: str
3500     @param owner: the owner (instance name) of the device
3501     @type on_primary: bool
3502     @param on_primary: whether this is the primary
3503         node nor not
3504     @type iv_name: str
3505     @param iv_name: the instance-visible name of the
3506         device, as in objects.Disk.iv_name
3507
3508     @rtype: None
3509
3510     """
3511     if dev_path is None:
3512       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3513       return
3514     fpath = cls._ConvertPath(dev_path)
3515     if on_primary:
3516       state = "primary"
3517     else:
3518       state = "secondary"
3519     if iv_name is None:
3520       iv_name = "not_visible"
3521     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3522     try:
3523       utils.WriteFile(fpath, data=fdata)
3524     except EnvironmentError, err:
3525       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3526
3527   @classmethod
3528   def RemoveCache(cls, dev_path):
3529     """Remove data for a dev_path.
3530
3531     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3532     path name and logging.
3533
3534     @type dev_path: str
3535     @param dev_path: the pathname of the device
3536
3537     @rtype: None
3538
3539     """
3540     if dev_path is None:
3541       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3542       return
3543     fpath = cls._ConvertPath(dev_path)
3544     try:
3545       utils.RemoveFile(fpath)
3546     except EnvironmentError, err:
3547       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)