Merge branch 'devel-2.5'
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.SPICE_CERT_FILE,
200     constants.SPICE_CACERT_FILE,
201     constants.RAPI_USERS_FILE,
202     constants.CONFD_HMAC_KEY,
203     constants.CLUSTER_DOMAIN_SECRET_FILE,
204     ])
205
206   for hv_name in constants.HYPER_TYPES:
207     hv_class = hypervisor.GetHypervisorClass(hv_name)
208     allowed_files.update(hv_class.GetAncillaryFiles())
209
210   return frozenset(allowed_files)
211
212
213 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
214
215
216 def JobQueuePurge():
217   """Removes job queue files and archived jobs.
218
219   @rtype: tuple
220   @return: True, None
221
222   """
223   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
224   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
225
226
227 def GetMasterInfo():
228   """Returns master information.
229
230   This is an utility function to compute master information, either
231   for consumption here or from the node daemon.
232
233   @rtype: tuple
234   @return: master_netdev, master_ip, master_name, primary_ip_family
235   @raise RPCFail: in case of errors
236
237   """
238   try:
239     cfg = _GetConfig()
240     master_netdev = cfg.GetMasterNetdev()
241     master_ip = cfg.GetMasterIP()
242     master_node = cfg.GetMasterNode()
243     primary_ip_family = cfg.GetPrimaryIPFamily()
244   except errors.ConfigurationError, err:
245     _Fail("Cluster configuration incomplete: %s", err, exc=True)
246   return (master_netdev, master_ip, master_node, primary_ip_family)
247
248
249 def StartMaster(start_daemons, no_voting):
250   """Activate local node as master node.
251
252   The function will either try activate the IP address of the master
253   (unless someone else has it) or also start the master daemons, based
254   on the start_daemons parameter.
255
256   @type start_daemons: boolean
257   @param start_daemons: whether to start the master daemons
258       (ganeti-masterd and ganeti-rapi), or (if false) activate the
259       master ip
260   @type no_voting: boolean
261   @param no_voting: whether to start ganeti-masterd without a node vote
262       (if start_daemons is True), but still non-interactively
263   @rtype: None
264
265   """
266   # GetMasterInfo will raise an exception if not able to return data
267   master_netdev, master_ip, _, family = GetMasterInfo()
268
269   err_msgs = []
270   # either start the master and rapi daemons
271   if start_daemons:
272     if no_voting:
273       masterd_args = "--no-voting --yes-do-it"
274     else:
275       masterd_args = ""
276
277     env = {
278       "EXTRA_MASTERD_ARGS": masterd_args,
279       }
280
281     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
282     if result.failed:
283       msg = "Can't start Ganeti master: %s" % result.output
284       logging.error(msg)
285       err_msgs.append(msg)
286   # or activate the IP
287   else:
288     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
289       if netutils.IPAddress.Own(master_ip):
290         # we already have the ip:
291         logging.debug("Master IP already configured, doing nothing")
292       else:
293         msg = "Someone else has the master ip, not activating"
294         logging.error(msg)
295         err_msgs.append(msg)
296     else:
297       ipcls = netutils.IP4Address
298       if family == netutils.IP6Address.family:
299         ipcls = netutils.IP6Address
300
301       result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
302                              "%s/%d" % (master_ip, ipcls.iplen),
303                              "dev", master_netdev, "label",
304                              "%s:0" % master_netdev])
305       if result.failed:
306         msg = "Can't activate master IP: %s" % result.output
307         logging.error(msg)
308         err_msgs.append(msg)
309
310       # we ignore the exit code of the following cmds
311       if ipcls == netutils.IP4Address:
312         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
313                       master_ip, master_ip])
314       elif ipcls == netutils.IP6Address:
315         try:
316           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
317         except errors.OpExecError:
318           # TODO: Better error reporting
319           logging.warning("Can't execute ndisc6, please install if missing")
320
321   if err_msgs:
322     _Fail("; ".join(err_msgs))
323
324
325 def StopMaster(stop_daemons):
326   """Deactivate this node as master.
327
328   The function will always try to deactivate the IP address of the
329   master. It will also stop the master daemons depending on the
330   stop_daemons parameter.
331
332   @type stop_daemons: boolean
333   @param stop_daemons: whether to also stop the master daemons
334       (ganeti-masterd and ganeti-rapi)
335   @rtype: None
336
337   """
338   # TODO: log and report back to the caller the error failures; we
339   # need to decide in which case we fail the RPC for this
340
341   # GetMasterInfo will raise an exception if not able to return data
342   master_netdev, master_ip, _, family = GetMasterInfo()
343
344   ipcls = netutils.IP4Address
345   if family == netutils.IP6Address.family:
346     ipcls = netutils.IP6Address
347
348   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
349                          "%s/%d" % (master_ip, ipcls.iplen),
350                          "dev", master_netdev])
351   if result.failed:
352     logging.error("Can't remove the master IP, error: %s", result.output)
353     # but otherwise ignore the failure
354
355   if stop_daemons:
356     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
357     if result.failed:
358       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
359                     " and error %s",
360                     result.cmd, result.exit_code, result.output)
361
362
363 def EtcHostsModify(mode, host, ip):
364   """Modify a host entry in /etc/hosts.
365
366   @param mode: The mode to operate. Either add or remove entry
367   @param host: The host to operate on
368   @param ip: The ip associated with the entry
369
370   """
371   if mode == constants.ETC_HOSTS_ADD:
372     if not ip:
373       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
374               " present")
375     utils.AddHostToEtcHosts(host, ip)
376   elif mode == constants.ETC_HOSTS_REMOVE:
377     if ip:
378       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
379               " parameter is present")
380     utils.RemoveHostFromEtcHosts(host)
381   else:
382     RPCFail("Mode not supported")
383
384
385 def LeaveCluster(modify_ssh_setup):
386   """Cleans up and remove the current node.
387
388   This function cleans up and prepares the current node to be removed
389   from the cluster.
390
391   If processing is successful, then it raises an
392   L{errors.QuitGanetiException} which is used as a special case to
393   shutdown the node daemon.
394
395   @param modify_ssh_setup: boolean
396
397   """
398   _CleanDirectory(constants.DATA_DIR)
399   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
400   JobQueuePurge()
401
402   if modify_ssh_setup:
403     try:
404       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
405
406       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
407
408       utils.RemoveFile(priv_key)
409       utils.RemoveFile(pub_key)
410     except errors.OpExecError:
411       logging.exception("Error while processing ssh files")
412
413   try:
414     utils.RemoveFile(constants.CONFD_HMAC_KEY)
415     utils.RemoveFile(constants.RAPI_CERT_FILE)
416     utils.RemoveFile(constants.SPICE_CERT_FILE)
417     utils.RemoveFile(constants.SPICE_CACERT_FILE)
418     utils.RemoveFile(constants.NODED_CERT_FILE)
419   except: # pylint: disable=W0702
420     logging.exception("Error while removing cluster secrets")
421
422   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
423   if result.failed:
424     logging.error("Command %s failed with exitcode %s and error %s",
425                   result.cmd, result.exit_code, result.output)
426
427   # Raise a custom exception (handled in ganeti-noded)
428   raise errors.QuitGanetiException(True, "Shutdown scheduled")
429
430
431 def GetNodeInfo(vgname, hypervisor_type):
432   """Gives back a hash with different information about the node.
433
434   @type vgname: C{string}
435   @param vgname: the name of the volume group to ask for disk space information
436   @type hypervisor_type: C{str}
437   @param hypervisor_type: the name of the hypervisor to ask for
438       memory information
439   @rtype: C{dict}
440   @return: dictionary with the following keys:
441       - vg_size is the size of the configured volume group in MiB
442       - vg_free is the free size of the volume group in MiB
443       - memory_dom0 is the memory allocated for domain0 in MiB
444       - memory_free is the currently available (free) ram in MiB
445       - memory_total is the total number of ram in MiB
446
447   """
448   outputarray = {}
449
450   if vgname is not None:
451     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
452     vg_free = vg_size = None
453     if vginfo:
454       vg_free = int(round(vginfo[0][0], 0))
455       vg_size = int(round(vginfo[0][1], 0))
456     outputarray["vg_size"] = vg_size
457     outputarray["vg_free"] = vg_free
458
459   if hypervisor_type is not None:
460     hyper = hypervisor.GetHypervisor(hypervisor_type)
461     hyp_info = hyper.GetNodeInfo()
462     if hyp_info is not None:
463       outputarray.update(hyp_info)
464
465   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
466
467   return outputarray
468
469
470 def VerifyNode(what, cluster_name):
471   """Verify the status of the local node.
472
473   Based on the input L{what} parameter, various checks are done on the
474   local node.
475
476   If the I{filelist} key is present, this list of
477   files is checksummed and the file/checksum pairs are returned.
478
479   If the I{nodelist} key is present, we check that we have
480   connectivity via ssh with the target nodes (and check the hostname
481   report).
482
483   If the I{node-net-test} key is present, we check that we have
484   connectivity to the given nodes via both primary IP and, if
485   applicable, secondary IPs.
486
487   @type what: C{dict}
488   @param what: a dictionary of things to check:
489       - filelist: list of files for which to compute checksums
490       - nodelist: list of nodes we should check ssh communication with
491       - node-net-test: list of nodes we should check node daemon port
492         connectivity with
493       - hypervisor: list with hypervisors to run the verify for
494   @rtype: dict
495   @return: a dictionary with the same keys as the input dict, and
496       values representing the result of the checks
497
498   """
499   result = {}
500   my_name = netutils.Hostname.GetSysName()
501   port = netutils.GetDaemonPort(constants.NODED)
502   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
503
504   if constants.NV_HYPERVISOR in what and vm_capable:
505     result[constants.NV_HYPERVISOR] = tmp = {}
506     for hv_name in what[constants.NV_HYPERVISOR]:
507       try:
508         val = hypervisor.GetHypervisor(hv_name).Verify()
509       except errors.HypervisorError, err:
510         val = "Error while checking hypervisor: %s" % str(err)
511       tmp[hv_name] = val
512
513   if constants.NV_HVPARAMS in what and vm_capable:
514     result[constants.NV_HVPARAMS] = tmp = []
515     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
516       try:
517         logging.info("Validating hv %s, %s", hv_name, hvparms)
518         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
519       except errors.HypervisorError, err:
520         tmp.append((source, hv_name, str(err)))
521
522   if constants.NV_FILELIST in what:
523     result[constants.NV_FILELIST] = utils.FingerprintFiles(
524       what[constants.NV_FILELIST])
525
526   if constants.NV_NODELIST in what:
527     result[constants.NV_NODELIST] = tmp = {}
528     random.shuffle(what[constants.NV_NODELIST])
529     for node in what[constants.NV_NODELIST]:
530       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
531       if not success:
532         tmp[node] = message
533
534   if constants.NV_NODENETTEST in what:
535     result[constants.NV_NODENETTEST] = tmp = {}
536     my_pip = my_sip = None
537     for name, pip, sip in what[constants.NV_NODENETTEST]:
538       if name == my_name:
539         my_pip = pip
540         my_sip = sip
541         break
542     if not my_pip:
543       tmp[my_name] = ("Can't find my own primary/secondary IP"
544                       " in the node list")
545     else:
546       for name, pip, sip in what[constants.NV_NODENETTEST]:
547         fail = []
548         if not netutils.TcpPing(pip, port, source=my_pip):
549           fail.append("primary")
550         if sip != pip:
551           if not netutils.TcpPing(sip, port, source=my_sip):
552             fail.append("secondary")
553         if fail:
554           tmp[name] = ("failure using the %s interface(s)" %
555                        " and ".join(fail))
556
557   if constants.NV_MASTERIP in what:
558     # FIXME: add checks on incoming data structures (here and in the
559     # rest of the function)
560     master_name, master_ip = what[constants.NV_MASTERIP]
561     if master_name == my_name:
562       source = constants.IP4_ADDRESS_LOCALHOST
563     else:
564       source = None
565     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
566                                                   source=source)
567
568   if constants.NV_OOB_PATHS in what:
569     result[constants.NV_OOB_PATHS] = tmp = []
570     for path in what[constants.NV_OOB_PATHS]:
571       try:
572         st = os.stat(path)
573       except OSError, err:
574         tmp.append("error stating out of band helper: %s" % err)
575       else:
576         if stat.S_ISREG(st.st_mode):
577           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
578             tmp.append(None)
579           else:
580             tmp.append("out of band helper %s is not executable" % path)
581         else:
582           tmp.append("out of band helper %s is not a file" % path)
583
584   if constants.NV_LVLIST in what and vm_capable:
585     try:
586       val = GetVolumeList(utils.ListVolumeGroups().keys())
587     except RPCFail, err:
588       val = str(err)
589     result[constants.NV_LVLIST] = val
590
591   if constants.NV_INSTANCELIST in what and vm_capable:
592     # GetInstanceList can fail
593     try:
594       val = GetInstanceList(what[constants.NV_INSTANCELIST])
595     except RPCFail, err:
596       val = str(err)
597     result[constants.NV_INSTANCELIST] = val
598
599   if constants.NV_VGLIST in what and vm_capable:
600     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
601
602   if constants.NV_PVLIST in what and vm_capable:
603     result[constants.NV_PVLIST] = \
604       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
605                                    filter_allocatable=False)
606
607   if constants.NV_VERSION in what:
608     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
609                                     constants.RELEASE_VERSION)
610
611   if constants.NV_HVINFO in what and vm_capable:
612     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
613     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
614
615   if constants.NV_DRBDLIST in what and vm_capable:
616     try:
617       used_minors = bdev.DRBD8.GetUsedDevs().keys()
618     except errors.BlockDeviceError, err:
619       logging.warning("Can't get used minors list", exc_info=True)
620       used_minors = str(err)
621     result[constants.NV_DRBDLIST] = used_minors
622
623   if constants.NV_DRBDHELPER in what and vm_capable:
624     status = True
625     try:
626       payload = bdev.BaseDRBD.GetUsermodeHelper()
627     except errors.BlockDeviceError, err:
628       logging.error("Can't get DRBD usermode helper: %s", str(err))
629       status = False
630       payload = str(err)
631     result[constants.NV_DRBDHELPER] = (status, payload)
632
633   if constants.NV_NODESETUP in what:
634     result[constants.NV_NODESETUP] = tmpr = []
635     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
636       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
637                   " under /sys, missing required directories /sys/block"
638                   " and /sys/class/net")
639     if (not os.path.isdir("/proc/sys") or
640         not os.path.isfile("/proc/sysrq-trigger")):
641       tmpr.append("The procfs filesystem doesn't seem to be mounted"
642                   " under /proc, missing required directory /proc/sys and"
643                   " the file /proc/sysrq-trigger")
644
645   if constants.NV_TIME in what:
646     result[constants.NV_TIME] = utils.SplitTime(time.time())
647
648   if constants.NV_OSLIST in what and vm_capable:
649     result[constants.NV_OSLIST] = DiagnoseOS()
650
651   if constants.NV_BRIDGES in what and vm_capable:
652     result[constants.NV_BRIDGES] = [bridge
653                                     for bridge in what[constants.NV_BRIDGES]
654                                     if not utils.BridgeExists(bridge)]
655   return result
656
657
658 def GetBlockDevSizes(devices):
659   """Return the size of the given block devices
660
661   @type devices: list
662   @param devices: list of block device nodes to query
663   @rtype: dict
664   @return:
665     dictionary of all block devices under /dev (key). The value is their
666     size in MiB.
667
668     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
669
670   """
671   DEV_PREFIX = "/dev/"
672   blockdevs = {}
673
674   for devpath in devices:
675     if not utils.IsBelowDir(DEV_PREFIX, devpath):
676       continue
677
678     try:
679       st = os.stat(devpath)
680     except EnvironmentError, err:
681       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
682       continue
683
684     if stat.S_ISBLK(st.st_mode):
685       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
686       if result.failed:
687         # We don't want to fail, just do not list this device as available
688         logging.warning("Cannot get size for block device %s", devpath)
689         continue
690
691       size = int(result.stdout) / (1024 * 1024)
692       blockdevs[devpath] = size
693   return blockdevs
694
695
696 def GetVolumeList(vg_names):
697   """Compute list of logical volumes and their size.
698
699   @type vg_names: list
700   @param vg_names: the volume groups whose LVs we should list, or
701       empty for all volume groups
702   @rtype: dict
703   @return:
704       dictionary of all partions (key) with value being a tuple of
705       their size (in MiB), inactive and online status::
706
707         {'xenvg/test1': ('20.06', True, True)}
708
709       in case of errors, a string is returned with the error
710       details.
711
712   """
713   lvs = {}
714   sep = "|"
715   if not vg_names:
716     vg_names = []
717   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
718                          "--separator=%s" % sep,
719                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
720   if result.failed:
721     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
722
723   for line in result.stdout.splitlines():
724     line = line.strip()
725     match = _LVSLINE_REGEX.match(line)
726     if not match:
727       logging.error("Invalid line returned from lvs output: '%s'", line)
728       continue
729     vg_name, name, size, attr = match.groups()
730     inactive = attr[4] == "-"
731     online = attr[5] == "o"
732     virtual = attr[0] == "v"
733     if virtual:
734       # we don't want to report such volumes as existing, since they
735       # don't really hold data
736       continue
737     lvs[vg_name + "/" + name] = (size, inactive, online)
738
739   return lvs
740
741
742 def ListVolumeGroups():
743   """List the volume groups and their size.
744
745   @rtype: dict
746   @return: dictionary with keys volume name and values the
747       size of the volume
748
749   """
750   return utils.ListVolumeGroups()
751
752
753 def NodeVolumes():
754   """List all volumes on this node.
755
756   @rtype: list
757   @return:
758     A list of dictionaries, each having four keys:
759       - name: the logical volume name,
760       - size: the size of the logical volume
761       - dev: the physical device on which the LV lives
762       - vg: the volume group to which it belongs
763
764     In case of errors, we return an empty list and log the
765     error.
766
767     Note that since a logical volume can live on multiple physical
768     volumes, the resulting list might include a logical volume
769     multiple times.
770
771   """
772   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
773                          "--separator=|",
774                          "--options=lv_name,lv_size,devices,vg_name"])
775   if result.failed:
776     _Fail("Failed to list logical volumes, lvs output: %s",
777           result.output)
778
779   def parse_dev(dev):
780     return dev.split("(")[0]
781
782   def handle_dev(dev):
783     return [parse_dev(x) for x in dev.split(",")]
784
785   def map_line(line):
786     line = [v.strip() for v in line]
787     return [{"name": line[0], "size": line[1],
788              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
789
790   all_devs = []
791   for line in result.stdout.splitlines():
792     if line.count("|") >= 3:
793       all_devs.extend(map_line(line.split("|")))
794     else:
795       logging.warning("Strange line in the output from lvs: '%s'", line)
796   return all_devs
797
798
799 def BridgesExist(bridges_list):
800   """Check if a list of bridges exist on the current node.
801
802   @rtype: boolean
803   @return: C{True} if all of them exist, C{False} otherwise
804
805   """
806   missing = []
807   for bridge in bridges_list:
808     if not utils.BridgeExists(bridge):
809       missing.append(bridge)
810
811   if missing:
812     _Fail("Missing bridges %s", utils.CommaJoin(missing))
813
814
815 def GetInstanceList(hypervisor_list):
816   """Provides a list of instances.
817
818   @type hypervisor_list: list
819   @param hypervisor_list: the list of hypervisors to query information
820
821   @rtype: list
822   @return: a list of all running instances on the current node
823     - instance1.example.com
824     - instance2.example.com
825
826   """
827   results = []
828   for hname in hypervisor_list:
829     try:
830       names = hypervisor.GetHypervisor(hname).ListInstances()
831       results.extend(names)
832     except errors.HypervisorError, err:
833       _Fail("Error enumerating instances (hypervisor %s): %s",
834             hname, err, exc=True)
835
836   return results
837
838
839 def GetInstanceInfo(instance, hname):
840   """Gives back the information about an instance as a dictionary.
841
842   @type instance: string
843   @param instance: the instance name
844   @type hname: string
845   @param hname: the hypervisor type of the instance
846
847   @rtype: dict
848   @return: dictionary with the following keys:
849       - memory: memory size of instance (int)
850       - state: xen state of instance (string)
851       - time: cpu time of instance (float)
852
853   """
854   output = {}
855
856   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
857   if iinfo is not None:
858     output["memory"] = iinfo[2]
859     output["state"] = iinfo[4]
860     output["time"] = iinfo[5]
861
862   return output
863
864
865 def GetInstanceMigratable(instance):
866   """Gives whether an instance can be migrated.
867
868   @type instance: L{objects.Instance}
869   @param instance: object representing the instance to be checked.
870
871   @rtype: tuple
872   @return: tuple of (result, description) where:
873       - result: whether the instance can be migrated or not
874       - description: a description of the issue, if relevant
875
876   """
877   hyper = hypervisor.GetHypervisor(instance.hypervisor)
878   iname = instance.name
879   if iname not in hyper.ListInstances():
880     _Fail("Instance %s is not running", iname)
881
882   for idx in range(len(instance.disks)):
883     link_name = _GetBlockDevSymlinkPath(iname, idx)
884     if not os.path.islink(link_name):
885       logging.warning("Instance %s is missing symlink %s for disk %d",
886                       iname, link_name, idx)
887
888
889 def GetAllInstancesInfo(hypervisor_list):
890   """Gather data about all instances.
891
892   This is the equivalent of L{GetInstanceInfo}, except that it
893   computes data for all instances at once, thus being faster if one
894   needs data about more than one instance.
895
896   @type hypervisor_list: list
897   @param hypervisor_list: list of hypervisors to query for instance data
898
899   @rtype: dict
900   @return: dictionary of instance: data, with data having the following keys:
901       - memory: memory size of instance (int)
902       - state: xen state of instance (string)
903       - time: cpu time of instance (float)
904       - vcpus: the number of vcpus
905
906   """
907   output = {}
908
909   for hname in hypervisor_list:
910     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
911     if iinfo:
912       for name, _, memory, vcpus, state, times in iinfo:
913         value = {
914           "memory": memory,
915           "vcpus": vcpus,
916           "state": state,
917           "time": times,
918           }
919         if name in output:
920           # we only check static parameters, like memory and vcpus,
921           # and not state and time which can change between the
922           # invocations of the different hypervisors
923           for key in "memory", "vcpus":
924             if value[key] != output[name][key]:
925               _Fail("Instance %s is running twice"
926                     " with different parameters", name)
927         output[name] = value
928
929   return output
930
931
932 def _InstanceLogName(kind, os_name, instance, component):
933   """Compute the OS log filename for a given instance and operation.
934
935   The instance name and os name are passed in as strings since not all
936   operations have these as part of an instance object.
937
938   @type kind: string
939   @param kind: the operation type (e.g. add, import, etc.)
940   @type os_name: string
941   @param os_name: the os name
942   @type instance: string
943   @param instance: the name of the instance being imported/added/etc.
944   @type component: string or None
945   @param component: the name of the component of the instance being
946       transferred
947
948   """
949   # TODO: Use tempfile.mkstemp to create unique filename
950   if component:
951     assert "/" not in component
952     c_msg = "-%s" % component
953   else:
954     c_msg = ""
955   base = ("%s-%s-%s%s-%s.log" %
956           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
957   return utils.PathJoin(constants.LOG_OS_DIR, base)
958
959
960 def InstanceOsAdd(instance, reinstall, debug):
961   """Add an OS to an instance.
962
963   @type instance: L{objects.Instance}
964   @param instance: Instance whose OS is to be installed
965   @type reinstall: boolean
966   @param reinstall: whether this is an instance reinstall
967   @type debug: integer
968   @param debug: debug level, passed to the OS scripts
969   @rtype: None
970
971   """
972   inst_os = OSFromDisk(instance.os)
973
974   create_env = OSEnvironment(instance, inst_os, debug)
975   if reinstall:
976     create_env["INSTANCE_REINSTALL"] = "1"
977
978   logfile = _InstanceLogName("add", instance.os, instance.name, None)
979
980   result = utils.RunCmd([inst_os.create_script], env=create_env,
981                         cwd=inst_os.path, output=logfile, reset_env=True)
982   if result.failed:
983     logging.error("os create command '%s' returned error: %s, logfile: %s,"
984                   " output: %s", result.cmd, result.fail_reason, logfile,
985                   result.output)
986     lines = [utils.SafeEncode(val)
987              for val in utils.TailFile(logfile, lines=20)]
988     _Fail("OS create script failed (%s), last lines in the"
989           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
990
991
992 def RunRenameInstance(instance, old_name, debug):
993   """Run the OS rename script for an instance.
994
995   @type instance: L{objects.Instance}
996   @param instance: Instance whose OS is to be installed
997   @type old_name: string
998   @param old_name: previous instance name
999   @type debug: integer
1000   @param debug: debug level, passed to the OS scripts
1001   @rtype: boolean
1002   @return: the success of the operation
1003
1004   """
1005   inst_os = OSFromDisk(instance.os)
1006
1007   rename_env = OSEnvironment(instance, inst_os, debug)
1008   rename_env["OLD_INSTANCE_NAME"] = old_name
1009
1010   logfile = _InstanceLogName("rename", instance.os,
1011                              "%s-%s" % (old_name, instance.name), None)
1012
1013   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1014                         cwd=inst_os.path, output=logfile, reset_env=True)
1015
1016   if result.failed:
1017     logging.error("os create command '%s' returned error: %s output: %s",
1018                   result.cmd, result.fail_reason, result.output)
1019     lines = [utils.SafeEncode(val)
1020              for val in utils.TailFile(logfile, lines=20)]
1021     _Fail("OS rename script failed (%s), last lines in the"
1022           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1023
1024
1025 def _GetBlockDevSymlinkPath(instance_name, idx):
1026   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
1027                         (instance_name, constants.DISK_SEPARATOR, idx))
1028
1029
1030 def _SymlinkBlockDev(instance_name, device_path, idx):
1031   """Set up symlinks to a instance's block device.
1032
1033   This is an auxiliary function run when an instance is start (on the primary
1034   node) or when an instance is migrated (on the target node).
1035
1036
1037   @param instance_name: the name of the target instance
1038   @param device_path: path of the physical block device, on the node
1039   @param idx: the disk index
1040   @return: absolute path to the disk's symlink
1041
1042   """
1043   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1044   try:
1045     os.symlink(device_path, link_name)
1046   except OSError, err:
1047     if err.errno == errno.EEXIST:
1048       if (not os.path.islink(link_name) or
1049           os.readlink(link_name) != device_path):
1050         os.remove(link_name)
1051         os.symlink(device_path, link_name)
1052     else:
1053       raise
1054
1055   return link_name
1056
1057
1058 def _RemoveBlockDevLinks(instance_name, disks):
1059   """Remove the block device symlinks belonging to the given instance.
1060
1061   """
1062   for idx, _ in enumerate(disks):
1063     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1064     if os.path.islink(link_name):
1065       try:
1066         os.remove(link_name)
1067       except OSError:
1068         logging.exception("Can't remove symlink '%s'", link_name)
1069
1070
1071 def _GatherAndLinkBlockDevs(instance):
1072   """Set up an instance's block device(s).
1073
1074   This is run on the primary node at instance startup. The block
1075   devices must be already assembled.
1076
1077   @type instance: L{objects.Instance}
1078   @param instance: the instance whose disks we shoul assemble
1079   @rtype: list
1080   @return: list of (disk_object, device_path)
1081
1082   """
1083   block_devices = []
1084   for idx, disk in enumerate(instance.disks):
1085     device = _RecursiveFindBD(disk)
1086     if device is None:
1087       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1088                                     str(disk))
1089     device.Open()
1090     try:
1091       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1092     except OSError, e:
1093       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1094                                     e.strerror)
1095
1096     block_devices.append((disk, link_name))
1097
1098   return block_devices
1099
1100
1101 def StartInstance(instance, startup_paused):
1102   """Start an instance.
1103
1104   @type instance: L{objects.Instance}
1105   @param instance: the instance object
1106   @type startup_paused: bool
1107   @param instance: pause instance at startup?
1108   @rtype: None
1109
1110   """
1111   running_instances = GetInstanceList([instance.hypervisor])
1112
1113   if instance.name in running_instances:
1114     logging.info("Instance %s already running, not starting", instance.name)
1115     return
1116
1117   try:
1118     block_devices = _GatherAndLinkBlockDevs(instance)
1119     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1120     hyper.StartInstance(instance, block_devices, startup_paused)
1121   except errors.BlockDeviceError, err:
1122     _Fail("Block device error: %s", err, exc=True)
1123   except errors.HypervisorError, err:
1124     _RemoveBlockDevLinks(instance.name, instance.disks)
1125     _Fail("Hypervisor error: %s", err, exc=True)
1126
1127
1128 def InstanceShutdown(instance, timeout):
1129   """Shut an instance down.
1130
1131   @note: this functions uses polling with a hardcoded timeout.
1132
1133   @type instance: L{objects.Instance}
1134   @param instance: the instance object
1135   @type timeout: integer
1136   @param timeout: maximum timeout for soft shutdown
1137   @rtype: None
1138
1139   """
1140   hv_name = instance.hypervisor
1141   hyper = hypervisor.GetHypervisor(hv_name)
1142   iname = instance.name
1143
1144   if instance.name not in hyper.ListInstances():
1145     logging.info("Instance %s not running, doing nothing", iname)
1146     return
1147
1148   class _TryShutdown:
1149     def __init__(self):
1150       self.tried_once = False
1151
1152     def __call__(self):
1153       if iname not in hyper.ListInstances():
1154         return
1155
1156       try:
1157         hyper.StopInstance(instance, retry=self.tried_once)
1158       except errors.HypervisorError, err:
1159         if iname not in hyper.ListInstances():
1160           # if the instance is no longer existing, consider this a
1161           # success and go to cleanup
1162           return
1163
1164         _Fail("Failed to stop instance %s: %s", iname, err)
1165
1166       self.tried_once = True
1167
1168       raise utils.RetryAgain()
1169
1170   try:
1171     utils.Retry(_TryShutdown(), 5, timeout)
1172   except utils.RetryTimeout:
1173     # the shutdown did not succeed
1174     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1175
1176     try:
1177       hyper.StopInstance(instance, force=True)
1178     except errors.HypervisorError, err:
1179       if iname in hyper.ListInstances():
1180         # only raise an error if the instance still exists, otherwise
1181         # the error could simply be "instance ... unknown"!
1182         _Fail("Failed to force stop instance %s: %s", iname, err)
1183
1184     time.sleep(1)
1185
1186     if iname in hyper.ListInstances():
1187       _Fail("Could not shutdown instance %s even by destroy", iname)
1188
1189   try:
1190     hyper.CleanupInstance(instance.name)
1191   except errors.HypervisorError, err:
1192     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1193
1194   _RemoveBlockDevLinks(iname, instance.disks)
1195
1196
1197 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1198   """Reboot an instance.
1199
1200   @type instance: L{objects.Instance}
1201   @param instance: the instance object to reboot
1202   @type reboot_type: str
1203   @param reboot_type: the type of reboot, one the following
1204     constants:
1205       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1206         instance OS, do not recreate the VM
1207       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1208         restart the VM (at the hypervisor level)
1209       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1210         not accepted here, since that mode is handled differently, in
1211         cmdlib, and translates into full stop and start of the
1212         instance (instead of a call_instance_reboot RPC)
1213   @type shutdown_timeout: integer
1214   @param shutdown_timeout: maximum timeout for soft shutdown
1215   @rtype: None
1216
1217   """
1218   running_instances = GetInstanceList([instance.hypervisor])
1219
1220   if instance.name not in running_instances:
1221     _Fail("Cannot reboot instance %s that is not running", instance.name)
1222
1223   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1224   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1225     try:
1226       hyper.RebootInstance(instance)
1227     except errors.HypervisorError, err:
1228       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1229   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1230     try:
1231       InstanceShutdown(instance, shutdown_timeout)
1232       return StartInstance(instance, False)
1233     except errors.HypervisorError, err:
1234       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1235   else:
1236     _Fail("Invalid reboot_type received: %s", reboot_type)
1237
1238
1239 def MigrationInfo(instance):
1240   """Gather information about an instance to be migrated.
1241
1242   @type instance: L{objects.Instance}
1243   @param instance: the instance definition
1244
1245   """
1246   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1247   try:
1248     info = hyper.MigrationInfo(instance)
1249   except errors.HypervisorError, err:
1250     _Fail("Failed to fetch migration information: %s", err, exc=True)
1251   return info
1252
1253
1254 def AcceptInstance(instance, info, target):
1255   """Prepare the node to accept an instance.
1256
1257   @type instance: L{objects.Instance}
1258   @param instance: the instance definition
1259   @type info: string/data (opaque)
1260   @param info: migration information, from the source node
1261   @type target: string
1262   @param target: target host (usually ip), on this node
1263
1264   """
1265   # TODO: why is this required only for DTS_EXT_MIRROR?
1266   if instance.disk_template in constants.DTS_EXT_MIRROR:
1267     # Create the symlinks, as the disks are not active
1268     # in any way
1269     try:
1270       _GatherAndLinkBlockDevs(instance)
1271     except errors.BlockDeviceError, err:
1272       _Fail("Block device error: %s", err, exc=True)
1273
1274   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1275   try:
1276     hyper.AcceptInstance(instance, info, target)
1277   except errors.HypervisorError, err:
1278     if instance.disk_template in constants.DTS_EXT_MIRROR:
1279       _RemoveBlockDevLinks(instance.name, instance.disks)
1280     _Fail("Failed to accept instance: %s", err, exc=True)
1281
1282
1283 def FinalizeMigration(instance, info, success):
1284   """Finalize any preparation to accept an instance.
1285
1286   @type instance: L{objects.Instance}
1287   @param instance: the instance definition
1288   @type info: string/data (opaque)
1289   @param info: migration information, from the source node
1290   @type success: boolean
1291   @param success: whether the migration was a success or a failure
1292
1293   """
1294   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1295   try:
1296     hyper.FinalizeMigration(instance, info, success)
1297   except errors.HypervisorError, err:
1298     _Fail("Failed to finalize migration: %s", err, exc=True)
1299
1300
1301 def MigrateInstance(instance, target, live):
1302   """Migrates an instance to another node.
1303
1304   @type instance: L{objects.Instance}
1305   @param instance: the instance definition
1306   @type target: string
1307   @param target: the target node name
1308   @type live: boolean
1309   @param live: whether the migration should be done live or not (the
1310       interpretation of this parameter is left to the hypervisor)
1311   @raise RPCFail: if migration fails for some reason
1312
1313   """
1314   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1315
1316   try:
1317     hyper.MigrateInstance(instance, target, live)
1318   except errors.HypervisorError, err:
1319     _Fail("Failed to migrate instance: %s", err, exc=True)
1320
1321
1322 def BlockdevCreate(disk, size, owner, on_primary, info):
1323   """Creates a block device for an instance.
1324
1325   @type disk: L{objects.Disk}
1326   @param disk: the object describing the disk we should create
1327   @type size: int
1328   @param size: the size of the physical underlying device, in MiB
1329   @type owner: str
1330   @param owner: the name of the instance for which disk is created,
1331       used for device cache data
1332   @type on_primary: boolean
1333   @param on_primary:  indicates if it is the primary node or not
1334   @type info: string
1335   @param info: string that will be sent to the physical device
1336       creation, used for example to set (LVM) tags on LVs
1337
1338   @return: the new unique_id of the device (this can sometime be
1339       computed only after creation), or None. On secondary nodes,
1340       it's not required to return anything.
1341
1342   """
1343   # TODO: remove the obsolete "size" argument
1344   # pylint: disable=W0613
1345   clist = []
1346   if disk.children:
1347     for child in disk.children:
1348       try:
1349         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1350       except errors.BlockDeviceError, err:
1351         _Fail("Can't assemble device %s: %s", child, err)
1352       if on_primary or disk.AssembleOnSecondary():
1353         # we need the children open in case the device itself has to
1354         # be assembled
1355         try:
1356           # pylint: disable=E1103
1357           crdev.Open()
1358         except errors.BlockDeviceError, err:
1359           _Fail("Can't make child '%s' read-write: %s", child, err)
1360       clist.append(crdev)
1361
1362   try:
1363     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1364   except errors.BlockDeviceError, err:
1365     _Fail("Can't create block device: %s", err)
1366
1367   if on_primary or disk.AssembleOnSecondary():
1368     try:
1369       device.Assemble()
1370     except errors.BlockDeviceError, err:
1371       _Fail("Can't assemble device after creation, unusual event: %s", err)
1372     device.SetSyncSpeed(constants.SYNC_SPEED)
1373     if on_primary or disk.OpenOnSecondary():
1374       try:
1375         device.Open(force=True)
1376       except errors.BlockDeviceError, err:
1377         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1378     DevCacheManager.UpdateCache(device.dev_path, owner,
1379                                 on_primary, disk.iv_name)
1380
1381   device.SetInfo(info)
1382
1383   return device.unique_id
1384
1385
1386 def _WipeDevice(path, offset, size):
1387   """This function actually wipes the device.
1388
1389   @param path: The path to the device to wipe
1390   @param offset: The offset in MiB in the file
1391   @param size: The size in MiB to write
1392
1393   """
1394   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1395          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1396          "count=%d" % size]
1397   result = utils.RunCmd(cmd)
1398
1399   if result.failed:
1400     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1401           result.fail_reason, result.output)
1402
1403
1404 def BlockdevWipe(disk, offset, size):
1405   """Wipes a block device.
1406
1407   @type disk: L{objects.Disk}
1408   @param disk: the disk object we want to wipe
1409   @type offset: int
1410   @param offset: The offset in MiB in the file
1411   @type size: int
1412   @param size: The size in MiB to write
1413
1414   """
1415   try:
1416     rdev = _RecursiveFindBD(disk)
1417   except errors.BlockDeviceError:
1418     rdev = None
1419
1420   if not rdev:
1421     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1422
1423   # Do cross verify some of the parameters
1424   if offset > rdev.size:
1425     _Fail("Offset is bigger than device size")
1426   if (offset + size) > rdev.size:
1427     _Fail("The provided offset and size to wipe is bigger than device size")
1428
1429   _WipeDevice(rdev.dev_path, offset, size)
1430
1431
1432 def BlockdevPauseResumeSync(disks, pause):
1433   """Pause or resume the sync of the block device.
1434
1435   @type disks: list of L{objects.Disk}
1436   @param disks: the disks object we want to pause/resume
1437   @type pause: bool
1438   @param pause: Wheater to pause or resume
1439
1440   """
1441   success = []
1442   for disk in disks:
1443     try:
1444       rdev = _RecursiveFindBD(disk)
1445     except errors.BlockDeviceError:
1446       rdev = None
1447
1448     if not rdev:
1449       success.append((False, ("Cannot change sync for device %s:"
1450                               " device not found" % disk.iv_name)))
1451       continue
1452
1453     result = rdev.PauseResumeSync(pause)
1454
1455     if result:
1456       success.append((result, None))
1457     else:
1458       if pause:
1459         msg = "Pause"
1460       else:
1461         msg = "Resume"
1462       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1463
1464   return success
1465
1466
1467 def BlockdevRemove(disk):
1468   """Remove a block device.
1469
1470   @note: This is intended to be called recursively.
1471
1472   @type disk: L{objects.Disk}
1473   @param disk: the disk object we should remove
1474   @rtype: boolean
1475   @return: the success of the operation
1476
1477   """
1478   msgs = []
1479   try:
1480     rdev = _RecursiveFindBD(disk)
1481   except errors.BlockDeviceError, err:
1482     # probably can't attach
1483     logging.info("Can't attach to device %s in remove", disk)
1484     rdev = None
1485   if rdev is not None:
1486     r_path = rdev.dev_path
1487     try:
1488       rdev.Remove()
1489     except errors.BlockDeviceError, err:
1490       msgs.append(str(err))
1491     if not msgs:
1492       DevCacheManager.RemoveCache(r_path)
1493
1494   if disk.children:
1495     for child in disk.children:
1496       try:
1497         BlockdevRemove(child)
1498       except RPCFail, err:
1499         msgs.append(str(err))
1500
1501   if msgs:
1502     _Fail("; ".join(msgs))
1503
1504
1505 def _RecursiveAssembleBD(disk, owner, as_primary):
1506   """Activate a block device for an instance.
1507
1508   This is run on the primary and secondary nodes for an instance.
1509
1510   @note: this function is called recursively.
1511
1512   @type disk: L{objects.Disk}
1513   @param disk: the disk we try to assemble
1514   @type owner: str
1515   @param owner: the name of the instance which owns the disk
1516   @type as_primary: boolean
1517   @param as_primary: if we should make the block device
1518       read/write
1519
1520   @return: the assembled device or None (in case no device
1521       was assembled)
1522   @raise errors.BlockDeviceError: in case there is an error
1523       during the activation of the children or the device
1524       itself
1525
1526   """
1527   children = []
1528   if disk.children:
1529     mcn = disk.ChildrenNeeded()
1530     if mcn == -1:
1531       mcn = 0 # max number of Nones allowed
1532     else:
1533       mcn = len(disk.children) - mcn # max number of Nones
1534     for chld_disk in disk.children:
1535       try:
1536         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1537       except errors.BlockDeviceError, err:
1538         if children.count(None) >= mcn:
1539           raise
1540         cdev = None
1541         logging.error("Error in child activation (but continuing): %s",
1542                       str(err))
1543       children.append(cdev)
1544
1545   if as_primary or disk.AssembleOnSecondary():
1546     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1547     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1548     result = r_dev
1549     if as_primary or disk.OpenOnSecondary():
1550       r_dev.Open()
1551     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1552                                 as_primary, disk.iv_name)
1553
1554   else:
1555     result = True
1556   return result
1557
1558
1559 def BlockdevAssemble(disk, owner, as_primary, idx):
1560   """Activate a block device for an instance.
1561
1562   This is a wrapper over _RecursiveAssembleBD.
1563
1564   @rtype: str or boolean
1565   @return: a C{/dev/...} path for primary nodes, and
1566       C{True} for secondary nodes
1567
1568   """
1569   try:
1570     result = _RecursiveAssembleBD(disk, owner, as_primary)
1571     if isinstance(result, bdev.BlockDev):
1572       # pylint: disable=E1103
1573       result = result.dev_path
1574       if as_primary:
1575         _SymlinkBlockDev(owner, result, idx)
1576   except errors.BlockDeviceError, err:
1577     _Fail("Error while assembling disk: %s", err, exc=True)
1578   except OSError, err:
1579     _Fail("Error while symlinking disk: %s", err, exc=True)
1580
1581   return result
1582
1583
1584 def BlockdevShutdown(disk):
1585   """Shut down a block device.
1586
1587   First, if the device is assembled (Attach() is successful), then
1588   the device is shutdown. Then the children of the device are
1589   shutdown.
1590
1591   This function is called recursively. Note that we don't cache the
1592   children or such, as oppossed to assemble, shutdown of different
1593   devices doesn't require that the upper device was active.
1594
1595   @type disk: L{objects.Disk}
1596   @param disk: the description of the disk we should
1597       shutdown
1598   @rtype: None
1599
1600   """
1601   msgs = []
1602   r_dev = _RecursiveFindBD(disk)
1603   if r_dev is not None:
1604     r_path = r_dev.dev_path
1605     try:
1606       r_dev.Shutdown()
1607       DevCacheManager.RemoveCache(r_path)
1608     except errors.BlockDeviceError, err:
1609       msgs.append(str(err))
1610
1611   if disk.children:
1612     for child in disk.children:
1613       try:
1614         BlockdevShutdown(child)
1615       except RPCFail, err:
1616         msgs.append(str(err))
1617
1618   if msgs:
1619     _Fail("; ".join(msgs))
1620
1621
1622 def BlockdevAddchildren(parent_cdev, new_cdevs):
1623   """Extend a mirrored block device.
1624
1625   @type parent_cdev: L{objects.Disk}
1626   @param parent_cdev: the disk to which we should add children
1627   @type new_cdevs: list of L{objects.Disk}
1628   @param new_cdevs: the list of children which we should add
1629   @rtype: None
1630
1631   """
1632   parent_bdev = _RecursiveFindBD(parent_cdev)
1633   if parent_bdev is None:
1634     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1635   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1636   if new_bdevs.count(None) > 0:
1637     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1638   parent_bdev.AddChildren(new_bdevs)
1639
1640
1641 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1642   """Shrink a mirrored block device.
1643
1644   @type parent_cdev: L{objects.Disk}
1645   @param parent_cdev: the disk from which we should remove children
1646   @type new_cdevs: list of L{objects.Disk}
1647   @param new_cdevs: the list of children which we should remove
1648   @rtype: None
1649
1650   """
1651   parent_bdev = _RecursiveFindBD(parent_cdev)
1652   if parent_bdev is None:
1653     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1654   devs = []
1655   for disk in new_cdevs:
1656     rpath = disk.StaticDevPath()
1657     if rpath is None:
1658       bd = _RecursiveFindBD(disk)
1659       if bd is None:
1660         _Fail("Can't find device %s while removing children", disk)
1661       else:
1662         devs.append(bd.dev_path)
1663     else:
1664       if not utils.IsNormAbsPath(rpath):
1665         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1666       devs.append(rpath)
1667   parent_bdev.RemoveChildren(devs)
1668
1669
1670 def BlockdevGetmirrorstatus(disks):
1671   """Get the mirroring status of a list of devices.
1672
1673   @type disks: list of L{objects.Disk}
1674   @param disks: the list of disks which we should query
1675   @rtype: disk
1676   @return: List of L{objects.BlockDevStatus}, one for each disk
1677   @raise errors.BlockDeviceError: if any of the disks cannot be
1678       found
1679
1680   """
1681   stats = []
1682   for dsk in disks:
1683     rbd = _RecursiveFindBD(dsk)
1684     if rbd is None:
1685       _Fail("Can't find device %s", dsk)
1686
1687     stats.append(rbd.CombinedSyncStatus())
1688
1689   return stats
1690
1691
1692 def BlockdevGetmirrorstatusMulti(disks):
1693   """Get the mirroring status of a list of devices.
1694
1695   @type disks: list of L{objects.Disk}
1696   @param disks: the list of disks which we should query
1697   @rtype: disk
1698   @return: List of tuples, (bool, status), one for each disk; bool denotes
1699     success/failure, status is L{objects.BlockDevStatus} on success, string
1700     otherwise
1701
1702   """
1703   result = []
1704   for disk in disks:
1705     try:
1706       rbd = _RecursiveFindBD(disk)
1707       if rbd is None:
1708         result.append((False, "Can't find device %s" % disk))
1709         continue
1710
1711       status = rbd.CombinedSyncStatus()
1712     except errors.BlockDeviceError, err:
1713       logging.exception("Error while getting disk status")
1714       result.append((False, str(err)))
1715     else:
1716       result.append((True, status))
1717
1718   assert len(disks) == len(result)
1719
1720   return result
1721
1722
1723 def _RecursiveFindBD(disk):
1724   """Check if a device is activated.
1725
1726   If so, return information about the real device.
1727
1728   @type disk: L{objects.Disk}
1729   @param disk: the disk object we need to find
1730
1731   @return: None if the device can't be found,
1732       otherwise the device instance
1733
1734   """
1735   children = []
1736   if disk.children:
1737     for chdisk in disk.children:
1738       children.append(_RecursiveFindBD(chdisk))
1739
1740   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1741
1742
1743 def _OpenRealBD(disk):
1744   """Opens the underlying block device of a disk.
1745
1746   @type disk: L{objects.Disk}
1747   @param disk: the disk object we want to open
1748
1749   """
1750   real_disk = _RecursiveFindBD(disk)
1751   if real_disk is None:
1752     _Fail("Block device '%s' is not set up", disk)
1753
1754   real_disk.Open()
1755
1756   return real_disk
1757
1758
1759 def BlockdevFind(disk):
1760   """Check if a device is activated.
1761
1762   If it is, return information about the real device.
1763
1764   @type disk: L{objects.Disk}
1765   @param disk: the disk to find
1766   @rtype: None or objects.BlockDevStatus
1767   @return: None if the disk cannot be found, otherwise a the current
1768            information
1769
1770   """
1771   try:
1772     rbd = _RecursiveFindBD(disk)
1773   except errors.BlockDeviceError, err:
1774     _Fail("Failed to find device: %s", err, exc=True)
1775
1776   if rbd is None:
1777     return None
1778
1779   return rbd.GetSyncStatus()
1780
1781
1782 def BlockdevGetsize(disks):
1783   """Computes the size of the given disks.
1784
1785   If a disk is not found, returns None instead.
1786
1787   @type disks: list of L{objects.Disk}
1788   @param disks: the list of disk to compute the size for
1789   @rtype: list
1790   @return: list with elements None if the disk cannot be found,
1791       otherwise the size
1792
1793   """
1794   result = []
1795   for cf in disks:
1796     try:
1797       rbd = _RecursiveFindBD(cf)
1798     except errors.BlockDeviceError:
1799       result.append(None)
1800       continue
1801     if rbd is None:
1802       result.append(None)
1803     else:
1804       result.append(rbd.GetActualSize())
1805   return result
1806
1807
1808 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1809   """Export a block device to a remote node.
1810
1811   @type disk: L{objects.Disk}
1812   @param disk: the description of the disk to export
1813   @type dest_node: str
1814   @param dest_node: the destination node to export to
1815   @type dest_path: str
1816   @param dest_path: the destination path on the target node
1817   @type cluster_name: str
1818   @param cluster_name: the cluster name, needed for SSH hostalias
1819   @rtype: None
1820
1821   """
1822   real_disk = _OpenRealBD(disk)
1823
1824   # the block size on the read dd is 1MiB to match our units
1825   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1826                                "dd if=%s bs=1048576 count=%s",
1827                                real_disk.dev_path, str(disk.size))
1828
1829   # we set here a smaller block size as, due to ssh buffering, more
1830   # than 64-128k will mostly ignored; we use nocreat to fail if the
1831   # device is not already there or we pass a wrong path; we use
1832   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1833   # to not buffer too much memory; this means that at best, we flush
1834   # every 64k, which will not be very fast
1835   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1836                                 " oflag=dsync", dest_path)
1837
1838   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1839                                                    constants.GANETI_RUNAS,
1840                                                    destcmd)
1841
1842   # all commands have been checked, so we're safe to combine them
1843   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1844
1845   result = utils.RunCmd(["bash", "-c", command])
1846
1847   if result.failed:
1848     _Fail("Disk copy command '%s' returned error: %s"
1849           " output: %s", command, result.fail_reason, result.output)
1850
1851
1852 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1853   """Write a file to the filesystem.
1854
1855   This allows the master to overwrite(!) a file. It will only perform
1856   the operation if the file belongs to a list of configuration files.
1857
1858   @type file_name: str
1859   @param file_name: the target file name
1860   @type data: str
1861   @param data: the new contents of the file
1862   @type mode: int
1863   @param mode: the mode to give the file (can be None)
1864   @type uid: string
1865   @param uid: the owner of the file
1866   @type gid: string
1867   @param gid: the group of the file
1868   @type atime: float
1869   @param atime: the atime to set on the file (can be None)
1870   @type mtime: float
1871   @param mtime: the mtime to set on the file (can be None)
1872   @rtype: None
1873
1874   """
1875   if not os.path.isabs(file_name):
1876     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1877
1878   if file_name not in _ALLOWED_UPLOAD_FILES:
1879     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1880           file_name)
1881
1882   raw_data = _Decompress(data)
1883
1884   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
1885     _Fail("Invalid username/groupname type")
1886
1887   getents = runtime.GetEnts()
1888   uid = getents.LookupUser(uid)
1889   gid = getents.LookupGroup(gid)
1890
1891   utils.SafeWriteFile(file_name, None,
1892                       data=raw_data, mode=mode, uid=uid, gid=gid,
1893                       atime=atime, mtime=mtime)
1894
1895
1896 def RunOob(oob_program, command, node, timeout):
1897   """Executes oob_program with given command on given node.
1898
1899   @param oob_program: The path to the executable oob_program
1900   @param command: The command to invoke on oob_program
1901   @param node: The node given as an argument to the program
1902   @param timeout: Timeout after which we kill the oob program
1903
1904   @return: stdout
1905   @raise RPCFail: If execution fails for some reason
1906
1907   """
1908   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1909
1910   if result.failed:
1911     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1912           result.fail_reason, result.output)
1913
1914   return result.stdout
1915
1916
1917 def WriteSsconfFiles(values):
1918   """Update all ssconf files.
1919
1920   Wrapper around the SimpleStore.WriteFiles.
1921
1922   """
1923   ssconf.SimpleStore().WriteFiles(values)
1924
1925
1926 def _ErrnoOrStr(err):
1927   """Format an EnvironmentError exception.
1928
1929   If the L{err} argument has an errno attribute, it will be looked up
1930   and converted into a textual C{E...} description. Otherwise the
1931   string representation of the error will be returned.
1932
1933   @type err: L{EnvironmentError}
1934   @param err: the exception to format
1935
1936   """
1937   if hasattr(err, "errno"):
1938     detail = errno.errorcode[err.errno]
1939   else:
1940     detail = str(err)
1941   return detail
1942
1943
1944 def _OSOndiskAPIVersion(os_dir):
1945   """Compute and return the API version of a given OS.
1946
1947   This function will try to read the API version of the OS residing in
1948   the 'os_dir' directory.
1949
1950   @type os_dir: str
1951   @param os_dir: the directory in which we should look for the OS
1952   @rtype: tuple
1953   @return: tuple (status, data) with status denoting the validity and
1954       data holding either the vaid versions or an error message
1955
1956   """
1957   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1958
1959   try:
1960     st = os.stat(api_file)
1961   except EnvironmentError, err:
1962     return False, ("Required file '%s' not found under path %s: %s" %
1963                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1964
1965   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1966     return False, ("File '%s' in %s is not a regular file" %
1967                    (constants.OS_API_FILE, os_dir))
1968
1969   try:
1970     api_versions = utils.ReadFile(api_file).splitlines()
1971   except EnvironmentError, err:
1972     return False, ("Error while reading the API version file at %s: %s" %
1973                    (api_file, _ErrnoOrStr(err)))
1974
1975   try:
1976     api_versions = [int(version.strip()) for version in api_versions]
1977   except (TypeError, ValueError), err:
1978     return False, ("API version(s) can't be converted to integer: %s" %
1979                    str(err))
1980
1981   return True, api_versions
1982
1983
1984 def DiagnoseOS(top_dirs=None):
1985   """Compute the validity for all OSes.
1986
1987   @type top_dirs: list
1988   @param top_dirs: the list of directories in which to
1989       search (if not given defaults to
1990       L{constants.OS_SEARCH_PATH})
1991   @rtype: list of L{objects.OS}
1992   @return: a list of tuples (name, path, status, diagnose, variants,
1993       parameters, api_version) for all (potential) OSes under all
1994       search paths, where:
1995           - name is the (potential) OS name
1996           - path is the full path to the OS
1997           - status True/False is the validity of the OS
1998           - diagnose is the error message for an invalid OS, otherwise empty
1999           - variants is a list of supported OS variants, if any
2000           - parameters is a list of (name, help) parameters, if any
2001           - api_version is a list of support OS API versions
2002
2003   """
2004   if top_dirs is None:
2005     top_dirs = constants.OS_SEARCH_PATH
2006
2007   result = []
2008   for dir_name in top_dirs:
2009     if os.path.isdir(dir_name):
2010       try:
2011         f_names = utils.ListVisibleFiles(dir_name)
2012       except EnvironmentError, err:
2013         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2014         break
2015       for name in f_names:
2016         os_path = utils.PathJoin(dir_name, name)
2017         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2018         if status:
2019           diagnose = ""
2020           variants = os_inst.supported_variants
2021           parameters = os_inst.supported_parameters
2022           api_versions = os_inst.api_versions
2023         else:
2024           diagnose = os_inst
2025           variants = parameters = api_versions = []
2026         result.append((name, os_path, status, diagnose, variants,
2027                        parameters, api_versions))
2028
2029   return result
2030
2031
2032 def _TryOSFromDisk(name, base_dir=None):
2033   """Create an OS instance from disk.
2034
2035   This function will return an OS instance if the given name is a
2036   valid OS name.
2037
2038   @type base_dir: string
2039   @keyword base_dir: Base directory containing OS installations.
2040                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2041   @rtype: tuple
2042   @return: success and either the OS instance if we find a valid one,
2043       or error message
2044
2045   """
2046   if base_dir is None:
2047     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
2048   else:
2049     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2050
2051   if os_dir is None:
2052     return False, "Directory for OS %s not found in search path" % name
2053
2054   status, api_versions = _OSOndiskAPIVersion(os_dir)
2055   if not status:
2056     # push the error up
2057     return status, api_versions
2058
2059   if not constants.OS_API_VERSIONS.intersection(api_versions):
2060     return False, ("API version mismatch for path '%s': found %s, want %s." %
2061                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2062
2063   # OS Files dictionary, we will populate it with the absolute path
2064   # names; if the value is True, then it is a required file, otherwise
2065   # an optional one
2066   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2067
2068   if max(api_versions) >= constants.OS_API_V15:
2069     os_files[constants.OS_VARIANTS_FILE] = False
2070
2071   if max(api_versions) >= constants.OS_API_V20:
2072     os_files[constants.OS_PARAMETERS_FILE] = True
2073   else:
2074     del os_files[constants.OS_SCRIPT_VERIFY]
2075
2076   for (filename, required) in os_files.items():
2077     os_files[filename] = utils.PathJoin(os_dir, filename)
2078
2079     try:
2080       st = os.stat(os_files[filename])
2081     except EnvironmentError, err:
2082       if err.errno == errno.ENOENT and not required:
2083         del os_files[filename]
2084         continue
2085       return False, ("File '%s' under path '%s' is missing (%s)" %
2086                      (filename, os_dir, _ErrnoOrStr(err)))
2087
2088     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2089       return False, ("File '%s' under path '%s' is not a regular file" %
2090                      (filename, os_dir))
2091
2092     if filename in constants.OS_SCRIPTS:
2093       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2094         return False, ("File '%s' under path '%s' is not executable" %
2095                        (filename, os_dir))
2096
2097   variants = []
2098   if constants.OS_VARIANTS_FILE in os_files:
2099     variants_file = os_files[constants.OS_VARIANTS_FILE]
2100     try:
2101       variants = utils.ReadFile(variants_file).splitlines()
2102     except EnvironmentError, err:
2103       # we accept missing files, but not other errors
2104       if err.errno != errno.ENOENT:
2105         return False, ("Error while reading the OS variants file at %s: %s" %
2106                        (variants_file, _ErrnoOrStr(err)))
2107
2108   parameters = []
2109   if constants.OS_PARAMETERS_FILE in os_files:
2110     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2111     try:
2112       parameters = utils.ReadFile(parameters_file).splitlines()
2113     except EnvironmentError, err:
2114       return False, ("Error while reading the OS parameters file at %s: %s" %
2115                      (parameters_file, _ErrnoOrStr(err)))
2116     parameters = [v.split(None, 1) for v in parameters]
2117
2118   os_obj = objects.OS(name=name, path=os_dir,
2119                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2120                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2121                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2122                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2123                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2124                                                  None),
2125                       supported_variants=variants,
2126                       supported_parameters=parameters,
2127                       api_versions=api_versions)
2128   return True, os_obj
2129
2130
2131 def OSFromDisk(name, base_dir=None):
2132   """Create an OS instance from disk.
2133
2134   This function will return an OS instance if the given name is a
2135   valid OS name. Otherwise, it will raise an appropriate
2136   L{RPCFail} exception, detailing why this is not a valid OS.
2137
2138   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2139   an exception but returns true/false status data.
2140
2141   @type base_dir: string
2142   @keyword base_dir: Base directory containing OS installations.
2143                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2144   @rtype: L{objects.OS}
2145   @return: the OS instance if we find a valid one
2146   @raise RPCFail: if we don't find a valid OS
2147
2148   """
2149   name_only = objects.OS.GetName(name)
2150   status, payload = _TryOSFromDisk(name_only, base_dir)
2151
2152   if not status:
2153     _Fail(payload)
2154
2155   return payload
2156
2157
2158 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2159   """Calculate the basic environment for an os script.
2160
2161   @type os_name: str
2162   @param os_name: full operating system name (including variant)
2163   @type inst_os: L{objects.OS}
2164   @param inst_os: operating system for which the environment is being built
2165   @type os_params: dict
2166   @param os_params: the OS parameters
2167   @type debug: integer
2168   @param debug: debug level (0 or 1, for OS Api 10)
2169   @rtype: dict
2170   @return: dict of environment variables
2171   @raise errors.BlockDeviceError: if the block device
2172       cannot be found
2173
2174   """
2175   result = {}
2176   api_version = \
2177     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2178   result["OS_API_VERSION"] = "%d" % api_version
2179   result["OS_NAME"] = inst_os.name
2180   result["DEBUG_LEVEL"] = "%d" % debug
2181
2182   # OS variants
2183   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2184     variant = objects.OS.GetVariant(os_name)
2185     if not variant:
2186       variant = inst_os.supported_variants[0]
2187   else:
2188     variant = ""
2189   result["OS_VARIANT"] = variant
2190
2191   # OS params
2192   for pname, pvalue in os_params.items():
2193     result["OSP_%s" % pname.upper()] = pvalue
2194
2195   return result
2196
2197
2198 def OSEnvironment(instance, inst_os, debug=0):
2199   """Calculate the environment for an os script.
2200
2201   @type instance: L{objects.Instance}
2202   @param instance: target instance for the os script run
2203   @type inst_os: L{objects.OS}
2204   @param inst_os: operating system for which the environment is being built
2205   @type debug: integer
2206   @param debug: debug level (0 or 1, for OS Api 10)
2207   @rtype: dict
2208   @return: dict of environment variables
2209   @raise errors.BlockDeviceError: if the block device
2210       cannot be found
2211
2212   """
2213   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2214
2215   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2216     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2217
2218   result["HYPERVISOR"] = instance.hypervisor
2219   result["DISK_COUNT"] = "%d" % len(instance.disks)
2220   result["NIC_COUNT"] = "%d" % len(instance.nics)
2221   result["INSTANCE_SECONDARY_NODES"] = \
2222       ("%s" % " ".join(instance.secondary_nodes))
2223
2224   # Disks
2225   for idx, disk in enumerate(instance.disks):
2226     real_disk = _OpenRealBD(disk)
2227     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2228     result["DISK_%d_ACCESS" % idx] = disk.mode
2229     if constants.HV_DISK_TYPE in instance.hvparams:
2230       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2231         instance.hvparams[constants.HV_DISK_TYPE]
2232     if disk.dev_type in constants.LDS_BLOCK:
2233       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2234     elif disk.dev_type == constants.LD_FILE:
2235       result["DISK_%d_BACKEND_TYPE" % idx] = \
2236         "file:%s" % disk.physical_id[0]
2237
2238   # NICs
2239   for idx, nic in enumerate(instance.nics):
2240     result["NIC_%d_MAC" % idx] = nic.mac
2241     if nic.ip:
2242       result["NIC_%d_IP" % idx] = nic.ip
2243     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2244     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2245       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2246     if nic.nicparams[constants.NIC_LINK]:
2247       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2248     if constants.HV_NIC_TYPE in instance.hvparams:
2249       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2250         instance.hvparams[constants.HV_NIC_TYPE]
2251
2252   # HV/BE params
2253   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2254     for key, value in source.items():
2255       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2256
2257   return result
2258
2259
2260 def BlockdevGrow(disk, amount, dryrun):
2261   """Grow a stack of block devices.
2262
2263   This function is called recursively, with the childrens being the
2264   first ones to resize.
2265
2266   @type disk: L{objects.Disk}
2267   @param disk: the disk to be grown
2268   @type amount: integer
2269   @param amount: the amount (in mebibytes) to grow with
2270   @type dryrun: boolean
2271   @param dryrun: whether to execute the operation in simulation mode
2272       only, without actually increasing the size
2273   @rtype: (status, result)
2274   @return: a tuple with the status of the operation (True/False), and
2275       the errors message if status is False
2276
2277   """
2278   r_dev = _RecursiveFindBD(disk)
2279   if r_dev is None:
2280     _Fail("Cannot find block device %s", disk)
2281
2282   try:
2283     r_dev.Grow(amount, dryrun)
2284   except errors.BlockDeviceError, err:
2285     _Fail("Failed to grow block device: %s", err, exc=True)
2286
2287
2288 def BlockdevSnapshot(disk):
2289   """Create a snapshot copy of a block device.
2290
2291   This function is called recursively, and the snapshot is actually created
2292   just for the leaf lvm backend device.
2293
2294   @type disk: L{objects.Disk}
2295   @param disk: the disk to be snapshotted
2296   @rtype: string
2297   @return: snapshot disk ID as (vg, lv)
2298
2299   """
2300   if disk.dev_type == constants.LD_DRBD8:
2301     if not disk.children:
2302       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2303             disk.unique_id)
2304     return BlockdevSnapshot(disk.children[0])
2305   elif disk.dev_type == constants.LD_LV:
2306     r_dev = _RecursiveFindBD(disk)
2307     if r_dev is not None:
2308       # FIXME: choose a saner value for the snapshot size
2309       # let's stay on the safe side and ask for the full size, for now
2310       return r_dev.Snapshot(disk.size)
2311     else:
2312       _Fail("Cannot find block device %s", disk)
2313   else:
2314     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2315           disk.unique_id, disk.dev_type)
2316
2317
2318 def FinalizeExport(instance, snap_disks):
2319   """Write out the export configuration information.
2320
2321   @type instance: L{objects.Instance}
2322   @param instance: the instance which we export, used for
2323       saving configuration
2324   @type snap_disks: list of L{objects.Disk}
2325   @param snap_disks: list of snapshot block devices, which
2326       will be used to get the actual name of the dump file
2327
2328   @rtype: None
2329
2330   """
2331   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2332   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2333
2334   config = objects.SerializableConfigParser()
2335
2336   config.add_section(constants.INISECT_EXP)
2337   config.set(constants.INISECT_EXP, "version", "0")
2338   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2339   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2340   config.set(constants.INISECT_EXP, "os", instance.os)
2341   config.set(constants.INISECT_EXP, "compression", "none")
2342
2343   config.add_section(constants.INISECT_INS)
2344   config.set(constants.INISECT_INS, "name", instance.name)
2345   config.set(constants.INISECT_INS, "memory", "%d" %
2346              instance.beparams[constants.BE_MEMORY])
2347   config.set(constants.INISECT_INS, "vcpus", "%d" %
2348              instance.beparams[constants.BE_VCPUS])
2349   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2350   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2351   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2352
2353   nic_total = 0
2354   for nic_count, nic in enumerate(instance.nics):
2355     nic_total += 1
2356     config.set(constants.INISECT_INS, "nic%d_mac" %
2357                nic_count, "%s" % nic.mac)
2358     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2359     for param in constants.NICS_PARAMETER_TYPES:
2360       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2361                  "%s" % nic.nicparams.get(param, None))
2362   # TODO: redundant: on load can read nics until it doesn't exist
2363   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2364
2365   disk_total = 0
2366   for disk_count, disk in enumerate(snap_disks):
2367     if disk:
2368       disk_total += 1
2369       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2370                  ("%s" % disk.iv_name))
2371       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2372                  ("%s" % disk.physical_id[1]))
2373       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2374                  ("%d" % disk.size))
2375
2376   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2377
2378   # New-style hypervisor/backend parameters
2379
2380   config.add_section(constants.INISECT_HYP)
2381   for name, value in instance.hvparams.items():
2382     if name not in constants.HVC_GLOBALS:
2383       config.set(constants.INISECT_HYP, name, str(value))
2384
2385   config.add_section(constants.INISECT_BEP)
2386   for name, value in instance.beparams.items():
2387     config.set(constants.INISECT_BEP, name, str(value))
2388
2389   config.add_section(constants.INISECT_OSP)
2390   for name, value in instance.osparams.items():
2391     config.set(constants.INISECT_OSP, name, str(value))
2392
2393   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2394                   data=config.Dumps())
2395   shutil.rmtree(finaldestdir, ignore_errors=True)
2396   shutil.move(destdir, finaldestdir)
2397
2398
2399 def ExportInfo(dest):
2400   """Get export configuration information.
2401
2402   @type dest: str
2403   @param dest: directory containing the export
2404
2405   @rtype: L{objects.SerializableConfigParser}
2406   @return: a serializable config file containing the
2407       export info
2408
2409   """
2410   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2411
2412   config = objects.SerializableConfigParser()
2413   config.read(cff)
2414
2415   if (not config.has_section(constants.INISECT_EXP) or
2416       not config.has_section(constants.INISECT_INS)):
2417     _Fail("Export info file doesn't have the required fields")
2418
2419   return config.Dumps()
2420
2421
2422 def ListExports():
2423   """Return a list of exports currently available on this machine.
2424
2425   @rtype: list
2426   @return: list of the exports
2427
2428   """
2429   if os.path.isdir(constants.EXPORT_DIR):
2430     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2431   else:
2432     _Fail("No exports directory")
2433
2434
2435 def RemoveExport(export):
2436   """Remove an existing export from the node.
2437
2438   @type export: str
2439   @param export: the name of the export to remove
2440   @rtype: None
2441
2442   """
2443   target = utils.PathJoin(constants.EXPORT_DIR, export)
2444
2445   try:
2446     shutil.rmtree(target)
2447   except EnvironmentError, err:
2448     _Fail("Error while removing the export: %s", err, exc=True)
2449
2450
2451 def BlockdevRename(devlist):
2452   """Rename a list of block devices.
2453
2454   @type devlist: list of tuples
2455   @param devlist: list of tuples of the form  (disk,
2456       new_logical_id, new_physical_id); disk is an
2457       L{objects.Disk} object describing the current disk,
2458       and new logical_id/physical_id is the name we
2459       rename it to
2460   @rtype: boolean
2461   @return: True if all renames succeeded, False otherwise
2462
2463   """
2464   msgs = []
2465   result = True
2466   for disk, unique_id in devlist:
2467     dev = _RecursiveFindBD(disk)
2468     if dev is None:
2469       msgs.append("Can't find device %s in rename" % str(disk))
2470       result = False
2471       continue
2472     try:
2473       old_rpath = dev.dev_path
2474       dev.Rename(unique_id)
2475       new_rpath = dev.dev_path
2476       if old_rpath != new_rpath:
2477         DevCacheManager.RemoveCache(old_rpath)
2478         # FIXME: we should add the new cache information here, like:
2479         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2480         # but we don't have the owner here - maybe parse from existing
2481         # cache? for now, we only lose lvm data when we rename, which
2482         # is less critical than DRBD or MD
2483     except errors.BlockDeviceError, err:
2484       msgs.append("Can't rename device '%s' to '%s': %s" %
2485                   (dev, unique_id, err))
2486       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2487       result = False
2488   if not result:
2489     _Fail("; ".join(msgs))
2490
2491
2492 def _TransformFileStorageDir(fs_dir):
2493   """Checks whether given file_storage_dir is valid.
2494
2495   Checks wheter the given fs_dir is within the cluster-wide default
2496   file_storage_dir or the shared_file_storage_dir, which are stored in
2497   SimpleStore. Only paths under those directories are allowed.
2498
2499   @type fs_dir: str
2500   @param fs_dir: the path to check
2501
2502   @return: the normalized path if valid, None otherwise
2503
2504   """
2505   if not constants.ENABLE_FILE_STORAGE:
2506     _Fail("File storage disabled at configure time")
2507   cfg = _GetConfig()
2508   fs_dir = os.path.normpath(fs_dir)
2509   base_fstore = cfg.GetFileStorageDir()
2510   base_shared = cfg.GetSharedFileStorageDir()
2511   if not (utils.IsBelowDir(base_fstore, fs_dir) or
2512           utils.IsBelowDir(base_shared, fs_dir)):
2513     _Fail("File storage directory '%s' is not under base file"
2514           " storage directory '%s' or shared storage directory '%s'",
2515           fs_dir, base_fstore, base_shared)
2516   return fs_dir
2517
2518
2519 def CreateFileStorageDir(file_storage_dir):
2520   """Create file storage directory.
2521
2522   @type file_storage_dir: str
2523   @param file_storage_dir: directory to create
2524
2525   @rtype: tuple
2526   @return: tuple with first element a boolean indicating wheter dir
2527       creation was successful or not
2528
2529   """
2530   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2531   if os.path.exists(file_storage_dir):
2532     if not os.path.isdir(file_storage_dir):
2533       _Fail("Specified storage dir '%s' is not a directory",
2534             file_storage_dir)
2535   else:
2536     try:
2537       os.makedirs(file_storage_dir, 0750)
2538     except OSError, err:
2539       _Fail("Cannot create file storage directory '%s': %s",
2540             file_storage_dir, err, exc=True)
2541
2542
2543 def RemoveFileStorageDir(file_storage_dir):
2544   """Remove file storage directory.
2545
2546   Remove it only if it's empty. If not log an error and return.
2547
2548   @type file_storage_dir: str
2549   @param file_storage_dir: the directory we should cleanup
2550   @rtype: tuple (success,)
2551   @return: tuple of one element, C{success}, denoting
2552       whether the operation was successful
2553
2554   """
2555   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2556   if os.path.exists(file_storage_dir):
2557     if not os.path.isdir(file_storage_dir):
2558       _Fail("Specified Storage directory '%s' is not a directory",
2559             file_storage_dir)
2560     # deletes dir only if empty, otherwise we want to fail the rpc call
2561     try:
2562       os.rmdir(file_storage_dir)
2563     except OSError, err:
2564       _Fail("Cannot remove file storage directory '%s': %s",
2565             file_storage_dir, err)
2566
2567
2568 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2569   """Rename the file storage directory.
2570
2571   @type old_file_storage_dir: str
2572   @param old_file_storage_dir: the current path
2573   @type new_file_storage_dir: str
2574   @param new_file_storage_dir: the name we should rename to
2575   @rtype: tuple (success,)
2576   @return: tuple of one element, C{success}, denoting
2577       whether the operation was successful
2578
2579   """
2580   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2581   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2582   if not os.path.exists(new_file_storage_dir):
2583     if os.path.isdir(old_file_storage_dir):
2584       try:
2585         os.rename(old_file_storage_dir, new_file_storage_dir)
2586       except OSError, err:
2587         _Fail("Cannot rename '%s' to '%s': %s",
2588               old_file_storage_dir, new_file_storage_dir, err)
2589     else:
2590       _Fail("Specified storage dir '%s' is not a directory",
2591             old_file_storage_dir)
2592   else:
2593     if os.path.exists(old_file_storage_dir):
2594       _Fail("Cannot rename '%s' to '%s': both locations exist",
2595             old_file_storage_dir, new_file_storage_dir)
2596
2597
2598 def _EnsureJobQueueFile(file_name):
2599   """Checks whether the given filename is in the queue directory.
2600
2601   @type file_name: str
2602   @param file_name: the file name we should check
2603   @rtype: None
2604   @raises RPCFail: if the file is not valid
2605
2606   """
2607   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2608   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2609
2610   if not result:
2611     _Fail("Passed job queue file '%s' does not belong to"
2612           " the queue directory '%s'", file_name, queue_dir)
2613
2614
2615 def JobQueueUpdate(file_name, content):
2616   """Updates a file in the queue directory.
2617
2618   This is just a wrapper over L{utils.io.WriteFile}, with proper
2619   checking.
2620
2621   @type file_name: str
2622   @param file_name: the job file name
2623   @type content: str
2624   @param content: the new job contents
2625   @rtype: boolean
2626   @return: the success of the operation
2627
2628   """
2629   _EnsureJobQueueFile(file_name)
2630   getents = runtime.GetEnts()
2631
2632   # Write and replace the file atomically
2633   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2634                   gid=getents.masterd_gid)
2635
2636
2637 def JobQueueRename(old, new):
2638   """Renames a job queue file.
2639
2640   This is just a wrapper over os.rename with proper checking.
2641
2642   @type old: str
2643   @param old: the old (actual) file name
2644   @type new: str
2645   @param new: the desired file name
2646   @rtype: tuple
2647   @return: the success of the operation and payload
2648
2649   """
2650   _EnsureJobQueueFile(old)
2651   _EnsureJobQueueFile(new)
2652
2653   utils.RenameFile(old, new, mkdir=True)
2654
2655
2656 def BlockdevClose(instance_name, disks):
2657   """Closes the given block devices.
2658
2659   This means they will be switched to secondary mode (in case of
2660   DRBD).
2661
2662   @param instance_name: if the argument is not empty, the symlinks
2663       of this instance will be removed
2664   @type disks: list of L{objects.Disk}
2665   @param disks: the list of disks to be closed
2666   @rtype: tuple (success, message)
2667   @return: a tuple of success and message, where success
2668       indicates the succes of the operation, and message
2669       which will contain the error details in case we
2670       failed
2671
2672   """
2673   bdevs = []
2674   for cf in disks:
2675     rd = _RecursiveFindBD(cf)
2676     if rd is None:
2677       _Fail("Can't find device %s", cf)
2678     bdevs.append(rd)
2679
2680   msg = []
2681   for rd in bdevs:
2682     try:
2683       rd.Close()
2684     except errors.BlockDeviceError, err:
2685       msg.append(str(err))
2686   if msg:
2687     _Fail("Can't make devices secondary: %s", ",".join(msg))
2688   else:
2689     if instance_name:
2690       _RemoveBlockDevLinks(instance_name, disks)
2691
2692
2693 def ValidateHVParams(hvname, hvparams):
2694   """Validates the given hypervisor parameters.
2695
2696   @type hvname: string
2697   @param hvname: the hypervisor name
2698   @type hvparams: dict
2699   @param hvparams: the hypervisor parameters to be validated
2700   @rtype: None
2701
2702   """
2703   try:
2704     hv_type = hypervisor.GetHypervisor(hvname)
2705     hv_type.ValidateParameters(hvparams)
2706   except errors.HypervisorError, err:
2707     _Fail(str(err), log=False)
2708
2709
2710 def _CheckOSPList(os_obj, parameters):
2711   """Check whether a list of parameters is supported by the OS.
2712
2713   @type os_obj: L{objects.OS}
2714   @param os_obj: OS object to check
2715   @type parameters: list
2716   @param parameters: the list of parameters to check
2717
2718   """
2719   supported = [v[0] for v in os_obj.supported_parameters]
2720   delta = frozenset(parameters).difference(supported)
2721   if delta:
2722     _Fail("The following parameters are not supported"
2723           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2724
2725
2726 def ValidateOS(required, osname, checks, osparams):
2727   """Validate the given OS' parameters.
2728
2729   @type required: boolean
2730   @param required: whether absence of the OS should translate into
2731       failure or not
2732   @type osname: string
2733   @param osname: the OS to be validated
2734   @type checks: list
2735   @param checks: list of the checks to run (currently only 'parameters')
2736   @type osparams: dict
2737   @param osparams: dictionary with OS parameters
2738   @rtype: boolean
2739   @return: True if the validation passed, or False if the OS was not
2740       found and L{required} was false
2741
2742   """
2743   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2744     _Fail("Unknown checks required for OS %s: %s", osname,
2745           set(checks).difference(constants.OS_VALIDATE_CALLS))
2746
2747   name_only = objects.OS.GetName(osname)
2748   status, tbv = _TryOSFromDisk(name_only, None)
2749
2750   if not status:
2751     if required:
2752       _Fail(tbv)
2753     else:
2754       return False
2755
2756   if max(tbv.api_versions) < constants.OS_API_V20:
2757     return True
2758
2759   if constants.OS_VALIDATE_PARAMETERS in checks:
2760     _CheckOSPList(tbv, osparams.keys())
2761
2762   validate_env = OSCoreEnv(osname, tbv, osparams)
2763   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2764                         cwd=tbv.path, reset_env=True)
2765   if result.failed:
2766     logging.error("os validate command '%s' returned error: %s output: %s",
2767                   result.cmd, result.fail_reason, result.output)
2768     _Fail("OS validation script failed (%s), output: %s",
2769           result.fail_reason, result.output, log=False)
2770
2771   return True
2772
2773
2774 def DemoteFromMC():
2775   """Demotes the current node from master candidate role.
2776
2777   """
2778   # try to ensure we're not the master by mistake
2779   master, myself = ssconf.GetMasterAndMyself()
2780   if master == myself:
2781     _Fail("ssconf status shows I'm the master node, will not demote")
2782
2783   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2784   if not result.failed:
2785     _Fail("The master daemon is running, will not demote")
2786
2787   try:
2788     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2789       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2790   except EnvironmentError, err:
2791     if err.errno != errno.ENOENT:
2792       _Fail("Error while backing up cluster file: %s", err, exc=True)
2793
2794   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2795
2796
2797 def _GetX509Filenames(cryptodir, name):
2798   """Returns the full paths for the private key and certificate.
2799
2800   """
2801   return (utils.PathJoin(cryptodir, name),
2802           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2803           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2804
2805
2806 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2807   """Creates a new X509 certificate for SSL/TLS.
2808
2809   @type validity: int
2810   @param validity: Validity in seconds
2811   @rtype: tuple; (string, string)
2812   @return: Certificate name and public part
2813
2814   """
2815   (key_pem, cert_pem) = \
2816     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2817                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2818
2819   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2820                               prefix="x509-%s-" % utils.TimestampForFilename())
2821   try:
2822     name = os.path.basename(cert_dir)
2823     assert len(name) > 5
2824
2825     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2826
2827     utils.WriteFile(key_file, mode=0400, data=key_pem)
2828     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2829
2830     # Never return private key as it shouldn't leave the node
2831     return (name, cert_pem)
2832   except Exception:
2833     shutil.rmtree(cert_dir, ignore_errors=True)
2834     raise
2835
2836
2837 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2838   """Removes a X509 certificate.
2839
2840   @type name: string
2841   @param name: Certificate name
2842
2843   """
2844   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2845
2846   utils.RemoveFile(key_file)
2847   utils.RemoveFile(cert_file)
2848
2849   try:
2850     os.rmdir(cert_dir)
2851   except EnvironmentError, err:
2852     _Fail("Cannot remove certificate directory '%s': %s",
2853           cert_dir, err)
2854
2855
2856 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2857   """Returns the command for the requested input/output.
2858
2859   @type instance: L{objects.Instance}
2860   @param instance: The instance object
2861   @param mode: Import/export mode
2862   @param ieio: Input/output type
2863   @param ieargs: Input/output arguments
2864
2865   """
2866   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2867
2868   env = None
2869   prefix = None
2870   suffix = None
2871   exp_size = None
2872
2873   if ieio == constants.IEIO_FILE:
2874     (filename, ) = ieargs
2875
2876     if not utils.IsNormAbsPath(filename):
2877       _Fail("Path '%s' is not normalized or absolute", filename)
2878
2879     real_filename = os.path.realpath(filename)
2880     directory = os.path.dirname(real_filename)
2881
2882     if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
2883       _Fail("File '%s' is not under exports directory '%s': %s",
2884             filename, constants.EXPORT_DIR, real_filename)
2885
2886     # Create directory
2887     utils.Makedirs(directory, mode=0750)
2888
2889     quoted_filename = utils.ShellQuote(filename)
2890
2891     if mode == constants.IEM_IMPORT:
2892       suffix = "> %s" % quoted_filename
2893     elif mode == constants.IEM_EXPORT:
2894       suffix = "< %s" % quoted_filename
2895
2896       # Retrieve file size
2897       try:
2898         st = os.stat(filename)
2899       except EnvironmentError, err:
2900         logging.error("Can't stat(2) %s: %s", filename, err)
2901       else:
2902         exp_size = utils.BytesToMebibyte(st.st_size)
2903
2904   elif ieio == constants.IEIO_RAW_DISK:
2905     (disk, ) = ieargs
2906
2907     real_disk = _OpenRealBD(disk)
2908
2909     if mode == constants.IEM_IMPORT:
2910       # we set here a smaller block size as, due to transport buffering, more
2911       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2912       # is not already there or we pass a wrong path; we use notrunc to no
2913       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2914       # much memory; this means that at best, we flush every 64k, which will
2915       # not be very fast
2916       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2917                                     " bs=%s oflag=dsync"),
2918                                     real_disk.dev_path,
2919                                     str(64 * 1024))
2920
2921     elif mode == constants.IEM_EXPORT:
2922       # the block size on the read dd is 1MiB to match our units
2923       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2924                                    real_disk.dev_path,
2925                                    str(1024 * 1024), # 1 MB
2926                                    str(disk.size))
2927       exp_size = disk.size
2928
2929   elif ieio == constants.IEIO_SCRIPT:
2930     (disk, disk_index, ) = ieargs
2931
2932     assert isinstance(disk_index, (int, long))
2933
2934     real_disk = _OpenRealBD(disk)
2935
2936     inst_os = OSFromDisk(instance.os)
2937     env = OSEnvironment(instance, inst_os)
2938
2939     if mode == constants.IEM_IMPORT:
2940       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2941       env["IMPORT_INDEX"] = str(disk_index)
2942       script = inst_os.import_script
2943
2944     elif mode == constants.IEM_EXPORT:
2945       env["EXPORT_DEVICE"] = real_disk.dev_path
2946       env["EXPORT_INDEX"] = str(disk_index)
2947       script = inst_os.export_script
2948
2949     # TODO: Pass special environment only to script
2950     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2951
2952     if mode == constants.IEM_IMPORT:
2953       suffix = "| %s" % script_cmd
2954
2955     elif mode == constants.IEM_EXPORT:
2956       prefix = "%s |" % script_cmd
2957
2958     # Let script predict size
2959     exp_size = constants.IE_CUSTOM_SIZE
2960
2961   else:
2962     _Fail("Invalid %s I/O mode %r", mode, ieio)
2963
2964   return (env, prefix, suffix, exp_size)
2965
2966
2967 def _CreateImportExportStatusDir(prefix):
2968   """Creates status directory for import/export.
2969
2970   """
2971   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2972                           prefix=("%s-%s-" %
2973                                   (prefix, utils.TimestampForFilename())))
2974
2975
2976 def StartImportExportDaemon(mode, opts, host, port, instance, component,
2977                             ieio, ieioargs):
2978   """Starts an import or export daemon.
2979
2980   @param mode: Import/output mode
2981   @type opts: L{objects.ImportExportOptions}
2982   @param opts: Daemon options
2983   @type host: string
2984   @param host: Remote host for export (None for import)
2985   @type port: int
2986   @param port: Remote port for export (None for import)
2987   @type instance: L{objects.Instance}
2988   @param instance: Instance object
2989   @type component: string
2990   @param component: which part of the instance is transferred now,
2991       e.g. 'disk/0'
2992   @param ieio: Input/output type
2993   @param ieioargs: Input/output arguments
2994
2995   """
2996   if mode == constants.IEM_IMPORT:
2997     prefix = "import"
2998
2999     if not (host is None and port is None):
3000       _Fail("Can not specify host or port on import")
3001
3002   elif mode == constants.IEM_EXPORT:
3003     prefix = "export"
3004
3005     if host is None or port is None:
3006       _Fail("Host and port must be specified for an export")
3007
3008   else:
3009     _Fail("Invalid mode %r", mode)
3010
3011   if (opts.key_name is None) ^ (opts.ca_pem is None):
3012     _Fail("Cluster certificate can only be used for both key and CA")
3013
3014   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3015     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3016
3017   if opts.key_name is None:
3018     # Use server.pem
3019     key_path = constants.NODED_CERT_FILE
3020     cert_path = constants.NODED_CERT_FILE
3021     assert opts.ca_pem is None
3022   else:
3023     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3024                                                  opts.key_name)
3025     assert opts.ca_pem is not None
3026
3027   for i in [key_path, cert_path]:
3028     if not os.path.exists(i):
3029       _Fail("File '%s' does not exist" % i)
3030
3031   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3032   try:
3033     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3034     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3035     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3036
3037     if opts.ca_pem is None:
3038       # Use server.pem
3039       ca = utils.ReadFile(constants.NODED_CERT_FILE)
3040     else:
3041       ca = opts.ca_pem
3042
3043     # Write CA file
3044     utils.WriteFile(ca_file, data=ca, mode=0400)
3045
3046     cmd = [
3047       constants.IMPORT_EXPORT_DAEMON,
3048       status_file, mode,
3049       "--key=%s" % key_path,
3050       "--cert=%s" % cert_path,
3051       "--ca=%s" % ca_file,
3052       ]
3053
3054     if host:
3055       cmd.append("--host=%s" % host)
3056
3057     if port:
3058       cmd.append("--port=%s" % port)
3059
3060     if opts.ipv6:
3061       cmd.append("--ipv6")
3062     else:
3063       cmd.append("--ipv4")
3064
3065     if opts.compress:
3066       cmd.append("--compress=%s" % opts.compress)
3067
3068     if opts.magic:
3069       cmd.append("--magic=%s" % opts.magic)
3070
3071     if exp_size is not None:
3072       cmd.append("--expected-size=%s" % exp_size)
3073
3074     if cmd_prefix:
3075       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3076
3077     if cmd_suffix:
3078       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3079
3080     if mode == constants.IEM_EXPORT:
3081       # Retry connection a few times when connecting to remote peer
3082       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3083       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3084     elif opts.connect_timeout is not None:
3085       assert mode == constants.IEM_IMPORT
3086       # Overall timeout for establishing connection while listening
3087       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3088
3089     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3090
3091     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3092     # support for receiving a file descriptor for output
3093     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3094                       output=logfile)
3095
3096     # The import/export name is simply the status directory name
3097     return os.path.basename(status_dir)
3098
3099   except Exception:
3100     shutil.rmtree(status_dir, ignore_errors=True)
3101     raise
3102
3103
3104 def GetImportExportStatus(names):
3105   """Returns import/export daemon status.
3106
3107   @type names: sequence
3108   @param names: List of names
3109   @rtype: List of dicts
3110   @return: Returns a list of the state of each named import/export or None if a
3111            status couldn't be read
3112
3113   """
3114   result = []
3115
3116   for name in names:
3117     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3118                                  _IES_STATUS_FILE)
3119
3120     try:
3121       data = utils.ReadFile(status_file)
3122     except EnvironmentError, err:
3123       if err.errno != errno.ENOENT:
3124         raise
3125       data = None
3126
3127     if not data:
3128       result.append(None)
3129       continue
3130
3131     result.append(serializer.LoadJson(data))
3132
3133   return result
3134
3135
3136 def AbortImportExport(name):
3137   """Sends SIGTERM to a running import/export daemon.
3138
3139   """
3140   logging.info("Abort import/export %s", name)
3141
3142   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3143   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3144
3145   if pid:
3146     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3147                  name, pid)
3148     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3149
3150
3151 def CleanupImportExport(name):
3152   """Cleanup after an import or export.
3153
3154   If the import/export daemon is still running it's killed. Afterwards the
3155   whole status directory is removed.
3156
3157   """
3158   logging.info("Finalizing import/export %s", name)
3159
3160   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3161
3162   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3163
3164   if pid:
3165     logging.info("Import/export %s is still running with PID %s",
3166                  name, pid)
3167     utils.KillProcess(pid, waitpid=False)
3168
3169   shutil.rmtree(status_dir, ignore_errors=True)
3170
3171
3172 def _FindDisks(nodes_ip, disks):
3173   """Sets the physical ID on disks and returns the block devices.
3174
3175   """
3176   # set the correct physical ID
3177   my_name = netutils.Hostname.GetSysName()
3178   for cf in disks:
3179     cf.SetPhysicalID(my_name, nodes_ip)
3180
3181   bdevs = []
3182
3183   for cf in disks:
3184     rd = _RecursiveFindBD(cf)
3185     if rd is None:
3186       _Fail("Can't find device %s", cf)
3187     bdevs.append(rd)
3188   return bdevs
3189
3190
3191 def DrbdDisconnectNet(nodes_ip, disks):
3192   """Disconnects the network on a list of drbd devices.
3193
3194   """
3195   bdevs = _FindDisks(nodes_ip, disks)
3196
3197   # disconnect disks
3198   for rd in bdevs:
3199     try:
3200       rd.DisconnectNet()
3201     except errors.BlockDeviceError, err:
3202       _Fail("Can't change network configuration to standalone mode: %s",
3203             err, exc=True)
3204
3205
3206 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3207   """Attaches the network on a list of drbd devices.
3208
3209   """
3210   bdevs = _FindDisks(nodes_ip, disks)
3211
3212   if multimaster:
3213     for idx, rd in enumerate(bdevs):
3214       try:
3215         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3216       except EnvironmentError, err:
3217         _Fail("Can't create symlink: %s", err)
3218   # reconnect disks, switch to new master configuration and if
3219   # needed primary mode
3220   for rd in bdevs:
3221     try:
3222       rd.AttachNet(multimaster)
3223     except errors.BlockDeviceError, err:
3224       _Fail("Can't change network configuration: %s", err)
3225
3226   # wait until the disks are connected; we need to retry the re-attach
3227   # if the device becomes standalone, as this might happen if the one
3228   # node disconnects and reconnects in a different mode before the
3229   # other node reconnects; in this case, one or both of the nodes will
3230   # decide it has wrong configuration and switch to standalone
3231
3232   def _Attach():
3233     all_connected = True
3234
3235     for rd in bdevs:
3236       stats = rd.GetProcStatus()
3237
3238       all_connected = (all_connected and
3239                        (stats.is_connected or stats.is_in_resync))
3240
3241       if stats.is_standalone:
3242         # peer had different config info and this node became
3243         # standalone, even though this should not happen with the
3244         # new staged way of changing disk configs
3245         try:
3246           rd.AttachNet(multimaster)
3247         except errors.BlockDeviceError, err:
3248           _Fail("Can't change network configuration: %s", err)
3249
3250     if not all_connected:
3251       raise utils.RetryAgain()
3252
3253   try:
3254     # Start with a delay of 100 miliseconds and go up to 5 seconds
3255     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3256   except utils.RetryTimeout:
3257     _Fail("Timeout in disk reconnecting")
3258
3259   if multimaster:
3260     # change to primary mode
3261     for rd in bdevs:
3262       try:
3263         rd.Open()
3264       except errors.BlockDeviceError, err:
3265         _Fail("Can't change to primary mode: %s", err)
3266
3267
3268 def DrbdWaitSync(nodes_ip, disks):
3269   """Wait until DRBDs have synchronized.
3270
3271   """
3272   def _helper(rd):
3273     stats = rd.GetProcStatus()
3274     if not (stats.is_connected or stats.is_in_resync):
3275       raise utils.RetryAgain()
3276     return stats
3277
3278   bdevs = _FindDisks(nodes_ip, disks)
3279
3280   min_resync = 100
3281   alldone = True
3282   for rd in bdevs:
3283     try:
3284       # poll each second for 15 seconds
3285       stats = utils.Retry(_helper, 1, 15, args=[rd])
3286     except utils.RetryTimeout:
3287       stats = rd.GetProcStatus()
3288       # last check
3289       if not (stats.is_connected or stats.is_in_resync):
3290         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3291     alldone = alldone and (not stats.is_in_resync)
3292     if stats.sync_percent is not None:
3293       min_resync = min(min_resync, stats.sync_percent)
3294
3295   return (alldone, min_resync)
3296
3297
3298 def GetDrbdUsermodeHelper():
3299   """Returns DRBD usermode helper currently configured.
3300
3301   """
3302   try:
3303     return bdev.BaseDRBD.GetUsermodeHelper()
3304   except errors.BlockDeviceError, err:
3305     _Fail(str(err))
3306
3307
3308 def PowercycleNode(hypervisor_type):
3309   """Hard-powercycle the node.
3310
3311   Because we need to return first, and schedule the powercycle in the
3312   background, we won't be able to report failures nicely.
3313
3314   """
3315   hyper = hypervisor.GetHypervisor(hypervisor_type)
3316   try:
3317     pid = os.fork()
3318   except OSError:
3319     # if we can't fork, we'll pretend that we're in the child process
3320     pid = 0
3321   if pid > 0:
3322     return "Reboot scheduled in 5 seconds"
3323   # ensure the child is running on ram
3324   try:
3325     utils.Mlockall()
3326   except Exception: # pylint: disable=W0703
3327     pass
3328   time.sleep(5)
3329   hyper.PowercycleNode()
3330
3331
3332 class HooksRunner(object):
3333   """Hook runner.
3334
3335   This class is instantiated on the node side (ganeti-noded) and not
3336   on the master side.
3337
3338   """
3339   def __init__(self, hooks_base_dir=None):
3340     """Constructor for hooks runner.
3341
3342     @type hooks_base_dir: str or None
3343     @param hooks_base_dir: if not None, this overrides the
3344         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3345
3346     """
3347     if hooks_base_dir is None:
3348       hooks_base_dir = constants.HOOKS_BASE_DIR
3349     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3350     # constant
3351     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3352
3353   def RunHooks(self, hpath, phase, env):
3354     """Run the scripts in the hooks directory.
3355
3356     @type hpath: str
3357     @param hpath: the path to the hooks directory which
3358         holds the scripts
3359     @type phase: str
3360     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3361         L{constants.HOOKS_PHASE_POST}
3362     @type env: dict
3363     @param env: dictionary with the environment for the hook
3364     @rtype: list
3365     @return: list of 3-element tuples:
3366       - script path
3367       - script result, either L{constants.HKR_SUCCESS} or
3368         L{constants.HKR_FAIL}
3369       - output of the script
3370
3371     @raise errors.ProgrammerError: for invalid input
3372         parameters
3373
3374     """
3375     if phase == constants.HOOKS_PHASE_PRE:
3376       suffix = "pre"
3377     elif phase == constants.HOOKS_PHASE_POST:
3378       suffix = "post"
3379     else:
3380       _Fail("Unknown hooks phase '%s'", phase)
3381
3382     subdir = "%s-%s.d" % (hpath, suffix)
3383     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3384
3385     results = []
3386
3387     if not os.path.isdir(dir_name):
3388       # for non-existing/non-dirs, we simply exit instead of logging a
3389       # warning at every operation
3390       return results
3391
3392     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3393
3394     for (relname, relstatus, runresult)  in runparts_results:
3395       if relstatus == constants.RUNPARTS_SKIP:
3396         rrval = constants.HKR_SKIP
3397         output = ""
3398       elif relstatus == constants.RUNPARTS_ERR:
3399         rrval = constants.HKR_FAIL
3400         output = "Hook script execution error: %s" % runresult
3401       elif relstatus == constants.RUNPARTS_RUN:
3402         if runresult.failed:
3403           rrval = constants.HKR_FAIL
3404         else:
3405           rrval = constants.HKR_SUCCESS
3406         output = utils.SafeEncode(runresult.output.strip())
3407       results.append(("%s/%s" % (subdir, relname), rrval, output))
3408
3409     return results
3410
3411
3412 class IAllocatorRunner(object):
3413   """IAllocator runner.
3414
3415   This class is instantiated on the node side (ganeti-noded) and not on
3416   the master side.
3417
3418   """
3419   @staticmethod
3420   def Run(name, idata):
3421     """Run an iallocator script.
3422
3423     @type name: str
3424     @param name: the iallocator script name
3425     @type idata: str
3426     @param idata: the allocator input data
3427
3428     @rtype: tuple
3429     @return: two element tuple of:
3430        - status
3431        - either error message or stdout of allocator (for success)
3432
3433     """
3434     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3435                                   os.path.isfile)
3436     if alloc_script is None:
3437       _Fail("iallocator module '%s' not found in the search path", name)
3438
3439     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3440     try:
3441       os.write(fd, idata)
3442       os.close(fd)
3443       result = utils.RunCmd([alloc_script, fin_name])
3444       if result.failed:
3445         _Fail("iallocator module '%s' failed: %s, output '%s'",
3446               name, result.fail_reason, result.output)
3447     finally:
3448       os.unlink(fin_name)
3449
3450     return result.stdout
3451
3452
3453 class DevCacheManager(object):
3454   """Simple class for managing a cache of block device information.
3455
3456   """
3457   _DEV_PREFIX = "/dev/"
3458   _ROOT_DIR = constants.BDEV_CACHE_DIR
3459
3460   @classmethod
3461   def _ConvertPath(cls, dev_path):
3462     """Converts a /dev/name path to the cache file name.
3463
3464     This replaces slashes with underscores and strips the /dev
3465     prefix. It then returns the full path to the cache file.
3466
3467     @type dev_path: str
3468     @param dev_path: the C{/dev/} path name
3469     @rtype: str
3470     @return: the converted path name
3471
3472     """
3473     if dev_path.startswith(cls._DEV_PREFIX):
3474       dev_path = dev_path[len(cls._DEV_PREFIX):]
3475     dev_path = dev_path.replace("/", "_")
3476     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3477     return fpath
3478
3479   @classmethod
3480   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3481     """Updates the cache information for a given device.
3482
3483     @type dev_path: str
3484     @param dev_path: the pathname of the device
3485     @type owner: str
3486     @param owner: the owner (instance name) of the device
3487     @type on_primary: bool
3488     @param on_primary: whether this is the primary
3489         node nor not
3490     @type iv_name: str
3491     @param iv_name: the instance-visible name of the
3492         device, as in objects.Disk.iv_name
3493
3494     @rtype: None
3495
3496     """
3497     if dev_path is None:
3498       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3499       return
3500     fpath = cls._ConvertPath(dev_path)
3501     if on_primary:
3502       state = "primary"
3503     else:
3504       state = "secondary"
3505     if iv_name is None:
3506       iv_name = "not_visible"
3507     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3508     try:
3509       utils.WriteFile(fpath, data=fdata)
3510     except EnvironmentError, err:
3511       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3512
3513   @classmethod
3514   def RemoveCache(cls, dev_path):
3515     """Remove data for a dev_path.
3516
3517     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3518     path name and logging.
3519
3520     @type dev_path: str
3521     @param dev_path: the pathname of the device
3522
3523     @rtype: None
3524
3525     """
3526     if dev_path is None:
3527       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3528       return
3529     fpath = cls._ConvertPath(dev_path)
3530     try:
3531       utils.RemoveFile(fpath)
3532     except EnvironmentError, err:
3533       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)