Instance transfer: export component name to backend
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.RAPI_USERS_FILE,
200     constants.CONFD_HMAC_KEY,
201     constants.CLUSTER_DOMAIN_SECRET_FILE,
202     ])
203
204   for hv_name in constants.HYPER_TYPES:
205     hv_class = hypervisor.GetHypervisorClass(hv_name)
206     allowed_files.update(hv_class.GetAncillaryFiles())
207
208   return frozenset(allowed_files)
209
210
211 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212
213
214 def JobQueuePurge():
215   """Removes job queue files and archived jobs.
216
217   @rtype: tuple
218   @return: True, None
219
220   """
221   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
222   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223
224
225 def GetMasterInfo():
226   """Returns master information.
227
228   This is an utility function to compute master information, either
229   for consumption here or from the node daemon.
230
231   @rtype: tuple
232   @return: master_netdev, master_ip, master_name, primary_ip_family
233   @raise RPCFail: in case of errors
234
235   """
236   try:
237     cfg = _GetConfig()
238     master_netdev = cfg.GetMasterNetdev()
239     master_ip = cfg.GetMasterIP()
240     master_node = cfg.GetMasterNode()
241     primary_ip_family = cfg.GetPrimaryIPFamily()
242   except errors.ConfigurationError, err:
243     _Fail("Cluster configuration incomplete: %s", err, exc=True)
244   return (master_netdev, master_ip, master_node, primary_ip_family)
245
246
247 def StartMaster(start_daemons, no_voting):
248   """Activate local node as master node.
249
250   The function will either try activate the IP address of the master
251   (unless someone else has it) or also start the master daemons, based
252   on the start_daemons parameter.
253
254   @type start_daemons: boolean
255   @param start_daemons: whether to start the master daemons
256       (ganeti-masterd and ganeti-rapi), or (if false) activate the
257       master ip
258   @type no_voting: boolean
259   @param no_voting: whether to start ganeti-masterd without a node vote
260       (if start_daemons is True), but still non-interactively
261   @rtype: None
262
263   """
264   # GetMasterInfo will raise an exception if not able to return data
265   master_netdev, master_ip, _, family = GetMasterInfo()
266
267   err_msgs = []
268   # either start the master and rapi daemons
269   if start_daemons:
270     if no_voting:
271       masterd_args = "--no-voting --yes-do-it"
272     else:
273       masterd_args = ""
274
275     env = {
276       "EXTRA_MASTERD_ARGS": masterd_args,
277       }
278
279     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
280     if result.failed:
281       msg = "Can't start Ganeti master: %s" % result.output
282       logging.error(msg)
283       err_msgs.append(msg)
284   # or activate the IP
285   else:
286     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
287       if netutils.IPAddress.Own(master_ip):
288         # we already have the ip:
289         logging.debug("Master IP already configured, doing nothing")
290       else:
291         msg = "Someone else has the master ip, not activating"
292         logging.error(msg)
293         err_msgs.append(msg)
294     else:
295       ipcls = netutils.IP4Address
296       if family == netutils.IP6Address.family:
297         ipcls = netutils.IP6Address
298
299       result = utils.RunCmd(["ip", "address", "add",
300                              "%s/%d" % (master_ip, ipcls.iplen),
301                              "dev", master_netdev, "label",
302                              "%s:0" % master_netdev])
303       if result.failed:
304         msg = "Can't activate master IP: %s" % result.output
305         logging.error(msg)
306         err_msgs.append(msg)
307
308       # we ignore the exit code of the following cmds
309       if ipcls == netutils.IP4Address:
310         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
311                       master_ip, master_ip])
312       elif ipcls == netutils.IP6Address:
313         try:
314           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
315         except errors.OpExecError:
316           # TODO: Better error reporting
317           logging.warning("Can't execute ndisc6, please install if missing")
318
319   if err_msgs:
320     _Fail("; ".join(err_msgs))
321
322
323 def StopMaster(stop_daemons):
324   """Deactivate this node as master.
325
326   The function will always try to deactivate the IP address of the
327   master. It will also stop the master daemons depending on the
328   stop_daemons parameter.
329
330   @type stop_daemons: boolean
331   @param stop_daemons: whether to also stop the master daemons
332       (ganeti-masterd and ganeti-rapi)
333   @rtype: None
334
335   """
336   # TODO: log and report back to the caller the error failures; we
337   # need to decide in which case we fail the RPC for this
338
339   # GetMasterInfo will raise an exception if not able to return data
340   master_netdev, master_ip, _, family = GetMasterInfo()
341
342   ipcls = netutils.IP4Address
343   if family == netutils.IP6Address.family:
344     ipcls = netutils.IP6Address
345
346   result = utils.RunCmd(["ip", "address", "del",
347                          "%s/%d" % (master_ip, ipcls.iplen),
348                          "dev", master_netdev])
349   if result.failed:
350     logging.error("Can't remove the master IP, error: %s", result.output)
351     # but otherwise ignore the failure
352
353   if stop_daemons:
354     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
355     if result.failed:
356       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
357                     " and error %s",
358                     result.cmd, result.exit_code, result.output)
359
360
361 def EtcHostsModify(mode, host, ip):
362   """Modify a host entry in /etc/hosts.
363
364   @param mode: The mode to operate. Either add or remove entry
365   @param host: The host to operate on
366   @param ip: The ip associated with the entry
367
368   """
369   if mode == constants.ETC_HOSTS_ADD:
370     if not ip:
371       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
372               " present")
373     utils.AddHostToEtcHosts(host, ip)
374   elif mode == constants.ETC_HOSTS_REMOVE:
375     if ip:
376       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
377               " parameter is present")
378     utils.RemoveHostFromEtcHosts(host)
379   else:
380     RPCFail("Mode not supported")
381
382
383 def LeaveCluster(modify_ssh_setup):
384   """Cleans up and remove the current node.
385
386   This function cleans up and prepares the current node to be removed
387   from the cluster.
388
389   If processing is successful, then it raises an
390   L{errors.QuitGanetiException} which is used as a special case to
391   shutdown the node daemon.
392
393   @param modify_ssh_setup: boolean
394
395   """
396   _CleanDirectory(constants.DATA_DIR)
397   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
398   JobQueuePurge()
399
400   if modify_ssh_setup:
401     try:
402       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
403
404       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
405
406       utils.RemoveFile(priv_key)
407       utils.RemoveFile(pub_key)
408     except errors.OpExecError:
409       logging.exception("Error while processing ssh files")
410
411   try:
412     utils.RemoveFile(constants.CONFD_HMAC_KEY)
413     utils.RemoveFile(constants.RAPI_CERT_FILE)
414     utils.RemoveFile(constants.NODED_CERT_FILE)
415   except: # pylint: disable-msg=W0702
416     logging.exception("Error while removing cluster secrets")
417
418   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
419   if result.failed:
420     logging.error("Command %s failed with exitcode %s and error %s",
421                   result.cmd, result.exit_code, result.output)
422
423   # Raise a custom exception (handled in ganeti-noded)
424   raise errors.QuitGanetiException(True, "Shutdown scheduled")
425
426
427 def GetNodeInfo(vgname, hypervisor_type):
428   """Gives back a hash with different information about the node.
429
430   @type vgname: C{string}
431   @param vgname: the name of the volume group to ask for disk space information
432   @type hypervisor_type: C{str}
433   @param hypervisor_type: the name of the hypervisor to ask for
434       memory information
435   @rtype: C{dict}
436   @return: dictionary with the following keys:
437       - vg_size is the size of the configured volume group in MiB
438       - vg_free is the free size of the volume group in MiB
439       - memory_dom0 is the memory allocated for domain0 in MiB
440       - memory_free is the currently available (free) ram in MiB
441       - memory_total is the total number of ram in MiB
442
443   """
444   outputarray = {}
445
446   if vgname is not None:
447     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
448     vg_free = vg_size = None
449     if vginfo:
450       vg_free = int(round(vginfo[0][0], 0))
451       vg_size = int(round(vginfo[0][1], 0))
452     outputarray["vg_size"] = vg_size
453     outputarray["vg_free"] = vg_free
454
455   if hypervisor_type is not None:
456     hyper = hypervisor.GetHypervisor(hypervisor_type)
457     hyp_info = hyper.GetNodeInfo()
458     if hyp_info is not None:
459       outputarray.update(hyp_info)
460
461   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
462
463   return outputarray
464
465
466 def VerifyNode(what, cluster_name):
467   """Verify the status of the local node.
468
469   Based on the input L{what} parameter, various checks are done on the
470   local node.
471
472   If the I{filelist} key is present, this list of
473   files is checksummed and the file/checksum pairs are returned.
474
475   If the I{nodelist} key is present, we check that we have
476   connectivity via ssh with the target nodes (and check the hostname
477   report).
478
479   If the I{node-net-test} key is present, we check that we have
480   connectivity to the given nodes via both primary IP and, if
481   applicable, secondary IPs.
482
483   @type what: C{dict}
484   @param what: a dictionary of things to check:
485       - filelist: list of files for which to compute checksums
486       - nodelist: list of nodes we should check ssh communication with
487       - node-net-test: list of nodes we should check node daemon port
488         connectivity with
489       - hypervisor: list with hypervisors to run the verify for
490   @rtype: dict
491   @return: a dictionary with the same keys as the input dict, and
492       values representing the result of the checks
493
494   """
495   result = {}
496   my_name = netutils.Hostname.GetSysName()
497   port = netutils.GetDaemonPort(constants.NODED)
498   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
499
500   if constants.NV_HYPERVISOR in what and vm_capable:
501     result[constants.NV_HYPERVISOR] = tmp = {}
502     for hv_name in what[constants.NV_HYPERVISOR]:
503       try:
504         val = hypervisor.GetHypervisor(hv_name).Verify()
505       except errors.HypervisorError, err:
506         val = "Error while checking hypervisor: %s" % str(err)
507       tmp[hv_name] = val
508
509   if constants.NV_HVPARAMS in what and vm_capable:
510     result[constants.NV_HVPARAMS] = tmp = []
511     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
512       try:
513         logging.info("Validating hv %s, %s", hv_name, hvparms)
514         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
515       except errors.HypervisorError, err:
516         tmp.append((source, hv_name, str(err)))
517
518   if constants.NV_FILELIST in what:
519     result[constants.NV_FILELIST] = utils.FingerprintFiles(
520       what[constants.NV_FILELIST])
521
522   if constants.NV_NODELIST in what:
523     result[constants.NV_NODELIST] = tmp = {}
524     random.shuffle(what[constants.NV_NODELIST])
525     for node in what[constants.NV_NODELIST]:
526       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
527       if not success:
528         tmp[node] = message
529
530   if constants.NV_NODENETTEST in what:
531     result[constants.NV_NODENETTEST] = tmp = {}
532     my_pip = my_sip = None
533     for name, pip, sip in what[constants.NV_NODENETTEST]:
534       if name == my_name:
535         my_pip = pip
536         my_sip = sip
537         break
538     if not my_pip:
539       tmp[my_name] = ("Can't find my own primary/secondary IP"
540                       " in the node list")
541     else:
542       for name, pip, sip in what[constants.NV_NODENETTEST]:
543         fail = []
544         if not netutils.TcpPing(pip, port, source=my_pip):
545           fail.append("primary")
546         if sip != pip:
547           if not netutils.TcpPing(sip, port, source=my_sip):
548             fail.append("secondary")
549         if fail:
550           tmp[name] = ("failure using the %s interface(s)" %
551                        " and ".join(fail))
552
553   if constants.NV_MASTERIP in what:
554     # FIXME: add checks on incoming data structures (here and in the
555     # rest of the function)
556     master_name, master_ip = what[constants.NV_MASTERIP]
557     if master_name == my_name:
558       source = constants.IP4_ADDRESS_LOCALHOST
559     else:
560       source = None
561     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
562                                                   source=source)
563
564   if constants.NV_OOB_PATHS in what:
565     result[constants.NV_OOB_PATHS] = tmp = []
566     for path in what[constants.NV_OOB_PATHS]:
567       try:
568         st = os.stat(path)
569       except OSError, err:
570         tmp.append("error stating out of band helper: %s" % err)
571       else:
572         if stat.S_ISREG(st.st_mode):
573           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
574             tmp.append(None)
575           else:
576             tmp.append("out of band helper %s is not executable" % path)
577         else:
578           tmp.append("out of band helper %s is not a file" % path)
579
580   if constants.NV_LVLIST in what and vm_capable:
581     try:
582       val = GetVolumeList(utils.ListVolumeGroups().keys())
583     except RPCFail, err:
584       val = str(err)
585     result[constants.NV_LVLIST] = val
586
587   if constants.NV_INSTANCELIST in what and vm_capable:
588     # GetInstanceList can fail
589     try:
590       val = GetInstanceList(what[constants.NV_INSTANCELIST])
591     except RPCFail, err:
592       val = str(err)
593     result[constants.NV_INSTANCELIST] = val
594
595   if constants.NV_VGLIST in what and vm_capable:
596     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
597
598   if constants.NV_PVLIST in what and vm_capable:
599     result[constants.NV_PVLIST] = \
600       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
601                                    filter_allocatable=False)
602
603   if constants.NV_VERSION in what:
604     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
605                                     constants.RELEASE_VERSION)
606
607   if constants.NV_HVINFO in what and vm_capable:
608     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
609     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
610
611   if constants.NV_DRBDLIST in what and vm_capable:
612     try:
613       used_minors = bdev.DRBD8.GetUsedDevs().keys()
614     except errors.BlockDeviceError, err:
615       logging.warning("Can't get used minors list", exc_info=True)
616       used_minors = str(err)
617     result[constants.NV_DRBDLIST] = used_minors
618
619   if constants.NV_DRBDHELPER in what and vm_capable:
620     status = True
621     try:
622       payload = bdev.BaseDRBD.GetUsermodeHelper()
623     except errors.BlockDeviceError, err:
624       logging.error("Can't get DRBD usermode helper: %s", str(err))
625       status = False
626       payload = str(err)
627     result[constants.NV_DRBDHELPER] = (status, payload)
628
629   if constants.NV_NODESETUP in what:
630     result[constants.NV_NODESETUP] = tmpr = []
631     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
632       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
633                   " under /sys, missing required directories /sys/block"
634                   " and /sys/class/net")
635     if (not os.path.isdir("/proc/sys") or
636         not os.path.isfile("/proc/sysrq-trigger")):
637       tmpr.append("The procfs filesystem doesn't seem to be mounted"
638                   " under /proc, missing required directory /proc/sys and"
639                   " the file /proc/sysrq-trigger")
640
641   if constants.NV_TIME in what:
642     result[constants.NV_TIME] = utils.SplitTime(time.time())
643
644   if constants.NV_OSLIST in what and vm_capable:
645     result[constants.NV_OSLIST] = DiagnoseOS()
646
647   if constants.NV_BRIDGES in what and vm_capable:
648     result[constants.NV_BRIDGES] = [bridge
649                                     for bridge in what[constants.NV_BRIDGES]
650                                     if not utils.BridgeExists(bridge)]
651   return result
652
653
654 def GetBlockDevSizes(devices):
655   """Return the size of the given block devices
656
657   @type devices: list
658   @param devices: list of block device nodes to query
659   @rtype: dict
660   @return:
661     dictionary of all block devices under /dev (key). The value is their
662     size in MiB.
663
664     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
665
666   """
667   DEV_PREFIX = "/dev/"
668   blockdevs = {}
669
670   for devpath in devices:
671     if os.path.commonprefix([DEV_PREFIX, devpath]) != DEV_PREFIX:
672       continue
673
674     try:
675       st = os.stat(devpath)
676     except EnvironmentError, err:
677       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
678       continue
679
680     if stat.S_ISBLK(st.st_mode):
681       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
682       if result.failed:
683         # We don't want to fail, just do not list this device as available
684         logging.warning("Cannot get size for block device %s", devpath)
685         continue
686
687       size = int(result.stdout) / (1024 * 1024)
688       blockdevs[devpath] = size
689   return blockdevs
690
691
692 def GetVolumeList(vg_names):
693   """Compute list of logical volumes and their size.
694
695   @type vg_names: list
696   @param vg_names: the volume groups whose LVs we should list, or
697       empty for all volume groups
698   @rtype: dict
699   @return:
700       dictionary of all partions (key) with value being a tuple of
701       their size (in MiB), inactive and online status::
702
703         {'xenvg/test1': ('20.06', True, True)}
704
705       in case of errors, a string is returned with the error
706       details.
707
708   """
709   lvs = {}
710   sep = "|"
711   if not vg_names:
712     vg_names = []
713   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
714                          "--separator=%s" % sep,
715                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
716   if result.failed:
717     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
718
719   for line in result.stdout.splitlines():
720     line = line.strip()
721     match = _LVSLINE_REGEX.match(line)
722     if not match:
723       logging.error("Invalid line returned from lvs output: '%s'", line)
724       continue
725     vg_name, name, size, attr = match.groups()
726     inactive = attr[4] == "-"
727     online = attr[5] == "o"
728     virtual = attr[0] == "v"
729     if virtual:
730       # we don't want to report such volumes as existing, since they
731       # don't really hold data
732       continue
733     lvs[vg_name+"/"+name] = (size, inactive, online)
734
735   return lvs
736
737
738 def ListVolumeGroups():
739   """List the volume groups and their size.
740
741   @rtype: dict
742   @return: dictionary with keys volume name and values the
743       size of the volume
744
745   """
746   return utils.ListVolumeGroups()
747
748
749 def NodeVolumes():
750   """List all volumes on this node.
751
752   @rtype: list
753   @return:
754     A list of dictionaries, each having four keys:
755       - name: the logical volume name,
756       - size: the size of the logical volume
757       - dev: the physical device on which the LV lives
758       - vg: the volume group to which it belongs
759
760     In case of errors, we return an empty list and log the
761     error.
762
763     Note that since a logical volume can live on multiple physical
764     volumes, the resulting list might include a logical volume
765     multiple times.
766
767   """
768   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
769                          "--separator=|",
770                          "--options=lv_name,lv_size,devices,vg_name"])
771   if result.failed:
772     _Fail("Failed to list logical volumes, lvs output: %s",
773           result.output)
774
775   def parse_dev(dev):
776     return dev.split("(")[0]
777
778   def handle_dev(dev):
779     return [parse_dev(x) for x in dev.split(",")]
780
781   def map_line(line):
782     line = [v.strip() for v in line]
783     return [{"name": line[0], "size": line[1],
784              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
785
786   all_devs = []
787   for line in result.stdout.splitlines():
788     if line.count("|") >= 3:
789       all_devs.extend(map_line(line.split("|")))
790     else:
791       logging.warning("Strange line in the output from lvs: '%s'", line)
792   return all_devs
793
794
795 def BridgesExist(bridges_list):
796   """Check if a list of bridges exist on the current node.
797
798   @rtype: boolean
799   @return: C{True} if all of them exist, C{False} otherwise
800
801   """
802   missing = []
803   for bridge in bridges_list:
804     if not utils.BridgeExists(bridge):
805       missing.append(bridge)
806
807   if missing:
808     _Fail("Missing bridges %s", utils.CommaJoin(missing))
809
810
811 def GetInstanceList(hypervisor_list):
812   """Provides a list of instances.
813
814   @type hypervisor_list: list
815   @param hypervisor_list: the list of hypervisors to query information
816
817   @rtype: list
818   @return: a list of all running instances on the current node
819     - instance1.example.com
820     - instance2.example.com
821
822   """
823   results = []
824   for hname in hypervisor_list:
825     try:
826       names = hypervisor.GetHypervisor(hname).ListInstances()
827       results.extend(names)
828     except errors.HypervisorError, err:
829       _Fail("Error enumerating instances (hypervisor %s): %s",
830             hname, err, exc=True)
831
832   return results
833
834
835 def GetInstanceInfo(instance, hname):
836   """Gives back the information about an instance as a dictionary.
837
838   @type instance: string
839   @param instance: the instance name
840   @type hname: string
841   @param hname: the hypervisor type of the instance
842
843   @rtype: dict
844   @return: dictionary with the following keys:
845       - memory: memory size of instance (int)
846       - state: xen state of instance (string)
847       - time: cpu time of instance (float)
848
849   """
850   output = {}
851
852   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
853   if iinfo is not None:
854     output["memory"] = iinfo[2]
855     output["state"] = iinfo[4]
856     output["time"] = iinfo[5]
857
858   return output
859
860
861 def GetInstanceMigratable(instance):
862   """Gives whether an instance can be migrated.
863
864   @type instance: L{objects.Instance}
865   @param instance: object representing the instance to be checked.
866
867   @rtype: tuple
868   @return: tuple of (result, description) where:
869       - result: whether the instance can be migrated or not
870       - description: a description of the issue, if relevant
871
872   """
873   hyper = hypervisor.GetHypervisor(instance.hypervisor)
874   iname = instance.name
875   if iname not in hyper.ListInstances():
876     _Fail("Instance %s is not running", iname)
877
878   for idx in range(len(instance.disks)):
879     link_name = _GetBlockDevSymlinkPath(iname, idx)
880     if not os.path.islink(link_name):
881       logging.warning("Instance %s is missing symlink %s for disk %d",
882                       iname, link_name, idx)
883
884
885 def GetAllInstancesInfo(hypervisor_list):
886   """Gather data about all instances.
887
888   This is the equivalent of L{GetInstanceInfo}, except that it
889   computes data for all instances at once, thus being faster if one
890   needs data about more than one instance.
891
892   @type hypervisor_list: list
893   @param hypervisor_list: list of hypervisors to query for instance data
894
895   @rtype: dict
896   @return: dictionary of instance: data, with data having the following keys:
897       - memory: memory size of instance (int)
898       - state: xen state of instance (string)
899       - time: cpu time of instance (float)
900       - vcpus: the number of vcpus
901
902   """
903   output = {}
904
905   for hname in hypervisor_list:
906     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
907     if iinfo:
908       for name, _, memory, vcpus, state, times in iinfo:
909         value = {
910           "memory": memory,
911           "vcpus": vcpus,
912           "state": state,
913           "time": times,
914           }
915         if name in output:
916           # we only check static parameters, like memory and vcpus,
917           # and not state and time which can change between the
918           # invocations of the different hypervisors
919           for key in "memory", "vcpus":
920             if value[key] != output[name][key]:
921               _Fail("Instance %s is running twice"
922                     " with different parameters", name)
923         output[name] = value
924
925   return output
926
927
928 def _InstanceLogName(kind, os_name, instance):
929   """Compute the OS log filename for a given instance and operation.
930
931   The instance name and os name are passed in as strings since not all
932   operations have these as part of an instance object.
933
934   @type kind: string
935   @param kind: the operation type (e.g. add, import, etc.)
936   @type os_name: string
937   @param os_name: the os name
938   @type instance: string
939   @param instance: the name of the instance being imported/added/etc.
940
941   """
942   # TODO: Use tempfile.mkstemp to create unique filename
943   base = ("%s-%s-%s-%s.log" %
944           (kind, os_name, instance, utils.TimestampForFilename()))
945   return utils.PathJoin(constants.LOG_OS_DIR, base)
946
947
948 def InstanceOsAdd(instance, reinstall, debug):
949   """Add an OS to an instance.
950
951   @type instance: L{objects.Instance}
952   @param instance: Instance whose OS is to be installed
953   @type reinstall: boolean
954   @param reinstall: whether this is an instance reinstall
955   @type debug: integer
956   @param debug: debug level, passed to the OS scripts
957   @rtype: None
958
959   """
960   inst_os = OSFromDisk(instance.os)
961
962   create_env = OSEnvironment(instance, inst_os, debug)
963   if reinstall:
964     create_env["INSTANCE_REINSTALL"] = "1"
965
966   logfile = _InstanceLogName("add", instance.os, instance.name)
967
968   result = utils.RunCmd([inst_os.create_script], env=create_env,
969                         cwd=inst_os.path, output=logfile, reset_env=True)
970   if result.failed:
971     logging.error("os create command '%s' returned error: %s, logfile: %s,"
972                   " output: %s", result.cmd, result.fail_reason, logfile,
973                   result.output)
974     lines = [utils.SafeEncode(val)
975              for val in utils.TailFile(logfile, lines=20)]
976     _Fail("OS create script failed (%s), last lines in the"
977           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
978
979
980 def RunRenameInstance(instance, old_name, debug):
981   """Run the OS rename script for an instance.
982
983   @type instance: L{objects.Instance}
984   @param instance: Instance whose OS is to be installed
985   @type old_name: string
986   @param old_name: previous instance name
987   @type debug: integer
988   @param debug: debug level, passed to the OS scripts
989   @rtype: boolean
990   @return: the success of the operation
991
992   """
993   inst_os = OSFromDisk(instance.os)
994
995   rename_env = OSEnvironment(instance, inst_os, debug)
996   rename_env["OLD_INSTANCE_NAME"] = old_name
997
998   logfile = _InstanceLogName("rename", instance.os,
999                              "%s-%s" % (old_name, instance.name))
1000
1001   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1002                         cwd=inst_os.path, output=logfile, reset_env=True)
1003
1004   if result.failed:
1005     logging.error("os create command '%s' returned error: %s output: %s",
1006                   result.cmd, result.fail_reason, result.output)
1007     lines = [utils.SafeEncode(val)
1008              for val in utils.TailFile(logfile, lines=20)]
1009     _Fail("OS rename script failed (%s), last lines in the"
1010           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1011
1012
1013 def _GetBlockDevSymlinkPath(instance_name, idx):
1014   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
1015                         (instance_name, constants.DISK_SEPARATOR, idx))
1016
1017
1018 def _SymlinkBlockDev(instance_name, device_path, idx):
1019   """Set up symlinks to a instance's block device.
1020
1021   This is an auxiliary function run when an instance is start (on the primary
1022   node) or when an instance is migrated (on the target node).
1023
1024
1025   @param instance_name: the name of the target instance
1026   @param device_path: path of the physical block device, on the node
1027   @param idx: the disk index
1028   @return: absolute path to the disk's symlink
1029
1030   """
1031   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1032   try:
1033     os.symlink(device_path, link_name)
1034   except OSError, err:
1035     if err.errno == errno.EEXIST:
1036       if (not os.path.islink(link_name) or
1037           os.readlink(link_name) != device_path):
1038         os.remove(link_name)
1039         os.symlink(device_path, link_name)
1040     else:
1041       raise
1042
1043   return link_name
1044
1045
1046 def _RemoveBlockDevLinks(instance_name, disks):
1047   """Remove the block device symlinks belonging to the given instance.
1048
1049   """
1050   for idx, _ in enumerate(disks):
1051     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1052     if os.path.islink(link_name):
1053       try:
1054         os.remove(link_name)
1055       except OSError:
1056         logging.exception("Can't remove symlink '%s'", link_name)
1057
1058
1059 def _GatherAndLinkBlockDevs(instance):
1060   """Set up an instance's block device(s).
1061
1062   This is run on the primary node at instance startup. The block
1063   devices must be already assembled.
1064
1065   @type instance: L{objects.Instance}
1066   @param instance: the instance whose disks we shoul assemble
1067   @rtype: list
1068   @return: list of (disk_object, device_path)
1069
1070   """
1071   block_devices = []
1072   for idx, disk in enumerate(instance.disks):
1073     device = _RecursiveFindBD(disk)
1074     if device is None:
1075       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1076                                     str(disk))
1077     device.Open()
1078     try:
1079       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1080     except OSError, e:
1081       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1082                                     e.strerror)
1083
1084     block_devices.append((disk, link_name))
1085
1086   return block_devices
1087
1088
1089 def StartInstance(instance, startup_paused):
1090   """Start an instance.
1091
1092   @type instance: L{objects.Instance}
1093   @param instance: the instance object
1094   @type startup_paused: bool
1095   @param instance: pause instance at startup?
1096   @rtype: None
1097
1098   """
1099   running_instances = GetInstanceList([instance.hypervisor])
1100
1101   if instance.name in running_instances:
1102     logging.info("Instance %s already running, not starting", instance.name)
1103     return
1104
1105   try:
1106     block_devices = _GatherAndLinkBlockDevs(instance)
1107     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1108     hyper.StartInstance(instance, block_devices, startup_paused)
1109   except errors.BlockDeviceError, err:
1110     _Fail("Block device error: %s", err, exc=True)
1111   except errors.HypervisorError, err:
1112     _RemoveBlockDevLinks(instance.name, instance.disks)
1113     _Fail("Hypervisor error: %s", err, exc=True)
1114
1115
1116 def InstanceShutdown(instance, timeout):
1117   """Shut an instance down.
1118
1119   @note: this functions uses polling with a hardcoded timeout.
1120
1121   @type instance: L{objects.Instance}
1122   @param instance: the instance object
1123   @type timeout: integer
1124   @param timeout: maximum timeout for soft shutdown
1125   @rtype: None
1126
1127   """
1128   hv_name = instance.hypervisor
1129   hyper = hypervisor.GetHypervisor(hv_name)
1130   iname = instance.name
1131
1132   if instance.name not in hyper.ListInstances():
1133     logging.info("Instance %s not running, doing nothing", iname)
1134     return
1135
1136   class _TryShutdown:
1137     def __init__(self):
1138       self.tried_once = False
1139
1140     def __call__(self):
1141       if iname not in hyper.ListInstances():
1142         return
1143
1144       try:
1145         hyper.StopInstance(instance, retry=self.tried_once)
1146       except errors.HypervisorError, err:
1147         if iname not in hyper.ListInstances():
1148           # if the instance is no longer existing, consider this a
1149           # success and go to cleanup
1150           return
1151
1152         _Fail("Failed to stop instance %s: %s", iname, err)
1153
1154       self.tried_once = True
1155
1156       raise utils.RetryAgain()
1157
1158   try:
1159     utils.Retry(_TryShutdown(), 5, timeout)
1160   except utils.RetryTimeout:
1161     # the shutdown did not succeed
1162     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1163
1164     try:
1165       hyper.StopInstance(instance, force=True)
1166     except errors.HypervisorError, err:
1167       if iname in hyper.ListInstances():
1168         # only raise an error if the instance still exists, otherwise
1169         # the error could simply be "instance ... unknown"!
1170         _Fail("Failed to force stop instance %s: %s", iname, err)
1171
1172     time.sleep(1)
1173
1174     if iname in hyper.ListInstances():
1175       _Fail("Could not shutdown instance %s even by destroy", iname)
1176
1177   try:
1178     hyper.CleanupInstance(instance.name)
1179   except errors.HypervisorError, err:
1180     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1181
1182   _RemoveBlockDevLinks(iname, instance.disks)
1183
1184
1185 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1186   """Reboot an instance.
1187
1188   @type instance: L{objects.Instance}
1189   @param instance: the instance object to reboot
1190   @type reboot_type: str
1191   @param reboot_type: the type of reboot, one the following
1192     constants:
1193       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1194         instance OS, do not recreate the VM
1195       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1196         restart the VM (at the hypervisor level)
1197       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1198         not accepted here, since that mode is handled differently, in
1199         cmdlib, and translates into full stop and start of the
1200         instance (instead of a call_instance_reboot RPC)
1201   @type shutdown_timeout: integer
1202   @param shutdown_timeout: maximum timeout for soft shutdown
1203   @rtype: None
1204
1205   """
1206   running_instances = GetInstanceList([instance.hypervisor])
1207
1208   if instance.name not in running_instances:
1209     _Fail("Cannot reboot instance %s that is not running", instance.name)
1210
1211   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1212   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1213     try:
1214       hyper.RebootInstance(instance)
1215     except errors.HypervisorError, err:
1216       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1217   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1218     try:
1219       InstanceShutdown(instance, shutdown_timeout)
1220       return StartInstance(instance, False)
1221     except errors.HypervisorError, err:
1222       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1223   else:
1224     _Fail("Invalid reboot_type received: %s", reboot_type)
1225
1226
1227 def MigrationInfo(instance):
1228   """Gather information about an instance to be migrated.
1229
1230   @type instance: L{objects.Instance}
1231   @param instance: the instance definition
1232
1233   """
1234   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1235   try:
1236     info = hyper.MigrationInfo(instance)
1237   except errors.HypervisorError, err:
1238     _Fail("Failed to fetch migration information: %s", err, exc=True)
1239   return info
1240
1241
1242 def AcceptInstance(instance, info, target):
1243   """Prepare the node to accept an instance.
1244
1245   @type instance: L{objects.Instance}
1246   @param instance: the instance definition
1247   @type info: string/data (opaque)
1248   @param info: migration information, from the source node
1249   @type target: string
1250   @param target: target host (usually ip), on this node
1251
1252   """
1253   # TODO: why is this required only for DTS_EXT_MIRROR?
1254   if instance.disk_template in constants.DTS_EXT_MIRROR:
1255     # Create the symlinks, as the disks are not active
1256     # in any way
1257     try:
1258       _GatherAndLinkBlockDevs(instance)
1259     except errors.BlockDeviceError, err:
1260       _Fail("Block device error: %s", err, exc=True)
1261
1262   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1263   try:
1264     hyper.AcceptInstance(instance, info, target)
1265   except errors.HypervisorError, err:
1266     if instance.disk_template in constants.DTS_EXT_MIRROR:
1267       _RemoveBlockDevLinks(instance.name, instance.disks)
1268     _Fail("Failed to accept instance: %s", err, exc=True)
1269
1270
1271 def FinalizeMigration(instance, info, success):
1272   """Finalize any preparation to accept an instance.
1273
1274   @type instance: L{objects.Instance}
1275   @param instance: the instance definition
1276   @type info: string/data (opaque)
1277   @param info: migration information, from the source node
1278   @type success: boolean
1279   @param success: whether the migration was a success or a failure
1280
1281   """
1282   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1283   try:
1284     hyper.FinalizeMigration(instance, info, success)
1285   except errors.HypervisorError, err:
1286     _Fail("Failed to finalize migration: %s", err, exc=True)
1287
1288
1289 def MigrateInstance(instance, target, live):
1290   """Migrates an instance to another node.
1291
1292   @type instance: L{objects.Instance}
1293   @param instance: the instance definition
1294   @type target: string
1295   @param target: the target node name
1296   @type live: boolean
1297   @param live: whether the migration should be done live or not (the
1298       interpretation of this parameter is left to the hypervisor)
1299   @rtype: tuple
1300   @return: a tuple of (success, msg) where:
1301       - succes is a boolean denoting the success/failure of the operation
1302       - msg is a string with details in case of failure
1303
1304   """
1305   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1306
1307   try:
1308     hyper.MigrateInstance(instance, target, live)
1309   except errors.HypervisorError, err:
1310     _Fail("Failed to migrate instance: %s", err, exc=True)
1311
1312
1313 def BlockdevCreate(disk, size, owner, on_primary, info):
1314   """Creates a block device for an instance.
1315
1316   @type disk: L{objects.Disk}
1317   @param disk: the object describing the disk we should create
1318   @type size: int
1319   @param size: the size of the physical underlying device, in MiB
1320   @type owner: str
1321   @param owner: the name of the instance for which disk is created,
1322       used for device cache data
1323   @type on_primary: boolean
1324   @param on_primary:  indicates if it is the primary node or not
1325   @type info: string
1326   @param info: string that will be sent to the physical device
1327       creation, used for example to set (LVM) tags on LVs
1328
1329   @return: the new unique_id of the device (this can sometime be
1330       computed only after creation), or None. On secondary nodes,
1331       it's not required to return anything.
1332
1333   """
1334   # TODO: remove the obsolete "size" argument
1335   # pylint: disable-msg=W0613
1336   clist = []
1337   if disk.children:
1338     for child in disk.children:
1339       try:
1340         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1341       except errors.BlockDeviceError, err:
1342         _Fail("Can't assemble device %s: %s", child, err)
1343       if on_primary or disk.AssembleOnSecondary():
1344         # we need the children open in case the device itself has to
1345         # be assembled
1346         try:
1347           # pylint: disable-msg=E1103
1348           crdev.Open()
1349         except errors.BlockDeviceError, err:
1350           _Fail("Can't make child '%s' read-write: %s", child, err)
1351       clist.append(crdev)
1352
1353   try:
1354     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1355   except errors.BlockDeviceError, err:
1356     _Fail("Can't create block device: %s", err)
1357
1358   if on_primary or disk.AssembleOnSecondary():
1359     try:
1360       device.Assemble()
1361     except errors.BlockDeviceError, err:
1362       _Fail("Can't assemble device after creation, unusual event: %s", err)
1363     device.SetSyncSpeed(constants.SYNC_SPEED)
1364     if on_primary or disk.OpenOnSecondary():
1365       try:
1366         device.Open(force=True)
1367       except errors.BlockDeviceError, err:
1368         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1369     DevCacheManager.UpdateCache(device.dev_path, owner,
1370                                 on_primary, disk.iv_name)
1371
1372   device.SetInfo(info)
1373
1374   return device.unique_id
1375
1376
1377 def _WipeDevice(path, offset, size):
1378   """This function actually wipes the device.
1379
1380   @param path: The path to the device to wipe
1381   @param offset: The offset in MiB in the file
1382   @param size: The size in MiB to write
1383
1384   """
1385   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1386          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1387          "count=%d" % size]
1388   result = utils.RunCmd(cmd)
1389
1390   if result.failed:
1391     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1392           result.fail_reason, result.output)
1393
1394
1395 def BlockdevWipe(disk, offset, size):
1396   """Wipes a block device.
1397
1398   @type disk: L{objects.Disk}
1399   @param disk: the disk object we want to wipe
1400   @type offset: int
1401   @param offset: The offset in MiB in the file
1402   @type size: int
1403   @param size: The size in MiB to write
1404
1405   """
1406   try:
1407     rdev = _RecursiveFindBD(disk)
1408   except errors.BlockDeviceError:
1409     rdev = None
1410
1411   if not rdev:
1412     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1413
1414   # Do cross verify some of the parameters
1415   if offset > rdev.size:
1416     _Fail("Offset is bigger than device size")
1417   if (offset + size) > rdev.size:
1418     _Fail("The provided offset and size to wipe is bigger than device size")
1419
1420   _WipeDevice(rdev.dev_path, offset, size)
1421
1422
1423 def BlockdevPauseResumeSync(disks, pause):
1424   """Pause or resume the sync of the block device.
1425
1426   @type disks: list of L{objects.Disk}
1427   @param disks: the disks object we want to pause/resume
1428   @type pause: bool
1429   @param pause: Wheater to pause or resume
1430
1431   """
1432   success = []
1433   for disk in disks:
1434     try:
1435       rdev = _RecursiveFindBD(disk)
1436     except errors.BlockDeviceError:
1437       rdev = None
1438
1439     if not rdev:
1440       success.append((False, ("Cannot change sync for device %s:"
1441                               " device not found" % disk.iv_name)))
1442       continue
1443
1444     result = rdev.PauseResumeSync(pause)
1445
1446     if result:
1447       success.append((result, None))
1448     else:
1449       if pause:
1450         msg = "Pause"
1451       else:
1452         msg = "Resume"
1453       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1454
1455   return success
1456
1457
1458 def BlockdevRemove(disk):
1459   """Remove a block device.
1460
1461   @note: This is intended to be called recursively.
1462
1463   @type disk: L{objects.Disk}
1464   @param disk: the disk object we should remove
1465   @rtype: boolean
1466   @return: the success of the operation
1467
1468   """
1469   msgs = []
1470   try:
1471     rdev = _RecursiveFindBD(disk)
1472   except errors.BlockDeviceError, err:
1473     # probably can't attach
1474     logging.info("Can't attach to device %s in remove", disk)
1475     rdev = None
1476   if rdev is not None:
1477     r_path = rdev.dev_path
1478     try:
1479       rdev.Remove()
1480     except errors.BlockDeviceError, err:
1481       msgs.append(str(err))
1482     if not msgs:
1483       DevCacheManager.RemoveCache(r_path)
1484
1485   if disk.children:
1486     for child in disk.children:
1487       try:
1488         BlockdevRemove(child)
1489       except RPCFail, err:
1490         msgs.append(str(err))
1491
1492   if msgs:
1493     _Fail("; ".join(msgs))
1494
1495
1496 def _RecursiveAssembleBD(disk, owner, as_primary):
1497   """Activate a block device for an instance.
1498
1499   This is run on the primary and secondary nodes for an instance.
1500
1501   @note: this function is called recursively.
1502
1503   @type disk: L{objects.Disk}
1504   @param disk: the disk we try to assemble
1505   @type owner: str
1506   @param owner: the name of the instance which owns the disk
1507   @type as_primary: boolean
1508   @param as_primary: if we should make the block device
1509       read/write
1510
1511   @return: the assembled device or None (in case no device
1512       was assembled)
1513   @raise errors.BlockDeviceError: in case there is an error
1514       during the activation of the children or the device
1515       itself
1516
1517   """
1518   children = []
1519   if disk.children:
1520     mcn = disk.ChildrenNeeded()
1521     if mcn == -1:
1522       mcn = 0 # max number of Nones allowed
1523     else:
1524       mcn = len(disk.children) - mcn # max number of Nones
1525     for chld_disk in disk.children:
1526       try:
1527         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1528       except errors.BlockDeviceError, err:
1529         if children.count(None) >= mcn:
1530           raise
1531         cdev = None
1532         logging.error("Error in child activation (but continuing): %s",
1533                       str(err))
1534       children.append(cdev)
1535
1536   if as_primary or disk.AssembleOnSecondary():
1537     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1538     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1539     result = r_dev
1540     if as_primary or disk.OpenOnSecondary():
1541       r_dev.Open()
1542     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1543                                 as_primary, disk.iv_name)
1544
1545   else:
1546     result = True
1547   return result
1548
1549
1550 def BlockdevAssemble(disk, owner, as_primary, idx):
1551   """Activate a block device for an instance.
1552
1553   This is a wrapper over _RecursiveAssembleBD.
1554
1555   @rtype: str or boolean
1556   @return: a C{/dev/...} path for primary nodes, and
1557       C{True} for secondary nodes
1558
1559   """
1560   try:
1561     result = _RecursiveAssembleBD(disk, owner, as_primary)
1562     if isinstance(result, bdev.BlockDev):
1563       # pylint: disable-msg=E1103
1564       result = result.dev_path
1565       if as_primary:
1566         _SymlinkBlockDev(owner, result, idx)
1567   except errors.BlockDeviceError, err:
1568     _Fail("Error while assembling disk: %s", err, exc=True)
1569   except OSError, err:
1570     _Fail("Error while symlinking disk: %s", err, exc=True)
1571
1572   return result
1573
1574
1575 def BlockdevShutdown(disk):
1576   """Shut down a block device.
1577
1578   First, if the device is assembled (Attach() is successful), then
1579   the device is shutdown. Then the children of the device are
1580   shutdown.
1581
1582   This function is called recursively. Note that we don't cache the
1583   children or such, as oppossed to assemble, shutdown of different
1584   devices doesn't require that the upper device was active.
1585
1586   @type disk: L{objects.Disk}
1587   @param disk: the description of the disk we should
1588       shutdown
1589   @rtype: None
1590
1591   """
1592   msgs = []
1593   r_dev = _RecursiveFindBD(disk)
1594   if r_dev is not None:
1595     r_path = r_dev.dev_path
1596     try:
1597       r_dev.Shutdown()
1598       DevCacheManager.RemoveCache(r_path)
1599     except errors.BlockDeviceError, err:
1600       msgs.append(str(err))
1601
1602   if disk.children:
1603     for child in disk.children:
1604       try:
1605         BlockdevShutdown(child)
1606       except RPCFail, err:
1607         msgs.append(str(err))
1608
1609   if msgs:
1610     _Fail("; ".join(msgs))
1611
1612
1613 def BlockdevAddchildren(parent_cdev, new_cdevs):
1614   """Extend a mirrored block device.
1615
1616   @type parent_cdev: L{objects.Disk}
1617   @param parent_cdev: the disk to which we should add children
1618   @type new_cdevs: list of L{objects.Disk}
1619   @param new_cdevs: the list of children which we should add
1620   @rtype: None
1621
1622   """
1623   parent_bdev = _RecursiveFindBD(parent_cdev)
1624   if parent_bdev is None:
1625     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1626   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1627   if new_bdevs.count(None) > 0:
1628     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1629   parent_bdev.AddChildren(new_bdevs)
1630
1631
1632 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1633   """Shrink a mirrored block device.
1634
1635   @type parent_cdev: L{objects.Disk}
1636   @param parent_cdev: the disk from which we should remove children
1637   @type new_cdevs: list of L{objects.Disk}
1638   @param new_cdevs: the list of children which we should remove
1639   @rtype: None
1640
1641   """
1642   parent_bdev = _RecursiveFindBD(parent_cdev)
1643   if parent_bdev is None:
1644     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1645   devs = []
1646   for disk in new_cdevs:
1647     rpath = disk.StaticDevPath()
1648     if rpath is None:
1649       bd = _RecursiveFindBD(disk)
1650       if bd is None:
1651         _Fail("Can't find device %s while removing children", disk)
1652       else:
1653         devs.append(bd.dev_path)
1654     else:
1655       if not utils.IsNormAbsPath(rpath):
1656         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1657       devs.append(rpath)
1658   parent_bdev.RemoveChildren(devs)
1659
1660
1661 def BlockdevGetmirrorstatus(disks):
1662   """Get the mirroring status of a list of devices.
1663
1664   @type disks: list of L{objects.Disk}
1665   @param disks: the list of disks which we should query
1666   @rtype: disk
1667   @return: List of L{objects.BlockDevStatus}, one for each disk
1668   @raise errors.BlockDeviceError: if any of the disks cannot be
1669       found
1670
1671   """
1672   stats = []
1673   for dsk in disks:
1674     rbd = _RecursiveFindBD(dsk)
1675     if rbd is None:
1676       _Fail("Can't find device %s", dsk)
1677
1678     stats.append(rbd.CombinedSyncStatus())
1679
1680   return stats
1681
1682
1683 def BlockdevGetmirrorstatusMulti(disks):
1684   """Get the mirroring status of a list of devices.
1685
1686   @type disks: list of L{objects.Disk}
1687   @param disks: the list of disks which we should query
1688   @rtype: disk
1689   @return: List of tuples, (bool, status), one for each disk; bool denotes
1690     success/failure, status is L{objects.BlockDevStatus} on success, string
1691     otherwise
1692
1693   """
1694   result = []
1695   for disk in disks:
1696     try:
1697       rbd = _RecursiveFindBD(disk)
1698       if rbd is None:
1699         result.append((False, "Can't find device %s" % disk))
1700         continue
1701
1702       status = rbd.CombinedSyncStatus()
1703     except errors.BlockDeviceError, err:
1704       logging.exception("Error while getting disk status")
1705       result.append((False, str(err)))
1706     else:
1707       result.append((True, status))
1708
1709   assert len(disks) == len(result)
1710
1711   return result
1712
1713
1714 def _RecursiveFindBD(disk):
1715   """Check if a device is activated.
1716
1717   If so, return information about the real device.
1718
1719   @type disk: L{objects.Disk}
1720   @param disk: the disk object we need to find
1721
1722   @return: None if the device can't be found,
1723       otherwise the device instance
1724
1725   """
1726   children = []
1727   if disk.children:
1728     for chdisk in disk.children:
1729       children.append(_RecursiveFindBD(chdisk))
1730
1731   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1732
1733
1734 def _OpenRealBD(disk):
1735   """Opens the underlying block device of a disk.
1736
1737   @type disk: L{objects.Disk}
1738   @param disk: the disk object we want to open
1739
1740   """
1741   real_disk = _RecursiveFindBD(disk)
1742   if real_disk is None:
1743     _Fail("Block device '%s' is not set up", disk)
1744
1745   real_disk.Open()
1746
1747   return real_disk
1748
1749
1750 def BlockdevFind(disk):
1751   """Check if a device is activated.
1752
1753   If it is, return information about the real device.
1754
1755   @type disk: L{objects.Disk}
1756   @param disk: the disk to find
1757   @rtype: None or objects.BlockDevStatus
1758   @return: None if the disk cannot be found, otherwise a the current
1759            information
1760
1761   """
1762   try:
1763     rbd = _RecursiveFindBD(disk)
1764   except errors.BlockDeviceError, err:
1765     _Fail("Failed to find device: %s", err, exc=True)
1766
1767   if rbd is None:
1768     return None
1769
1770   return rbd.GetSyncStatus()
1771
1772
1773 def BlockdevGetsize(disks):
1774   """Computes the size of the given disks.
1775
1776   If a disk is not found, returns None instead.
1777
1778   @type disks: list of L{objects.Disk}
1779   @param disks: the list of disk to compute the size for
1780   @rtype: list
1781   @return: list with elements None if the disk cannot be found,
1782       otherwise the size
1783
1784   """
1785   result = []
1786   for cf in disks:
1787     try:
1788       rbd = _RecursiveFindBD(cf)
1789     except errors.BlockDeviceError:
1790       result.append(None)
1791       continue
1792     if rbd is None:
1793       result.append(None)
1794     else:
1795       result.append(rbd.GetActualSize())
1796   return result
1797
1798
1799 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1800   """Export a block device to a remote node.
1801
1802   @type disk: L{objects.Disk}
1803   @param disk: the description of the disk to export
1804   @type dest_node: str
1805   @param dest_node: the destination node to export to
1806   @type dest_path: str
1807   @param dest_path: the destination path on the target node
1808   @type cluster_name: str
1809   @param cluster_name: the cluster name, needed for SSH hostalias
1810   @rtype: None
1811
1812   """
1813   real_disk = _OpenRealBD(disk)
1814
1815   # the block size on the read dd is 1MiB to match our units
1816   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1817                                "dd if=%s bs=1048576 count=%s",
1818                                real_disk.dev_path, str(disk.size))
1819
1820   # we set here a smaller block size as, due to ssh buffering, more
1821   # than 64-128k will mostly ignored; we use nocreat to fail if the
1822   # device is not already there or we pass a wrong path; we use
1823   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1824   # to not buffer too much memory; this means that at best, we flush
1825   # every 64k, which will not be very fast
1826   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1827                                 " oflag=dsync", dest_path)
1828
1829   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1830                                                    constants.GANETI_RUNAS,
1831                                                    destcmd)
1832
1833   # all commands have been checked, so we're safe to combine them
1834   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1835
1836   result = utils.RunCmd(["bash", "-c", command])
1837
1838   if result.failed:
1839     _Fail("Disk copy command '%s' returned error: %s"
1840           " output: %s", command, result.fail_reason, result.output)
1841
1842
1843 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1844   """Write a file to the filesystem.
1845
1846   This allows the master to overwrite(!) a file. It will only perform
1847   the operation if the file belongs to a list of configuration files.
1848
1849   @type file_name: str
1850   @param file_name: the target file name
1851   @type data: str
1852   @param data: the new contents of the file
1853   @type mode: int
1854   @param mode: the mode to give the file (can be None)
1855   @type uid: string
1856   @param uid: the owner of the file
1857   @type gid: string
1858   @param gid: the group of the file
1859   @type atime: float
1860   @param atime: the atime to set on the file (can be None)
1861   @type mtime: float
1862   @param mtime: the mtime to set on the file (can be None)
1863   @rtype: None
1864
1865   """
1866   if not os.path.isabs(file_name):
1867     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1868
1869   if file_name not in _ALLOWED_UPLOAD_FILES:
1870     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1871           file_name)
1872
1873   raw_data = _Decompress(data)
1874
1875   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
1876     _Fail("Invalid username/groupname type")
1877
1878   getents = runtime.GetEnts()
1879   uid = getents.LookupUser(uid)
1880   gid = getents.LookupGroup(gid)
1881
1882   utils.SafeWriteFile(file_name, None,
1883                       data=raw_data, mode=mode, uid=uid, gid=gid,
1884                       atime=atime, mtime=mtime)
1885
1886
1887 def RunOob(oob_program, command, node, timeout):
1888   """Executes oob_program with given command on given node.
1889
1890   @param oob_program: The path to the executable oob_program
1891   @param command: The command to invoke on oob_program
1892   @param node: The node given as an argument to the program
1893   @param timeout: Timeout after which we kill the oob program
1894
1895   @return: stdout
1896   @raise RPCFail: If execution fails for some reason
1897
1898   """
1899   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1900
1901   if result.failed:
1902     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1903           result.fail_reason, result.output)
1904
1905   return result.stdout
1906
1907
1908 def WriteSsconfFiles(values):
1909   """Update all ssconf files.
1910
1911   Wrapper around the SimpleStore.WriteFiles.
1912
1913   """
1914   ssconf.SimpleStore().WriteFiles(values)
1915
1916
1917 def _ErrnoOrStr(err):
1918   """Format an EnvironmentError exception.
1919
1920   If the L{err} argument has an errno attribute, it will be looked up
1921   and converted into a textual C{E...} description. Otherwise the
1922   string representation of the error will be returned.
1923
1924   @type err: L{EnvironmentError}
1925   @param err: the exception to format
1926
1927   """
1928   if hasattr(err, "errno"):
1929     detail = errno.errorcode[err.errno]
1930   else:
1931     detail = str(err)
1932   return detail
1933
1934
1935 def _OSOndiskAPIVersion(os_dir):
1936   """Compute and return the API version of a given OS.
1937
1938   This function will try to read the API version of the OS residing in
1939   the 'os_dir' directory.
1940
1941   @type os_dir: str
1942   @param os_dir: the directory in which we should look for the OS
1943   @rtype: tuple
1944   @return: tuple (status, data) with status denoting the validity and
1945       data holding either the vaid versions or an error message
1946
1947   """
1948   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1949
1950   try:
1951     st = os.stat(api_file)
1952   except EnvironmentError, err:
1953     return False, ("Required file '%s' not found under path %s: %s" %
1954                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1955
1956   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1957     return False, ("File '%s' in %s is not a regular file" %
1958                    (constants.OS_API_FILE, os_dir))
1959
1960   try:
1961     api_versions = utils.ReadFile(api_file).splitlines()
1962   except EnvironmentError, err:
1963     return False, ("Error while reading the API version file at %s: %s" %
1964                    (api_file, _ErrnoOrStr(err)))
1965
1966   try:
1967     api_versions = [int(version.strip()) for version in api_versions]
1968   except (TypeError, ValueError), err:
1969     return False, ("API version(s) can't be converted to integer: %s" %
1970                    str(err))
1971
1972   return True, api_versions
1973
1974
1975 def DiagnoseOS(top_dirs=None):
1976   """Compute the validity for all OSes.
1977
1978   @type top_dirs: list
1979   @param top_dirs: the list of directories in which to
1980       search (if not given defaults to
1981       L{constants.OS_SEARCH_PATH})
1982   @rtype: list of L{objects.OS}
1983   @return: a list of tuples (name, path, status, diagnose, variants,
1984       parameters, api_version) for all (potential) OSes under all
1985       search paths, where:
1986           - name is the (potential) OS name
1987           - path is the full path to the OS
1988           - status True/False is the validity of the OS
1989           - diagnose is the error message for an invalid OS, otherwise empty
1990           - variants is a list of supported OS variants, if any
1991           - parameters is a list of (name, help) parameters, if any
1992           - api_version is a list of support OS API versions
1993
1994   """
1995   if top_dirs is None:
1996     top_dirs = constants.OS_SEARCH_PATH
1997
1998   result = []
1999   for dir_name in top_dirs:
2000     if os.path.isdir(dir_name):
2001       try:
2002         f_names = utils.ListVisibleFiles(dir_name)
2003       except EnvironmentError, err:
2004         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2005         break
2006       for name in f_names:
2007         os_path = utils.PathJoin(dir_name, name)
2008         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2009         if status:
2010           diagnose = ""
2011           variants = os_inst.supported_variants
2012           parameters = os_inst.supported_parameters
2013           api_versions = os_inst.api_versions
2014         else:
2015           diagnose = os_inst
2016           variants = parameters = api_versions = []
2017         result.append((name, os_path, status, diagnose, variants,
2018                        parameters, api_versions))
2019
2020   return result
2021
2022
2023 def _TryOSFromDisk(name, base_dir=None):
2024   """Create an OS instance from disk.
2025
2026   This function will return an OS instance if the given name is a
2027   valid OS name.
2028
2029   @type base_dir: string
2030   @keyword base_dir: Base directory containing OS installations.
2031                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2032   @rtype: tuple
2033   @return: success and either the OS instance if we find a valid one,
2034       or error message
2035
2036   """
2037   if base_dir is None:
2038     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
2039   else:
2040     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2041
2042   if os_dir is None:
2043     return False, "Directory for OS %s not found in search path" % name
2044
2045   status, api_versions = _OSOndiskAPIVersion(os_dir)
2046   if not status:
2047     # push the error up
2048     return status, api_versions
2049
2050   if not constants.OS_API_VERSIONS.intersection(api_versions):
2051     return False, ("API version mismatch for path '%s': found %s, want %s." %
2052                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2053
2054   # OS Files dictionary, we will populate it with the absolute path
2055   # names; if the value is True, then it is a required file, otherwise
2056   # an optional one
2057   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2058
2059   if max(api_versions) >= constants.OS_API_V15:
2060     os_files[constants.OS_VARIANTS_FILE] = False
2061
2062   if max(api_versions) >= constants.OS_API_V20:
2063     os_files[constants.OS_PARAMETERS_FILE] = True
2064   else:
2065     del os_files[constants.OS_SCRIPT_VERIFY]
2066
2067   for (filename, required) in os_files.items():
2068     os_files[filename] = utils.PathJoin(os_dir, filename)
2069
2070     try:
2071       st = os.stat(os_files[filename])
2072     except EnvironmentError, err:
2073       if err.errno == errno.ENOENT and not required:
2074         del os_files[filename]
2075         continue
2076       return False, ("File '%s' under path '%s' is missing (%s)" %
2077                      (filename, os_dir, _ErrnoOrStr(err)))
2078
2079     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2080       return False, ("File '%s' under path '%s' is not a regular file" %
2081                      (filename, os_dir))
2082
2083     if filename in constants.OS_SCRIPTS:
2084       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2085         return False, ("File '%s' under path '%s' is not executable" %
2086                        (filename, os_dir))
2087
2088   variants = []
2089   if constants.OS_VARIANTS_FILE in os_files:
2090     variants_file = os_files[constants.OS_VARIANTS_FILE]
2091     try:
2092       variants = utils.ReadFile(variants_file).splitlines()
2093     except EnvironmentError, err:
2094       # we accept missing files, but not other errors
2095       if err.errno != errno.ENOENT:
2096         return False, ("Error while reading the OS variants file at %s: %s" %
2097                        (variants_file, _ErrnoOrStr(err)))
2098
2099   parameters = []
2100   if constants.OS_PARAMETERS_FILE in os_files:
2101     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2102     try:
2103       parameters = utils.ReadFile(parameters_file).splitlines()
2104     except EnvironmentError, err:
2105       return False, ("Error while reading the OS parameters file at %s: %s" %
2106                      (parameters_file, _ErrnoOrStr(err)))
2107     parameters = [v.split(None, 1) for v in parameters]
2108
2109   os_obj = objects.OS(name=name, path=os_dir,
2110                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2111                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2112                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2113                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2114                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2115                                                  None),
2116                       supported_variants=variants,
2117                       supported_parameters=parameters,
2118                       api_versions=api_versions)
2119   return True, os_obj
2120
2121
2122 def OSFromDisk(name, base_dir=None):
2123   """Create an OS instance from disk.
2124
2125   This function will return an OS instance if the given name is a
2126   valid OS name. Otherwise, it will raise an appropriate
2127   L{RPCFail} exception, detailing why this is not a valid OS.
2128
2129   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2130   an exception but returns true/false status data.
2131
2132   @type base_dir: string
2133   @keyword base_dir: Base directory containing OS installations.
2134                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2135   @rtype: L{objects.OS}
2136   @return: the OS instance if we find a valid one
2137   @raise RPCFail: if we don't find a valid OS
2138
2139   """
2140   name_only = objects.OS.GetName(name)
2141   status, payload = _TryOSFromDisk(name_only, base_dir)
2142
2143   if not status:
2144     _Fail(payload)
2145
2146   return payload
2147
2148
2149 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2150   """Calculate the basic environment for an os script.
2151
2152   @type os_name: str
2153   @param os_name: full operating system name (including variant)
2154   @type inst_os: L{objects.OS}
2155   @param inst_os: operating system for which the environment is being built
2156   @type os_params: dict
2157   @param os_params: the OS parameters
2158   @type debug: integer
2159   @param debug: debug level (0 or 1, for OS Api 10)
2160   @rtype: dict
2161   @return: dict of environment variables
2162   @raise errors.BlockDeviceError: if the block device
2163       cannot be found
2164
2165   """
2166   result = {}
2167   api_version = \
2168     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2169   result["OS_API_VERSION"] = "%d" % api_version
2170   result["OS_NAME"] = inst_os.name
2171   result["DEBUG_LEVEL"] = "%d" % debug
2172
2173   # OS variants
2174   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2175     variant = objects.OS.GetVariant(os_name)
2176     if not variant:
2177       variant = inst_os.supported_variants[0]
2178   else:
2179     variant = ""
2180   result["OS_VARIANT"] = variant
2181
2182   # OS params
2183   for pname, pvalue in os_params.items():
2184     result["OSP_%s" % pname.upper()] = pvalue
2185
2186   return result
2187
2188
2189 def OSEnvironment(instance, inst_os, debug=0):
2190   """Calculate the environment for an os script.
2191
2192   @type instance: L{objects.Instance}
2193   @param instance: target instance for the os script run
2194   @type inst_os: L{objects.OS}
2195   @param inst_os: operating system for which the environment is being built
2196   @type debug: integer
2197   @param debug: debug level (0 or 1, for OS Api 10)
2198   @rtype: dict
2199   @return: dict of environment variables
2200   @raise errors.BlockDeviceError: if the block device
2201       cannot be found
2202
2203   """
2204   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2205
2206   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2207     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2208
2209   result["HYPERVISOR"] = instance.hypervisor
2210   result["DISK_COUNT"] = "%d" % len(instance.disks)
2211   result["NIC_COUNT"] = "%d" % len(instance.nics)
2212   result["INSTANCE_SECONDARY_NODES"] = \
2213       ("%s" % " ".join(instance.secondary_nodes))
2214
2215   # Disks
2216   for idx, disk in enumerate(instance.disks):
2217     real_disk = _OpenRealBD(disk)
2218     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2219     result["DISK_%d_ACCESS" % idx] = disk.mode
2220     if constants.HV_DISK_TYPE in instance.hvparams:
2221       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2222         instance.hvparams[constants.HV_DISK_TYPE]
2223     if disk.dev_type in constants.LDS_BLOCK:
2224       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2225     elif disk.dev_type == constants.LD_FILE:
2226       result["DISK_%d_BACKEND_TYPE" % idx] = \
2227         "file:%s" % disk.physical_id[0]
2228
2229   # NICs
2230   for idx, nic in enumerate(instance.nics):
2231     result["NIC_%d_MAC" % idx] = nic.mac
2232     if nic.ip:
2233       result["NIC_%d_IP" % idx] = nic.ip
2234     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2235     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2236       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2237     if nic.nicparams[constants.NIC_LINK]:
2238       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2239     if constants.HV_NIC_TYPE in instance.hvparams:
2240       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2241         instance.hvparams[constants.HV_NIC_TYPE]
2242
2243   # HV/BE params
2244   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2245     for key, value in source.items():
2246       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2247
2248   return result
2249
2250
2251 def BlockdevGrow(disk, amount, dryrun):
2252   """Grow a stack of block devices.
2253
2254   This function is called recursively, with the childrens being the
2255   first ones to resize.
2256
2257   @type disk: L{objects.Disk}
2258   @param disk: the disk to be grown
2259   @type amount: integer
2260   @param amount: the amount (in mebibytes) to grow with
2261   @type dryrun: boolean
2262   @param dryrun: whether to execute the operation in simulation mode
2263       only, without actually increasing the size
2264   @rtype: (status, result)
2265   @return: a tuple with the status of the operation (True/False), and
2266       the errors message if status is False
2267
2268   """
2269   r_dev = _RecursiveFindBD(disk)
2270   if r_dev is None:
2271     _Fail("Cannot find block device %s", disk)
2272
2273   try:
2274     r_dev.Grow(amount, dryrun)
2275   except errors.BlockDeviceError, err:
2276     _Fail("Failed to grow block device: %s", err, exc=True)
2277
2278
2279 def BlockdevSnapshot(disk):
2280   """Create a snapshot copy of a block device.
2281
2282   This function is called recursively, and the snapshot is actually created
2283   just for the leaf lvm backend device.
2284
2285   @type disk: L{objects.Disk}
2286   @param disk: the disk to be snapshotted
2287   @rtype: string
2288   @return: snapshot disk ID as (vg, lv)
2289
2290   """
2291   if disk.dev_type == constants.LD_DRBD8:
2292     if not disk.children:
2293       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2294             disk.unique_id)
2295     return BlockdevSnapshot(disk.children[0])
2296   elif disk.dev_type == constants.LD_LV:
2297     r_dev = _RecursiveFindBD(disk)
2298     if r_dev is not None:
2299       # FIXME: choose a saner value for the snapshot size
2300       # let's stay on the safe side and ask for the full size, for now
2301       return r_dev.Snapshot(disk.size)
2302     else:
2303       _Fail("Cannot find block device %s", disk)
2304   else:
2305     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2306           disk.unique_id, disk.dev_type)
2307
2308
2309 def FinalizeExport(instance, snap_disks):
2310   """Write out the export configuration information.
2311
2312   @type instance: L{objects.Instance}
2313   @param instance: the instance which we export, used for
2314       saving configuration
2315   @type snap_disks: list of L{objects.Disk}
2316   @param snap_disks: list of snapshot block devices, which
2317       will be used to get the actual name of the dump file
2318
2319   @rtype: None
2320
2321   """
2322   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2323   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2324
2325   config = objects.SerializableConfigParser()
2326
2327   config.add_section(constants.INISECT_EXP)
2328   config.set(constants.INISECT_EXP, "version", "0")
2329   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2330   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2331   config.set(constants.INISECT_EXP, "os", instance.os)
2332   config.set(constants.INISECT_EXP, "compression", "none")
2333
2334   config.add_section(constants.INISECT_INS)
2335   config.set(constants.INISECT_INS, "name", instance.name)
2336   config.set(constants.INISECT_INS, "memory", "%d" %
2337              instance.beparams[constants.BE_MEMORY])
2338   config.set(constants.INISECT_INS, "vcpus", "%d" %
2339              instance.beparams[constants.BE_VCPUS])
2340   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2341   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2342   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2343
2344   nic_total = 0
2345   for nic_count, nic in enumerate(instance.nics):
2346     nic_total += 1
2347     config.set(constants.INISECT_INS, "nic%d_mac" %
2348                nic_count, "%s" % nic.mac)
2349     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2350     for param in constants.NICS_PARAMETER_TYPES:
2351       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2352                  "%s" % nic.nicparams.get(param, None))
2353   # TODO: redundant: on load can read nics until it doesn't exist
2354   config.set(constants.INISECT_INS, "nic_count" , "%d" % nic_total)
2355
2356   disk_total = 0
2357   for disk_count, disk in enumerate(snap_disks):
2358     if disk:
2359       disk_total += 1
2360       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2361                  ("%s" % disk.iv_name))
2362       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2363                  ("%s" % disk.physical_id[1]))
2364       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2365                  ("%d" % disk.size))
2366
2367   config.set(constants.INISECT_INS, "disk_count" , "%d" % disk_total)
2368
2369   # New-style hypervisor/backend parameters
2370
2371   config.add_section(constants.INISECT_HYP)
2372   for name, value in instance.hvparams.items():
2373     if name not in constants.HVC_GLOBALS:
2374       config.set(constants.INISECT_HYP, name, str(value))
2375
2376   config.add_section(constants.INISECT_BEP)
2377   for name, value in instance.beparams.items():
2378     config.set(constants.INISECT_BEP, name, str(value))
2379
2380   config.add_section(constants.INISECT_OSP)
2381   for name, value in instance.osparams.items():
2382     config.set(constants.INISECT_OSP, name, str(value))
2383
2384   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2385                   data=config.Dumps())
2386   shutil.rmtree(finaldestdir, ignore_errors=True)
2387   shutil.move(destdir, finaldestdir)
2388
2389
2390 def ExportInfo(dest):
2391   """Get export configuration information.
2392
2393   @type dest: str
2394   @param dest: directory containing the export
2395
2396   @rtype: L{objects.SerializableConfigParser}
2397   @return: a serializable config file containing the
2398       export info
2399
2400   """
2401   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2402
2403   config = objects.SerializableConfigParser()
2404   config.read(cff)
2405
2406   if (not config.has_section(constants.INISECT_EXP) or
2407       not config.has_section(constants.INISECT_INS)):
2408     _Fail("Export info file doesn't have the required fields")
2409
2410   return config.Dumps()
2411
2412
2413 def ListExports():
2414   """Return a list of exports currently available on this machine.
2415
2416   @rtype: list
2417   @return: list of the exports
2418
2419   """
2420   if os.path.isdir(constants.EXPORT_DIR):
2421     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2422   else:
2423     _Fail("No exports directory")
2424
2425
2426 def RemoveExport(export):
2427   """Remove an existing export from the node.
2428
2429   @type export: str
2430   @param export: the name of the export to remove
2431   @rtype: None
2432
2433   """
2434   target = utils.PathJoin(constants.EXPORT_DIR, export)
2435
2436   try:
2437     shutil.rmtree(target)
2438   except EnvironmentError, err:
2439     _Fail("Error while removing the export: %s", err, exc=True)
2440
2441
2442 def BlockdevRename(devlist):
2443   """Rename a list of block devices.
2444
2445   @type devlist: list of tuples
2446   @param devlist: list of tuples of the form  (disk,
2447       new_logical_id, new_physical_id); disk is an
2448       L{objects.Disk} object describing the current disk,
2449       and new logical_id/physical_id is the name we
2450       rename it to
2451   @rtype: boolean
2452   @return: True if all renames succeeded, False otherwise
2453
2454   """
2455   msgs = []
2456   result = True
2457   for disk, unique_id in devlist:
2458     dev = _RecursiveFindBD(disk)
2459     if dev is None:
2460       msgs.append("Can't find device %s in rename" % str(disk))
2461       result = False
2462       continue
2463     try:
2464       old_rpath = dev.dev_path
2465       dev.Rename(unique_id)
2466       new_rpath = dev.dev_path
2467       if old_rpath != new_rpath:
2468         DevCacheManager.RemoveCache(old_rpath)
2469         # FIXME: we should add the new cache information here, like:
2470         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2471         # but we don't have the owner here - maybe parse from existing
2472         # cache? for now, we only lose lvm data when we rename, which
2473         # is less critical than DRBD or MD
2474     except errors.BlockDeviceError, err:
2475       msgs.append("Can't rename device '%s' to '%s': %s" %
2476                   (dev, unique_id, err))
2477       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2478       result = False
2479   if not result:
2480     _Fail("; ".join(msgs))
2481
2482
2483 def _TransformFileStorageDir(fs_dir):
2484   """Checks whether given file_storage_dir is valid.
2485
2486   Checks wheter the given fs_dir is within the cluster-wide default
2487   file_storage_dir or the shared_file_storage_dir, which are stored in
2488   SimpleStore. Only paths under those directories are allowed.
2489
2490   @type fs_dir: str
2491   @param fs_dir: the path to check
2492
2493   @return: the normalized path if valid, None otherwise
2494
2495   """
2496   if not constants.ENABLE_FILE_STORAGE:
2497     _Fail("File storage disabled at configure time")
2498   cfg = _GetConfig()
2499   fs_dir = os.path.normpath(fs_dir)
2500   base_fstore = cfg.GetFileStorageDir()
2501   base_shared = cfg.GetSharedFileStorageDir()
2502   if ((os.path.commonprefix([fs_dir, base_fstore]) != base_fstore) and
2503       (os.path.commonprefix([fs_dir, base_shared]) != base_shared)):
2504     _Fail("File storage directory '%s' is not under base file"
2505           " storage directory '%s' or shared storage directory '%s'",
2506           fs_dir, base_fstore, base_shared)
2507   return fs_dir
2508
2509
2510 def CreateFileStorageDir(file_storage_dir):
2511   """Create file storage directory.
2512
2513   @type file_storage_dir: str
2514   @param file_storage_dir: directory to create
2515
2516   @rtype: tuple
2517   @return: tuple with first element a boolean indicating wheter dir
2518       creation was successful or not
2519
2520   """
2521   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2522   if os.path.exists(file_storage_dir):
2523     if not os.path.isdir(file_storage_dir):
2524       _Fail("Specified storage dir '%s' is not a directory",
2525             file_storage_dir)
2526   else:
2527     try:
2528       os.makedirs(file_storage_dir, 0750)
2529     except OSError, err:
2530       _Fail("Cannot create file storage directory '%s': %s",
2531             file_storage_dir, err, exc=True)
2532
2533
2534 def RemoveFileStorageDir(file_storage_dir):
2535   """Remove file storage directory.
2536
2537   Remove it only if it's empty. If not log an error and return.
2538
2539   @type file_storage_dir: str
2540   @param file_storage_dir: the directory we should cleanup
2541   @rtype: tuple (success,)
2542   @return: tuple of one element, C{success}, denoting
2543       whether the operation was successful
2544
2545   """
2546   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2547   if os.path.exists(file_storage_dir):
2548     if not os.path.isdir(file_storage_dir):
2549       _Fail("Specified Storage directory '%s' is not a directory",
2550             file_storage_dir)
2551     # deletes dir only if empty, otherwise we want to fail the rpc call
2552     try:
2553       os.rmdir(file_storage_dir)
2554     except OSError, err:
2555       _Fail("Cannot remove file storage directory '%s': %s",
2556             file_storage_dir, err)
2557
2558
2559 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2560   """Rename the file storage directory.
2561
2562   @type old_file_storage_dir: str
2563   @param old_file_storage_dir: the current path
2564   @type new_file_storage_dir: str
2565   @param new_file_storage_dir: the name we should rename to
2566   @rtype: tuple (success,)
2567   @return: tuple of one element, C{success}, denoting
2568       whether the operation was successful
2569
2570   """
2571   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2572   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2573   if not os.path.exists(new_file_storage_dir):
2574     if os.path.isdir(old_file_storage_dir):
2575       try:
2576         os.rename(old_file_storage_dir, new_file_storage_dir)
2577       except OSError, err:
2578         _Fail("Cannot rename '%s' to '%s': %s",
2579               old_file_storage_dir, new_file_storage_dir, err)
2580     else:
2581       _Fail("Specified storage dir '%s' is not a directory",
2582             old_file_storage_dir)
2583   else:
2584     if os.path.exists(old_file_storage_dir):
2585       _Fail("Cannot rename '%s' to '%s': both locations exist",
2586             old_file_storage_dir, new_file_storage_dir)
2587
2588
2589 def _EnsureJobQueueFile(file_name):
2590   """Checks whether the given filename is in the queue directory.
2591
2592   @type file_name: str
2593   @param file_name: the file name we should check
2594   @rtype: None
2595   @raises RPCFail: if the file is not valid
2596
2597   """
2598   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2599   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2600
2601   if not result:
2602     _Fail("Passed job queue file '%s' does not belong to"
2603           " the queue directory '%s'", file_name, queue_dir)
2604
2605
2606 def JobQueueUpdate(file_name, content):
2607   """Updates a file in the queue directory.
2608
2609   This is just a wrapper over L{utils.io.WriteFile}, with proper
2610   checking.
2611
2612   @type file_name: str
2613   @param file_name: the job file name
2614   @type content: str
2615   @param content: the new job contents
2616   @rtype: boolean
2617   @return: the success of the operation
2618
2619   """
2620   _EnsureJobQueueFile(file_name)
2621   getents = runtime.GetEnts()
2622
2623   # Write and replace the file atomically
2624   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2625                   gid=getents.masterd_gid)
2626
2627
2628 def JobQueueRename(old, new):
2629   """Renames a job queue file.
2630
2631   This is just a wrapper over os.rename with proper checking.
2632
2633   @type old: str
2634   @param old: the old (actual) file name
2635   @type new: str
2636   @param new: the desired file name
2637   @rtype: tuple
2638   @return: the success of the operation and payload
2639
2640   """
2641   _EnsureJobQueueFile(old)
2642   _EnsureJobQueueFile(new)
2643
2644   utils.RenameFile(old, new, mkdir=True)
2645
2646
2647 def BlockdevClose(instance_name, disks):
2648   """Closes the given block devices.
2649
2650   This means they will be switched to secondary mode (in case of
2651   DRBD).
2652
2653   @param instance_name: if the argument is not empty, the symlinks
2654       of this instance will be removed
2655   @type disks: list of L{objects.Disk}
2656   @param disks: the list of disks to be closed
2657   @rtype: tuple (success, message)
2658   @return: a tuple of success and message, where success
2659       indicates the succes of the operation, and message
2660       which will contain the error details in case we
2661       failed
2662
2663   """
2664   bdevs = []
2665   for cf in disks:
2666     rd = _RecursiveFindBD(cf)
2667     if rd is None:
2668       _Fail("Can't find device %s", cf)
2669     bdevs.append(rd)
2670
2671   msg = []
2672   for rd in bdevs:
2673     try:
2674       rd.Close()
2675     except errors.BlockDeviceError, err:
2676       msg.append(str(err))
2677   if msg:
2678     _Fail("Can't make devices secondary: %s", ",".join(msg))
2679   else:
2680     if instance_name:
2681       _RemoveBlockDevLinks(instance_name, disks)
2682
2683
2684 def ValidateHVParams(hvname, hvparams):
2685   """Validates the given hypervisor parameters.
2686
2687   @type hvname: string
2688   @param hvname: the hypervisor name
2689   @type hvparams: dict
2690   @param hvparams: the hypervisor parameters to be validated
2691   @rtype: None
2692
2693   """
2694   try:
2695     hv_type = hypervisor.GetHypervisor(hvname)
2696     hv_type.ValidateParameters(hvparams)
2697   except errors.HypervisorError, err:
2698     _Fail(str(err), log=False)
2699
2700
2701 def _CheckOSPList(os_obj, parameters):
2702   """Check whether a list of parameters is supported by the OS.
2703
2704   @type os_obj: L{objects.OS}
2705   @param os_obj: OS object to check
2706   @type parameters: list
2707   @param parameters: the list of parameters to check
2708
2709   """
2710   supported = [v[0] for v in os_obj.supported_parameters]
2711   delta = frozenset(parameters).difference(supported)
2712   if delta:
2713     _Fail("The following parameters are not supported"
2714           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2715
2716
2717 def ValidateOS(required, osname, checks, osparams):
2718   """Validate the given OS' parameters.
2719
2720   @type required: boolean
2721   @param required: whether absence of the OS should translate into
2722       failure or not
2723   @type osname: string
2724   @param osname: the OS to be validated
2725   @type checks: list
2726   @param checks: list of the checks to run (currently only 'parameters')
2727   @type osparams: dict
2728   @param osparams: dictionary with OS parameters
2729   @rtype: boolean
2730   @return: True if the validation passed, or False if the OS was not
2731       found and L{required} was false
2732
2733   """
2734   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2735     _Fail("Unknown checks required for OS %s: %s", osname,
2736           set(checks).difference(constants.OS_VALIDATE_CALLS))
2737
2738   name_only = objects.OS.GetName(osname)
2739   status, tbv = _TryOSFromDisk(name_only, None)
2740
2741   if not status:
2742     if required:
2743       _Fail(tbv)
2744     else:
2745       return False
2746
2747   if max(tbv.api_versions) < constants.OS_API_V20:
2748     return True
2749
2750   if constants.OS_VALIDATE_PARAMETERS in checks:
2751     _CheckOSPList(tbv, osparams.keys())
2752
2753   validate_env = OSCoreEnv(osname, tbv, osparams)
2754   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2755                         cwd=tbv.path, reset_env=True)
2756   if result.failed:
2757     logging.error("os validate command '%s' returned error: %s output: %s",
2758                   result.cmd, result.fail_reason, result.output)
2759     _Fail("OS validation script failed (%s), output: %s",
2760           result.fail_reason, result.output, log=False)
2761
2762   return True
2763
2764
2765 def DemoteFromMC():
2766   """Demotes the current node from master candidate role.
2767
2768   """
2769   # try to ensure we're not the master by mistake
2770   master, myself = ssconf.GetMasterAndMyself()
2771   if master == myself:
2772     _Fail("ssconf status shows I'm the master node, will not demote")
2773
2774   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2775   if not result.failed:
2776     _Fail("The master daemon is running, will not demote")
2777
2778   try:
2779     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2780       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2781   except EnvironmentError, err:
2782     if err.errno != errno.ENOENT:
2783       _Fail("Error while backing up cluster file: %s", err, exc=True)
2784
2785   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2786
2787
2788 def _GetX509Filenames(cryptodir, name):
2789   """Returns the full paths for the private key and certificate.
2790
2791   """
2792   return (utils.PathJoin(cryptodir, name),
2793           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2794           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2795
2796
2797 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2798   """Creates a new X509 certificate for SSL/TLS.
2799
2800   @type validity: int
2801   @param validity: Validity in seconds
2802   @rtype: tuple; (string, string)
2803   @return: Certificate name and public part
2804
2805   """
2806   (key_pem, cert_pem) = \
2807     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2808                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2809
2810   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2811                               prefix="x509-%s-" % utils.TimestampForFilename())
2812   try:
2813     name = os.path.basename(cert_dir)
2814     assert len(name) > 5
2815
2816     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2817
2818     utils.WriteFile(key_file, mode=0400, data=key_pem)
2819     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2820
2821     # Never return private key as it shouldn't leave the node
2822     return (name, cert_pem)
2823   except Exception:
2824     shutil.rmtree(cert_dir, ignore_errors=True)
2825     raise
2826
2827
2828 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2829   """Removes a X509 certificate.
2830
2831   @type name: string
2832   @param name: Certificate name
2833
2834   """
2835   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2836
2837   utils.RemoveFile(key_file)
2838   utils.RemoveFile(cert_file)
2839
2840   try:
2841     os.rmdir(cert_dir)
2842   except EnvironmentError, err:
2843     _Fail("Cannot remove certificate directory '%s': %s",
2844           cert_dir, err)
2845
2846
2847 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2848   """Returns the command for the requested input/output.
2849
2850   @type instance: L{objects.Instance}
2851   @param instance: The instance object
2852   @param mode: Import/export mode
2853   @param ieio: Input/output type
2854   @param ieargs: Input/output arguments
2855
2856   """
2857   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2858
2859   env = None
2860   prefix = None
2861   suffix = None
2862   exp_size = None
2863
2864   if ieio == constants.IEIO_FILE:
2865     (filename, ) = ieargs
2866
2867     if not utils.IsNormAbsPath(filename):
2868       _Fail("Path '%s' is not normalized or absolute", filename)
2869
2870     directory = os.path.normpath(os.path.dirname(filename))
2871
2872     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2873         constants.EXPORT_DIR):
2874       _Fail("File '%s' is not under exports directory '%s'",
2875             filename, constants.EXPORT_DIR)
2876
2877     # Create directory
2878     utils.Makedirs(directory, mode=0750)
2879
2880     quoted_filename = utils.ShellQuote(filename)
2881
2882     if mode == constants.IEM_IMPORT:
2883       suffix = "> %s" % quoted_filename
2884     elif mode == constants.IEM_EXPORT:
2885       suffix = "< %s" % quoted_filename
2886
2887       # Retrieve file size
2888       try:
2889         st = os.stat(filename)
2890       except EnvironmentError, err:
2891         logging.error("Can't stat(2) %s: %s", filename, err)
2892       else:
2893         exp_size = utils.BytesToMebibyte(st.st_size)
2894
2895   elif ieio == constants.IEIO_RAW_DISK:
2896     (disk, ) = ieargs
2897
2898     real_disk = _OpenRealBD(disk)
2899
2900     if mode == constants.IEM_IMPORT:
2901       # we set here a smaller block size as, due to transport buffering, more
2902       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2903       # is not already there or we pass a wrong path; we use notrunc to no
2904       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2905       # much memory; this means that at best, we flush every 64k, which will
2906       # not be very fast
2907       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2908                                     " bs=%s oflag=dsync"),
2909                                     real_disk.dev_path,
2910                                     str(64 * 1024))
2911
2912     elif mode == constants.IEM_EXPORT:
2913       # the block size on the read dd is 1MiB to match our units
2914       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2915                                    real_disk.dev_path,
2916                                    str(1024 * 1024), # 1 MB
2917                                    str(disk.size))
2918       exp_size = disk.size
2919
2920   elif ieio == constants.IEIO_SCRIPT:
2921     (disk, disk_index, ) = ieargs
2922
2923     assert isinstance(disk_index, (int, long))
2924
2925     real_disk = _OpenRealBD(disk)
2926
2927     inst_os = OSFromDisk(instance.os)
2928     env = OSEnvironment(instance, inst_os)
2929
2930     if mode == constants.IEM_IMPORT:
2931       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2932       env["IMPORT_INDEX"] = str(disk_index)
2933       script = inst_os.import_script
2934
2935     elif mode == constants.IEM_EXPORT:
2936       env["EXPORT_DEVICE"] = real_disk.dev_path
2937       env["EXPORT_INDEX"] = str(disk_index)
2938       script = inst_os.export_script
2939
2940     # TODO: Pass special environment only to script
2941     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2942
2943     if mode == constants.IEM_IMPORT:
2944       suffix = "| %s" % script_cmd
2945
2946     elif mode == constants.IEM_EXPORT:
2947       prefix = "%s |" % script_cmd
2948
2949     # Let script predict size
2950     exp_size = constants.IE_CUSTOM_SIZE
2951
2952   else:
2953     _Fail("Invalid %s I/O mode %r", mode, ieio)
2954
2955   return (env, prefix, suffix, exp_size)
2956
2957
2958 def _CreateImportExportStatusDir(prefix):
2959   """Creates status directory for import/export.
2960
2961   """
2962   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2963                           prefix=("%s-%s-" %
2964                                   (prefix, utils.TimestampForFilename())))
2965
2966
2967 def StartImportExportDaemon(mode, opts, host, port, instance, component,
2968                             ieio, ieioargs):
2969   """Starts an import or export daemon.
2970
2971   @param mode: Import/output mode
2972   @type opts: L{objects.ImportExportOptions}
2973   @param opts: Daemon options
2974   @type host: string
2975   @param host: Remote host for export (None for import)
2976   @type port: int
2977   @param port: Remote port for export (None for import)
2978   @type instance: L{objects.Instance}
2979   @param instance: Instance object
2980   @type component: string
2981   @param component: which part of the instance is transferred now,
2982       e.g. 'disk/0'
2983   @param ieio: Input/output type
2984   @param ieioargs: Input/output arguments
2985
2986   """
2987   if mode == constants.IEM_IMPORT:
2988     prefix = "import"
2989
2990     if not (host is None and port is None):
2991       _Fail("Can not specify host or port on import")
2992
2993   elif mode == constants.IEM_EXPORT:
2994     prefix = "export"
2995
2996     if host is None or port is None:
2997       _Fail("Host and port must be specified for an export")
2998
2999   else:
3000     _Fail("Invalid mode %r", mode)
3001
3002   if (opts.key_name is None) ^ (opts.ca_pem is None):
3003     _Fail("Cluster certificate can only be used for both key and CA")
3004
3005   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3006     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3007
3008   if opts.key_name is None:
3009     # Use server.pem
3010     key_path = constants.NODED_CERT_FILE
3011     cert_path = constants.NODED_CERT_FILE
3012     assert opts.ca_pem is None
3013   else:
3014     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3015                                                  opts.key_name)
3016     assert opts.ca_pem is not None
3017
3018   for i in [key_path, cert_path]:
3019     if not os.path.exists(i):
3020       _Fail("File '%s' does not exist" % i)
3021
3022   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3023   try:
3024     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3025     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3026     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3027
3028     if opts.ca_pem is None:
3029       # Use server.pem
3030       ca = utils.ReadFile(constants.NODED_CERT_FILE)
3031     else:
3032       ca = opts.ca_pem
3033
3034     # Write CA file
3035     utils.WriteFile(ca_file, data=ca, mode=0400)
3036
3037     cmd = [
3038       constants.IMPORT_EXPORT_DAEMON,
3039       status_file, mode,
3040       "--key=%s" % key_path,
3041       "--cert=%s" % cert_path,
3042       "--ca=%s" % ca_file,
3043       ]
3044
3045     if host:
3046       cmd.append("--host=%s" % host)
3047
3048     if port:
3049       cmd.append("--port=%s" % port)
3050
3051     if opts.ipv6:
3052       cmd.append("--ipv6")
3053     else:
3054       cmd.append("--ipv4")
3055
3056     if opts.compress:
3057       cmd.append("--compress=%s" % opts.compress)
3058
3059     if opts.magic:
3060       cmd.append("--magic=%s" % opts.magic)
3061
3062     if exp_size is not None:
3063       cmd.append("--expected-size=%s" % exp_size)
3064
3065     if cmd_prefix:
3066       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3067
3068     if cmd_suffix:
3069       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3070
3071     if mode == constants.IEM_EXPORT:
3072       # Retry connection a few times when connecting to remote peer
3073       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3074       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3075     elif opts.connect_timeout is not None:
3076       assert mode == constants.IEM_IMPORT
3077       # Overall timeout for establishing connection while listening
3078       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3079
3080     logfile = _InstanceLogName(prefix, instance.os, instance.name)
3081
3082     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3083     # support for receiving a file descriptor for output
3084     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3085                       output=logfile)
3086
3087     # The import/export name is simply the status directory name
3088     return os.path.basename(status_dir)
3089
3090   except Exception:
3091     shutil.rmtree(status_dir, ignore_errors=True)
3092     raise
3093
3094
3095 def GetImportExportStatus(names):
3096   """Returns import/export daemon status.
3097
3098   @type names: sequence
3099   @param names: List of names
3100   @rtype: List of dicts
3101   @return: Returns a list of the state of each named import/export or None if a
3102            status couldn't be read
3103
3104   """
3105   result = []
3106
3107   for name in names:
3108     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3109                                  _IES_STATUS_FILE)
3110
3111     try:
3112       data = utils.ReadFile(status_file)
3113     except EnvironmentError, err:
3114       if err.errno != errno.ENOENT:
3115         raise
3116       data = None
3117
3118     if not data:
3119       result.append(None)
3120       continue
3121
3122     result.append(serializer.LoadJson(data))
3123
3124   return result
3125
3126
3127 def AbortImportExport(name):
3128   """Sends SIGTERM to a running import/export daemon.
3129
3130   """
3131   logging.info("Abort import/export %s", name)
3132
3133   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3134   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3135
3136   if pid:
3137     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3138                  name, pid)
3139     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3140
3141
3142 def CleanupImportExport(name):
3143   """Cleanup after an import or export.
3144
3145   If the import/export daemon is still running it's killed. Afterwards the
3146   whole status directory is removed.
3147
3148   """
3149   logging.info("Finalizing import/export %s", name)
3150
3151   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3152
3153   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3154
3155   if pid:
3156     logging.info("Import/export %s is still running with PID %s",
3157                  name, pid)
3158     utils.KillProcess(pid, waitpid=False)
3159
3160   shutil.rmtree(status_dir, ignore_errors=True)
3161
3162
3163 def _FindDisks(nodes_ip, disks):
3164   """Sets the physical ID on disks and returns the block devices.
3165
3166   """
3167   # set the correct physical ID
3168   my_name = netutils.Hostname.GetSysName()
3169   for cf in disks:
3170     cf.SetPhysicalID(my_name, nodes_ip)
3171
3172   bdevs = []
3173
3174   for cf in disks:
3175     rd = _RecursiveFindBD(cf)
3176     if rd is None:
3177       _Fail("Can't find device %s", cf)
3178     bdevs.append(rd)
3179   return bdevs
3180
3181
3182 def DrbdDisconnectNet(nodes_ip, disks):
3183   """Disconnects the network on a list of drbd devices.
3184
3185   """
3186   bdevs = _FindDisks(nodes_ip, disks)
3187
3188   # disconnect disks
3189   for rd in bdevs:
3190     try:
3191       rd.DisconnectNet()
3192     except errors.BlockDeviceError, err:
3193       _Fail("Can't change network configuration to standalone mode: %s",
3194             err, exc=True)
3195
3196
3197 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3198   """Attaches the network on a list of drbd devices.
3199
3200   """
3201   bdevs = _FindDisks(nodes_ip, disks)
3202
3203   if multimaster:
3204     for idx, rd in enumerate(bdevs):
3205       try:
3206         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3207       except EnvironmentError, err:
3208         _Fail("Can't create symlink: %s", err)
3209   # reconnect disks, switch to new master configuration and if
3210   # needed primary mode
3211   for rd in bdevs:
3212     try:
3213       rd.AttachNet(multimaster)
3214     except errors.BlockDeviceError, err:
3215       _Fail("Can't change network configuration: %s", err)
3216
3217   # wait until the disks are connected; we need to retry the re-attach
3218   # if the device becomes standalone, as this might happen if the one
3219   # node disconnects and reconnects in a different mode before the
3220   # other node reconnects; in this case, one or both of the nodes will
3221   # decide it has wrong configuration and switch to standalone
3222
3223   def _Attach():
3224     all_connected = True
3225
3226     for rd in bdevs:
3227       stats = rd.GetProcStatus()
3228
3229       all_connected = (all_connected and
3230                        (stats.is_connected or stats.is_in_resync))
3231
3232       if stats.is_standalone:
3233         # peer had different config info and this node became
3234         # standalone, even though this should not happen with the
3235         # new staged way of changing disk configs
3236         try:
3237           rd.AttachNet(multimaster)
3238         except errors.BlockDeviceError, err:
3239           _Fail("Can't change network configuration: %s", err)
3240
3241     if not all_connected:
3242       raise utils.RetryAgain()
3243
3244   try:
3245     # Start with a delay of 100 miliseconds and go up to 5 seconds
3246     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3247   except utils.RetryTimeout:
3248     _Fail("Timeout in disk reconnecting")
3249
3250   if multimaster:
3251     # change to primary mode
3252     for rd in bdevs:
3253       try:
3254         rd.Open()
3255       except errors.BlockDeviceError, err:
3256         _Fail("Can't change to primary mode: %s", err)
3257
3258
3259 def DrbdWaitSync(nodes_ip, disks):
3260   """Wait until DRBDs have synchronized.
3261
3262   """
3263   def _helper(rd):
3264     stats = rd.GetProcStatus()
3265     if not (stats.is_connected or stats.is_in_resync):
3266       raise utils.RetryAgain()
3267     return stats
3268
3269   bdevs = _FindDisks(nodes_ip, disks)
3270
3271   min_resync = 100
3272   alldone = True
3273   for rd in bdevs:
3274     try:
3275       # poll each second for 15 seconds
3276       stats = utils.Retry(_helper, 1, 15, args=[rd])
3277     except utils.RetryTimeout:
3278       stats = rd.GetProcStatus()
3279       # last check
3280       if not (stats.is_connected or stats.is_in_resync):
3281         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3282     alldone = alldone and (not stats.is_in_resync)
3283     if stats.sync_percent is not None:
3284       min_resync = min(min_resync, stats.sync_percent)
3285
3286   return (alldone, min_resync)
3287
3288
3289 def GetDrbdUsermodeHelper():
3290   """Returns DRBD usermode helper currently configured.
3291
3292   """
3293   try:
3294     return bdev.BaseDRBD.GetUsermodeHelper()
3295   except errors.BlockDeviceError, err:
3296     _Fail(str(err))
3297
3298
3299 def PowercycleNode(hypervisor_type):
3300   """Hard-powercycle the node.
3301
3302   Because we need to return first, and schedule the powercycle in the
3303   background, we won't be able to report failures nicely.
3304
3305   """
3306   hyper = hypervisor.GetHypervisor(hypervisor_type)
3307   try:
3308     pid = os.fork()
3309   except OSError:
3310     # if we can't fork, we'll pretend that we're in the child process
3311     pid = 0
3312   if pid > 0:
3313     return "Reboot scheduled in 5 seconds"
3314   # ensure the child is running on ram
3315   try:
3316     utils.Mlockall()
3317   except Exception: # pylint: disable-msg=W0703
3318     pass
3319   time.sleep(5)
3320   hyper.PowercycleNode()
3321
3322
3323 class HooksRunner(object):
3324   """Hook runner.
3325
3326   This class is instantiated on the node side (ganeti-noded) and not
3327   on the master side.
3328
3329   """
3330   def __init__(self, hooks_base_dir=None):
3331     """Constructor for hooks runner.
3332
3333     @type hooks_base_dir: str or None
3334     @param hooks_base_dir: if not None, this overrides the
3335         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3336
3337     """
3338     if hooks_base_dir is None:
3339       hooks_base_dir = constants.HOOKS_BASE_DIR
3340     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3341     # constant
3342     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3343
3344   def RunHooks(self, hpath, phase, env):
3345     """Run the scripts in the hooks directory.
3346
3347     @type hpath: str
3348     @param hpath: the path to the hooks directory which
3349         holds the scripts
3350     @type phase: str
3351     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3352         L{constants.HOOKS_PHASE_POST}
3353     @type env: dict
3354     @param env: dictionary with the environment for the hook
3355     @rtype: list
3356     @return: list of 3-element tuples:
3357       - script path
3358       - script result, either L{constants.HKR_SUCCESS} or
3359         L{constants.HKR_FAIL}
3360       - output of the script
3361
3362     @raise errors.ProgrammerError: for invalid input
3363         parameters
3364
3365     """
3366     if phase == constants.HOOKS_PHASE_PRE:
3367       suffix = "pre"
3368     elif phase == constants.HOOKS_PHASE_POST:
3369       suffix = "post"
3370     else:
3371       _Fail("Unknown hooks phase '%s'", phase)
3372
3373
3374     subdir = "%s-%s.d" % (hpath, suffix)
3375     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3376
3377     results = []
3378
3379     if not os.path.isdir(dir_name):
3380       # for non-existing/non-dirs, we simply exit instead of logging a
3381       # warning at every operation
3382       return results
3383
3384     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3385
3386     for (relname, relstatus, runresult)  in runparts_results:
3387       if relstatus == constants.RUNPARTS_SKIP:
3388         rrval = constants.HKR_SKIP
3389         output = ""
3390       elif relstatus == constants.RUNPARTS_ERR:
3391         rrval = constants.HKR_FAIL
3392         output = "Hook script execution error: %s" % runresult
3393       elif relstatus == constants.RUNPARTS_RUN:
3394         if runresult.failed:
3395           rrval = constants.HKR_FAIL
3396         else:
3397           rrval = constants.HKR_SUCCESS
3398         output = utils.SafeEncode(runresult.output.strip())
3399       results.append(("%s/%s" % (subdir, relname), rrval, output))
3400
3401     return results
3402
3403
3404 class IAllocatorRunner(object):
3405   """IAllocator runner.
3406
3407   This class is instantiated on the node side (ganeti-noded) and not on
3408   the master side.
3409
3410   """
3411   @staticmethod
3412   def Run(name, idata):
3413     """Run an iallocator script.
3414
3415     @type name: str
3416     @param name: the iallocator script name
3417     @type idata: str
3418     @param idata: the allocator input data
3419
3420     @rtype: tuple
3421     @return: two element tuple of:
3422        - status
3423        - either error message or stdout of allocator (for success)
3424
3425     """
3426     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3427                                   os.path.isfile)
3428     if alloc_script is None:
3429       _Fail("iallocator module '%s' not found in the search path", name)
3430
3431     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3432     try:
3433       os.write(fd, idata)
3434       os.close(fd)
3435       result = utils.RunCmd([alloc_script, fin_name])
3436       if result.failed:
3437         _Fail("iallocator module '%s' failed: %s, output '%s'",
3438               name, result.fail_reason, result.output)
3439     finally:
3440       os.unlink(fin_name)
3441
3442     return result.stdout
3443
3444
3445 class DevCacheManager(object):
3446   """Simple class for managing a cache of block device information.
3447
3448   """
3449   _DEV_PREFIX = "/dev/"
3450   _ROOT_DIR = constants.BDEV_CACHE_DIR
3451
3452   @classmethod
3453   def _ConvertPath(cls, dev_path):
3454     """Converts a /dev/name path to the cache file name.
3455
3456     This replaces slashes with underscores and strips the /dev
3457     prefix. It then returns the full path to the cache file.
3458
3459     @type dev_path: str
3460     @param dev_path: the C{/dev/} path name
3461     @rtype: str
3462     @return: the converted path name
3463
3464     """
3465     if dev_path.startswith(cls._DEV_PREFIX):
3466       dev_path = dev_path[len(cls._DEV_PREFIX):]
3467     dev_path = dev_path.replace("/", "_")
3468     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3469     return fpath
3470
3471   @classmethod
3472   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3473     """Updates the cache information for a given device.
3474
3475     @type dev_path: str
3476     @param dev_path: the pathname of the device
3477     @type owner: str
3478     @param owner: the owner (instance name) of the device
3479     @type on_primary: bool
3480     @param on_primary: whether this is the primary
3481         node nor not
3482     @type iv_name: str
3483     @param iv_name: the instance-visible name of the
3484         device, as in objects.Disk.iv_name
3485
3486     @rtype: None
3487
3488     """
3489     if dev_path is None:
3490       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3491       return
3492     fpath = cls._ConvertPath(dev_path)
3493     if on_primary:
3494       state = "primary"
3495     else:
3496       state = "secondary"
3497     if iv_name is None:
3498       iv_name = "not_visible"
3499     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3500     try:
3501       utils.WriteFile(fpath, data=fdata)
3502     except EnvironmentError, err:
3503       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3504
3505   @classmethod
3506   def RemoveCache(cls, dev_path):
3507     """Remove data for a dev_path.
3508
3509     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3510     path name and logging.
3511
3512     @type dev_path: str
3513     @param dev_path: the pathname of the device
3514
3515     @rtype: None
3516
3517     """
3518     if dev_path is None:
3519       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3520       return
3521     fpath = cls._ConvertPath(dev_path)
3522     try:
3523       utils.RemoveFile(fpath)
3524     except EnvironmentError, err:
3525       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)