Add hypervisors ancillary files list
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.RAPI_USERS_FILE,
200     constants.CONFD_HMAC_KEY,
201     constants.CLUSTER_DOMAIN_SECRET_FILE,
202     ])
203
204   for hv_name in constants.HYPER_TYPES:
205     hv_class = hypervisor.GetHypervisorClass(hv_name)
206     allowed_files.update(hv_class.GetAncillaryFiles()[0])
207
208   return frozenset(allowed_files)
209
210
211 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212
213
214 def JobQueuePurge():
215   """Removes job queue files and archived jobs.
216
217   @rtype: tuple
218   @return: True, None
219
220   """
221   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
222   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223
224
225 def GetMasterInfo():
226   """Returns master information.
227
228   This is an utility function to compute master information, either
229   for consumption here or from the node daemon.
230
231   @rtype: tuple
232   @return: master_netdev, master_ip, master_name, primary_ip_family
233   @raise RPCFail: in case of errors
234
235   """
236   try:
237     cfg = _GetConfig()
238     master_netdev = cfg.GetMasterNetdev()
239     master_ip = cfg.GetMasterIP()
240     master_node = cfg.GetMasterNode()
241     primary_ip_family = cfg.GetPrimaryIPFamily()
242   except errors.ConfigurationError, err:
243     _Fail("Cluster configuration incomplete: %s", err, exc=True)
244   return (master_netdev, master_ip, master_node, primary_ip_family)
245
246
247 def ActivateMasterIp():
248   """Activate the IP address of the master daemon.
249
250   """
251   # GetMasterInfo will raise an exception if not able to return data
252   master_netdev, master_ip, _, family = GetMasterInfo()
253
254   err_msg = None
255   if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
256     if netutils.IPAddress.Own(master_ip):
257       # we already have the ip:
258       logging.debug("Master IP already configured, doing nothing")
259     else:
260       err_msg = "Someone else has the master ip, not activating"
261       logging.error(err_msg)
262   else:
263     ipcls = netutils.IP4Address
264     if family == netutils.IP6Address.family:
265       ipcls = netutils.IP6Address
266
267     result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
268                            "%s/%d" % (master_ip, ipcls.iplen),
269                            "dev", master_netdev, "label",
270                            "%s:0" % master_netdev])
271     if result.failed:
272       err_msg = "Can't activate master IP: %s" % result.output
273       logging.error(err_msg)
274
275     # we ignore the exit code of the following cmds
276     if ipcls == netutils.IP4Address:
277       utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
278                     master_ip, master_ip])
279     elif ipcls == netutils.IP6Address:
280       try:
281         utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
282       except errors.OpExecError:
283         # TODO: Better error reporting
284         logging.warning("Can't execute ndisc6, please install if missing")
285
286   if err_msg:
287     _Fail(err_msg)
288
289
290 def StartMasterDaemons(no_voting):
291   """Activate local node as master node.
292
293   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
294
295   @type no_voting: boolean
296   @param no_voting: whether to start ganeti-masterd without a node vote
297       but still non-interactively
298   @rtype: None
299
300   """
301
302   if no_voting:
303     masterd_args = "--no-voting --yes-do-it"
304   else:
305     masterd_args = ""
306
307   env = {
308     "EXTRA_MASTERD_ARGS": masterd_args,
309     }
310
311   result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
312   if result.failed:
313     msg = "Can't start Ganeti master: %s" % result.output
314     logging.error(msg)
315     _Fail(msg)
316
317
318 def DeactivateMasterIp():
319   """Deactivate the master IP on this node.
320
321   """
322   # TODO: log and report back to the caller the error failures; we
323   # need to decide in which case we fail the RPC for this
324
325   # GetMasterInfo will raise an exception if not able to return data
326   master_netdev, master_ip, _, family = GetMasterInfo()
327
328   ipcls = netutils.IP4Address
329   if family == netutils.IP6Address.family:
330     ipcls = netutils.IP6Address
331
332   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
333                          "%s/%d" % (master_ip, ipcls.iplen),
334                          "dev", master_netdev])
335   if result.failed:
336     logging.error("Can't remove the master IP, error: %s", result.output)
337     # but otherwise ignore the failure
338
339
340 def StopMasterDaemons():
341   """Stop the master daemons on this node.
342
343   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
344
345   @rtype: None
346
347   """
348   # TODO: log and report back to the caller the error failures; we
349   # need to decide in which case we fail the RPC for this
350
351   result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
352   if result.failed:
353     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
354                   " and error %s",
355                   result.cmd, result.exit_code, result.output)
356
357
358 def EtcHostsModify(mode, host, ip):
359   """Modify a host entry in /etc/hosts.
360
361   @param mode: The mode to operate. Either add or remove entry
362   @param host: The host to operate on
363   @param ip: The ip associated with the entry
364
365   """
366   if mode == constants.ETC_HOSTS_ADD:
367     if not ip:
368       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
369               " present")
370     utils.AddHostToEtcHosts(host, ip)
371   elif mode == constants.ETC_HOSTS_REMOVE:
372     if ip:
373       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
374               " parameter is present")
375     utils.RemoveHostFromEtcHosts(host)
376   else:
377     RPCFail("Mode not supported")
378
379
380 def LeaveCluster(modify_ssh_setup):
381   """Cleans up and remove the current node.
382
383   This function cleans up and prepares the current node to be removed
384   from the cluster.
385
386   If processing is successful, then it raises an
387   L{errors.QuitGanetiException} which is used as a special case to
388   shutdown the node daemon.
389
390   @param modify_ssh_setup: boolean
391
392   """
393   _CleanDirectory(constants.DATA_DIR)
394   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
395   JobQueuePurge()
396
397   if modify_ssh_setup:
398     try:
399       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
400
401       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
402
403       utils.RemoveFile(priv_key)
404       utils.RemoveFile(pub_key)
405     except errors.OpExecError:
406       logging.exception("Error while processing ssh files")
407
408   try:
409     utils.RemoveFile(constants.CONFD_HMAC_KEY)
410     utils.RemoveFile(constants.RAPI_CERT_FILE)
411     utils.RemoveFile(constants.NODED_CERT_FILE)
412   except: # pylint: disable=W0702
413     logging.exception("Error while removing cluster secrets")
414
415   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
416   if result.failed:
417     logging.error("Command %s failed with exitcode %s and error %s",
418                   result.cmd, result.exit_code, result.output)
419
420   # Raise a custom exception (handled in ganeti-noded)
421   raise errors.QuitGanetiException(True, "Shutdown scheduled")
422
423
424 def GetNodeInfo(vgname, hypervisor_type):
425   """Gives back a hash with different information about the node.
426
427   @type vgname: C{string}
428   @param vgname: the name of the volume group to ask for disk space information
429   @type hypervisor_type: C{str}
430   @param hypervisor_type: the name of the hypervisor to ask for
431       memory information
432   @rtype: C{dict}
433   @return: dictionary with the following keys:
434       - vg_size is the size of the configured volume group in MiB
435       - vg_free is the free size of the volume group in MiB
436       - memory_dom0 is the memory allocated for domain0 in MiB
437       - memory_free is the currently available (free) ram in MiB
438       - memory_total is the total number of ram in MiB
439       - hv_version: the hypervisor version, if available
440
441   """
442   outputarray = {}
443
444   if vgname is not None:
445     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
446     vg_free = vg_size = None
447     if vginfo:
448       vg_free = int(round(vginfo[0][0], 0))
449       vg_size = int(round(vginfo[0][1], 0))
450     outputarray["vg_size"] = vg_size
451     outputarray["vg_free"] = vg_free
452
453   if hypervisor_type is not None:
454     hyper = hypervisor.GetHypervisor(hypervisor_type)
455     hyp_info = hyper.GetNodeInfo()
456     if hyp_info is not None:
457       outputarray.update(hyp_info)
458
459   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
460
461   return outputarray
462
463
464 def VerifyNode(what, cluster_name):
465   """Verify the status of the local node.
466
467   Based on the input L{what} parameter, various checks are done on the
468   local node.
469
470   If the I{filelist} key is present, this list of
471   files is checksummed and the file/checksum pairs are returned.
472
473   If the I{nodelist} key is present, we check that we have
474   connectivity via ssh with the target nodes (and check the hostname
475   report).
476
477   If the I{node-net-test} key is present, we check that we have
478   connectivity to the given nodes via both primary IP and, if
479   applicable, secondary IPs.
480
481   @type what: C{dict}
482   @param what: a dictionary of things to check:
483       - filelist: list of files for which to compute checksums
484       - nodelist: list of nodes we should check ssh communication with
485       - node-net-test: list of nodes we should check node daemon port
486         connectivity with
487       - hypervisor: list with hypervisors to run the verify for
488   @rtype: dict
489   @return: a dictionary with the same keys as the input dict, and
490       values representing the result of the checks
491
492   """
493   result = {}
494   my_name = netutils.Hostname.GetSysName()
495   port = netutils.GetDaemonPort(constants.NODED)
496   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
497
498   if constants.NV_HYPERVISOR in what and vm_capable:
499     result[constants.NV_HYPERVISOR] = tmp = {}
500     for hv_name in what[constants.NV_HYPERVISOR]:
501       try:
502         val = hypervisor.GetHypervisor(hv_name).Verify()
503       except errors.HypervisorError, err:
504         val = "Error while checking hypervisor: %s" % str(err)
505       tmp[hv_name] = val
506
507   if constants.NV_HVPARAMS in what and vm_capable:
508     result[constants.NV_HVPARAMS] = tmp = []
509     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
510       try:
511         logging.info("Validating hv %s, %s", hv_name, hvparms)
512         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
513       except errors.HypervisorError, err:
514         tmp.append((source, hv_name, str(err)))
515
516   if constants.NV_FILELIST in what:
517     result[constants.NV_FILELIST] = utils.FingerprintFiles(
518       what[constants.NV_FILELIST])
519
520   if constants.NV_NODELIST in what:
521     (nodes, bynode) = what[constants.NV_NODELIST]
522
523     # Add nodes from other groups (different for each node)
524     try:
525       nodes.extend(bynode[my_name])
526     except KeyError:
527       pass
528
529     # Use a random order
530     random.shuffle(nodes)
531
532     # Try to contact all nodes
533     val = {}
534     for node in nodes:
535       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
536       if not success:
537         val[node] = message
538
539     result[constants.NV_NODELIST] = val
540
541   if constants.NV_NODENETTEST in what:
542     result[constants.NV_NODENETTEST] = tmp = {}
543     my_pip = my_sip = None
544     for name, pip, sip in what[constants.NV_NODENETTEST]:
545       if name == my_name:
546         my_pip = pip
547         my_sip = sip
548         break
549     if not my_pip:
550       tmp[my_name] = ("Can't find my own primary/secondary IP"
551                       " in the node list")
552     else:
553       for name, pip, sip in what[constants.NV_NODENETTEST]:
554         fail = []
555         if not netutils.TcpPing(pip, port, source=my_pip):
556           fail.append("primary")
557         if sip != pip:
558           if not netutils.TcpPing(sip, port, source=my_sip):
559             fail.append("secondary")
560         if fail:
561           tmp[name] = ("failure using the %s interface(s)" %
562                        " and ".join(fail))
563
564   if constants.NV_MASTERIP in what:
565     # FIXME: add checks on incoming data structures (here and in the
566     # rest of the function)
567     master_name, master_ip = what[constants.NV_MASTERIP]
568     if master_name == my_name:
569       source = constants.IP4_ADDRESS_LOCALHOST
570     else:
571       source = None
572     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
573                                                   source=source)
574
575   if constants.NV_OOB_PATHS in what:
576     result[constants.NV_OOB_PATHS] = tmp = []
577     for path in what[constants.NV_OOB_PATHS]:
578       try:
579         st = os.stat(path)
580       except OSError, err:
581         tmp.append("error stating out of band helper: %s" % err)
582       else:
583         if stat.S_ISREG(st.st_mode):
584           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
585             tmp.append(None)
586           else:
587             tmp.append("out of band helper %s is not executable" % path)
588         else:
589           tmp.append("out of band helper %s is not a file" % path)
590
591   if constants.NV_LVLIST in what and vm_capable:
592     try:
593       val = GetVolumeList(utils.ListVolumeGroups().keys())
594     except RPCFail, err:
595       val = str(err)
596     result[constants.NV_LVLIST] = val
597
598   if constants.NV_INSTANCELIST in what and vm_capable:
599     # GetInstanceList can fail
600     try:
601       val = GetInstanceList(what[constants.NV_INSTANCELIST])
602     except RPCFail, err:
603       val = str(err)
604     result[constants.NV_INSTANCELIST] = val
605
606   if constants.NV_VGLIST in what and vm_capable:
607     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
608
609   if constants.NV_PVLIST in what and vm_capable:
610     result[constants.NV_PVLIST] = \
611       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
612                                    filter_allocatable=False)
613
614   if constants.NV_VERSION in what:
615     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
616                                     constants.RELEASE_VERSION)
617
618   if constants.NV_HVINFO in what and vm_capable:
619     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
620     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
621
622   if constants.NV_DRBDLIST in what and vm_capable:
623     try:
624       used_minors = bdev.DRBD8.GetUsedDevs().keys()
625     except errors.BlockDeviceError, err:
626       logging.warning("Can't get used minors list", exc_info=True)
627       used_minors = str(err)
628     result[constants.NV_DRBDLIST] = used_minors
629
630   if constants.NV_DRBDHELPER in what and vm_capable:
631     status = True
632     try:
633       payload = bdev.BaseDRBD.GetUsermodeHelper()
634     except errors.BlockDeviceError, err:
635       logging.error("Can't get DRBD usermode helper: %s", str(err))
636       status = False
637       payload = str(err)
638     result[constants.NV_DRBDHELPER] = (status, payload)
639
640   if constants.NV_NODESETUP in what:
641     result[constants.NV_NODESETUP] = tmpr = []
642     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
643       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
644                   " under /sys, missing required directories /sys/block"
645                   " and /sys/class/net")
646     if (not os.path.isdir("/proc/sys") or
647         not os.path.isfile("/proc/sysrq-trigger")):
648       tmpr.append("The procfs filesystem doesn't seem to be mounted"
649                   " under /proc, missing required directory /proc/sys and"
650                   " the file /proc/sysrq-trigger")
651
652   if constants.NV_TIME in what:
653     result[constants.NV_TIME] = utils.SplitTime(time.time())
654
655   if constants.NV_OSLIST in what and vm_capable:
656     result[constants.NV_OSLIST] = DiagnoseOS()
657
658   if constants.NV_BRIDGES in what and vm_capable:
659     result[constants.NV_BRIDGES] = [bridge
660                                     for bridge in what[constants.NV_BRIDGES]
661                                     if not utils.BridgeExists(bridge)]
662   return result
663
664
665 def GetBlockDevSizes(devices):
666   """Return the size of the given block devices
667
668   @type devices: list
669   @param devices: list of block device nodes to query
670   @rtype: dict
671   @return:
672     dictionary of all block devices under /dev (key). The value is their
673     size in MiB.
674
675     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
676
677   """
678   DEV_PREFIX = "/dev/"
679   blockdevs = {}
680
681   for devpath in devices:
682     if not utils.IsBelowDir(DEV_PREFIX, devpath):
683       continue
684
685     try:
686       st = os.stat(devpath)
687     except EnvironmentError, err:
688       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
689       continue
690
691     if stat.S_ISBLK(st.st_mode):
692       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
693       if result.failed:
694         # We don't want to fail, just do not list this device as available
695         logging.warning("Cannot get size for block device %s", devpath)
696         continue
697
698       size = int(result.stdout) / (1024 * 1024)
699       blockdevs[devpath] = size
700   return blockdevs
701
702
703 def GetVolumeList(vg_names):
704   """Compute list of logical volumes and their size.
705
706   @type vg_names: list
707   @param vg_names: the volume groups whose LVs we should list, or
708       empty for all volume groups
709   @rtype: dict
710   @return:
711       dictionary of all partions (key) with value being a tuple of
712       their size (in MiB), inactive and online status::
713
714         {'xenvg/test1': ('20.06', True, True)}
715
716       in case of errors, a string is returned with the error
717       details.
718
719   """
720   lvs = {}
721   sep = "|"
722   if not vg_names:
723     vg_names = []
724   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
725                          "--separator=%s" % sep,
726                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
727   if result.failed:
728     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
729
730   for line in result.stdout.splitlines():
731     line = line.strip()
732     match = _LVSLINE_REGEX.match(line)
733     if not match:
734       logging.error("Invalid line returned from lvs output: '%s'", line)
735       continue
736     vg_name, name, size, attr = match.groups()
737     inactive = attr[4] == "-"
738     online = attr[5] == "o"
739     virtual = attr[0] == "v"
740     if virtual:
741       # we don't want to report such volumes as existing, since they
742       # don't really hold data
743       continue
744     lvs[vg_name + "/" + name] = (size, inactive, online)
745
746   return lvs
747
748
749 def ListVolumeGroups():
750   """List the volume groups and their size.
751
752   @rtype: dict
753   @return: dictionary with keys volume name and values the
754       size of the volume
755
756   """
757   return utils.ListVolumeGroups()
758
759
760 def NodeVolumes():
761   """List all volumes on this node.
762
763   @rtype: list
764   @return:
765     A list of dictionaries, each having four keys:
766       - name: the logical volume name,
767       - size: the size of the logical volume
768       - dev: the physical device on which the LV lives
769       - vg: the volume group to which it belongs
770
771     In case of errors, we return an empty list and log the
772     error.
773
774     Note that since a logical volume can live on multiple physical
775     volumes, the resulting list might include a logical volume
776     multiple times.
777
778   """
779   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
780                          "--separator=|",
781                          "--options=lv_name,lv_size,devices,vg_name"])
782   if result.failed:
783     _Fail("Failed to list logical volumes, lvs output: %s",
784           result.output)
785
786   def parse_dev(dev):
787     return dev.split("(")[0]
788
789   def handle_dev(dev):
790     return [parse_dev(x) for x in dev.split(",")]
791
792   def map_line(line):
793     line = [v.strip() for v in line]
794     return [{"name": line[0], "size": line[1],
795              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
796
797   all_devs = []
798   for line in result.stdout.splitlines():
799     if line.count("|") >= 3:
800       all_devs.extend(map_line(line.split("|")))
801     else:
802       logging.warning("Strange line in the output from lvs: '%s'", line)
803   return all_devs
804
805
806 def BridgesExist(bridges_list):
807   """Check if a list of bridges exist on the current node.
808
809   @rtype: boolean
810   @return: C{True} if all of them exist, C{False} otherwise
811
812   """
813   missing = []
814   for bridge in bridges_list:
815     if not utils.BridgeExists(bridge):
816       missing.append(bridge)
817
818   if missing:
819     _Fail("Missing bridges %s", utils.CommaJoin(missing))
820
821
822 def GetInstanceList(hypervisor_list):
823   """Provides a list of instances.
824
825   @type hypervisor_list: list
826   @param hypervisor_list: the list of hypervisors to query information
827
828   @rtype: list
829   @return: a list of all running instances on the current node
830     - instance1.example.com
831     - instance2.example.com
832
833   """
834   results = []
835   for hname in hypervisor_list:
836     try:
837       names = hypervisor.GetHypervisor(hname).ListInstances()
838       results.extend(names)
839     except errors.HypervisorError, err:
840       _Fail("Error enumerating instances (hypervisor %s): %s",
841             hname, err, exc=True)
842
843   return results
844
845
846 def GetInstanceInfo(instance, hname):
847   """Gives back the information about an instance as a dictionary.
848
849   @type instance: string
850   @param instance: the instance name
851   @type hname: string
852   @param hname: the hypervisor type of the instance
853
854   @rtype: dict
855   @return: dictionary with the following keys:
856       - memory: memory size of instance (int)
857       - state: xen state of instance (string)
858       - time: cpu time of instance (float)
859
860   """
861   output = {}
862
863   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
864   if iinfo is not None:
865     output["memory"] = iinfo[2]
866     output["state"] = iinfo[4]
867     output["time"] = iinfo[5]
868
869   return output
870
871
872 def GetInstanceMigratable(instance):
873   """Gives whether an instance can be migrated.
874
875   @type instance: L{objects.Instance}
876   @param instance: object representing the instance to be checked.
877
878   @rtype: tuple
879   @return: tuple of (result, description) where:
880       - result: whether the instance can be migrated or not
881       - description: a description of the issue, if relevant
882
883   """
884   hyper = hypervisor.GetHypervisor(instance.hypervisor)
885   iname = instance.name
886   if iname not in hyper.ListInstances():
887     _Fail("Instance %s is not running", iname)
888
889   for idx in range(len(instance.disks)):
890     link_name = _GetBlockDevSymlinkPath(iname, idx)
891     if not os.path.islink(link_name):
892       logging.warning("Instance %s is missing symlink %s for disk %d",
893                       iname, link_name, idx)
894
895
896 def GetAllInstancesInfo(hypervisor_list):
897   """Gather data about all instances.
898
899   This is the equivalent of L{GetInstanceInfo}, except that it
900   computes data for all instances at once, thus being faster if one
901   needs data about more than one instance.
902
903   @type hypervisor_list: list
904   @param hypervisor_list: list of hypervisors to query for instance data
905
906   @rtype: dict
907   @return: dictionary of instance: data, with data having the following keys:
908       - memory: memory size of instance (int)
909       - state: xen state of instance (string)
910       - time: cpu time of instance (float)
911       - vcpus: the number of vcpus
912
913   """
914   output = {}
915
916   for hname in hypervisor_list:
917     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
918     if iinfo:
919       for name, _, memory, vcpus, state, times in iinfo:
920         value = {
921           "memory": memory,
922           "vcpus": vcpus,
923           "state": state,
924           "time": times,
925           }
926         if name in output:
927           # we only check static parameters, like memory and vcpus,
928           # and not state and time which can change between the
929           # invocations of the different hypervisors
930           for key in "memory", "vcpus":
931             if value[key] != output[name][key]:
932               _Fail("Instance %s is running twice"
933                     " with different parameters", name)
934         output[name] = value
935
936   return output
937
938
939 def _InstanceLogName(kind, os_name, instance, component):
940   """Compute the OS log filename for a given instance and operation.
941
942   The instance name and os name are passed in as strings since not all
943   operations have these as part of an instance object.
944
945   @type kind: string
946   @param kind: the operation type (e.g. add, import, etc.)
947   @type os_name: string
948   @param os_name: the os name
949   @type instance: string
950   @param instance: the name of the instance being imported/added/etc.
951   @type component: string or None
952   @param component: the name of the component of the instance being
953       transferred
954
955   """
956   # TODO: Use tempfile.mkstemp to create unique filename
957   if component:
958     assert "/" not in component
959     c_msg = "-%s" % component
960   else:
961     c_msg = ""
962   base = ("%s-%s-%s%s-%s.log" %
963           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
964   return utils.PathJoin(constants.LOG_OS_DIR, base)
965
966
967 def InstanceOsAdd(instance, reinstall, debug):
968   """Add an OS to an instance.
969
970   @type instance: L{objects.Instance}
971   @param instance: Instance whose OS is to be installed
972   @type reinstall: boolean
973   @param reinstall: whether this is an instance reinstall
974   @type debug: integer
975   @param debug: debug level, passed to the OS scripts
976   @rtype: None
977
978   """
979   inst_os = OSFromDisk(instance.os)
980
981   create_env = OSEnvironment(instance, inst_os, debug)
982   if reinstall:
983     create_env["INSTANCE_REINSTALL"] = "1"
984
985   logfile = _InstanceLogName("add", instance.os, instance.name, None)
986
987   result = utils.RunCmd([inst_os.create_script], env=create_env,
988                         cwd=inst_os.path, output=logfile, reset_env=True)
989   if result.failed:
990     logging.error("os create command '%s' returned error: %s, logfile: %s,"
991                   " output: %s", result.cmd, result.fail_reason, logfile,
992                   result.output)
993     lines = [utils.SafeEncode(val)
994              for val in utils.TailFile(logfile, lines=20)]
995     _Fail("OS create script failed (%s), last lines in the"
996           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
997
998
999 def RunRenameInstance(instance, old_name, debug):
1000   """Run the OS rename script for an instance.
1001
1002   @type instance: L{objects.Instance}
1003   @param instance: Instance whose OS is to be installed
1004   @type old_name: string
1005   @param old_name: previous instance name
1006   @type debug: integer
1007   @param debug: debug level, passed to the OS scripts
1008   @rtype: boolean
1009   @return: the success of the operation
1010
1011   """
1012   inst_os = OSFromDisk(instance.os)
1013
1014   rename_env = OSEnvironment(instance, inst_os, debug)
1015   rename_env["OLD_INSTANCE_NAME"] = old_name
1016
1017   logfile = _InstanceLogName("rename", instance.os,
1018                              "%s-%s" % (old_name, instance.name), None)
1019
1020   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1021                         cwd=inst_os.path, output=logfile, reset_env=True)
1022
1023   if result.failed:
1024     logging.error("os create command '%s' returned error: %s output: %s",
1025                   result.cmd, result.fail_reason, result.output)
1026     lines = [utils.SafeEncode(val)
1027              for val in utils.TailFile(logfile, lines=20)]
1028     _Fail("OS rename script failed (%s), last lines in the"
1029           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1030
1031
1032 def _GetBlockDevSymlinkPath(instance_name, idx):
1033   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
1034                         (instance_name, constants.DISK_SEPARATOR, idx))
1035
1036
1037 def _SymlinkBlockDev(instance_name, device_path, idx):
1038   """Set up symlinks to a instance's block device.
1039
1040   This is an auxiliary function run when an instance is start (on the primary
1041   node) or when an instance is migrated (on the target node).
1042
1043
1044   @param instance_name: the name of the target instance
1045   @param device_path: path of the physical block device, on the node
1046   @param idx: the disk index
1047   @return: absolute path to the disk's symlink
1048
1049   """
1050   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1051   try:
1052     os.symlink(device_path, link_name)
1053   except OSError, err:
1054     if err.errno == errno.EEXIST:
1055       if (not os.path.islink(link_name) or
1056           os.readlink(link_name) != device_path):
1057         os.remove(link_name)
1058         os.symlink(device_path, link_name)
1059     else:
1060       raise
1061
1062   return link_name
1063
1064
1065 def _RemoveBlockDevLinks(instance_name, disks):
1066   """Remove the block device symlinks belonging to the given instance.
1067
1068   """
1069   for idx, _ in enumerate(disks):
1070     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1071     if os.path.islink(link_name):
1072       try:
1073         os.remove(link_name)
1074       except OSError:
1075         logging.exception("Can't remove symlink '%s'", link_name)
1076
1077
1078 def _GatherAndLinkBlockDevs(instance):
1079   """Set up an instance's block device(s).
1080
1081   This is run on the primary node at instance startup. The block
1082   devices must be already assembled.
1083
1084   @type instance: L{objects.Instance}
1085   @param instance: the instance whose disks we shoul assemble
1086   @rtype: list
1087   @return: list of (disk_object, device_path)
1088
1089   """
1090   block_devices = []
1091   for idx, disk in enumerate(instance.disks):
1092     device = _RecursiveFindBD(disk)
1093     if device is None:
1094       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1095                                     str(disk))
1096     device.Open()
1097     try:
1098       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1099     except OSError, e:
1100       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1101                                     e.strerror)
1102
1103     block_devices.append((disk, link_name))
1104
1105   return block_devices
1106
1107
1108 def StartInstance(instance, startup_paused):
1109   """Start an instance.
1110
1111   @type instance: L{objects.Instance}
1112   @param instance: the instance object
1113   @type startup_paused: bool
1114   @param instance: pause instance at startup?
1115   @rtype: None
1116
1117   """
1118   running_instances = GetInstanceList([instance.hypervisor])
1119
1120   if instance.name in running_instances:
1121     logging.info("Instance %s already running, not starting", instance.name)
1122     return
1123
1124   try:
1125     block_devices = _GatherAndLinkBlockDevs(instance)
1126     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1127     hyper.StartInstance(instance, block_devices, startup_paused)
1128   except errors.BlockDeviceError, err:
1129     _Fail("Block device error: %s", err, exc=True)
1130   except errors.HypervisorError, err:
1131     _RemoveBlockDevLinks(instance.name, instance.disks)
1132     _Fail("Hypervisor error: %s", err, exc=True)
1133
1134
1135 def InstanceShutdown(instance, timeout):
1136   """Shut an instance down.
1137
1138   @note: this functions uses polling with a hardcoded timeout.
1139
1140   @type instance: L{objects.Instance}
1141   @param instance: the instance object
1142   @type timeout: integer
1143   @param timeout: maximum timeout for soft shutdown
1144   @rtype: None
1145
1146   """
1147   hv_name = instance.hypervisor
1148   hyper = hypervisor.GetHypervisor(hv_name)
1149   iname = instance.name
1150
1151   if instance.name not in hyper.ListInstances():
1152     logging.info("Instance %s not running, doing nothing", iname)
1153     return
1154
1155   class _TryShutdown:
1156     def __init__(self):
1157       self.tried_once = False
1158
1159     def __call__(self):
1160       if iname not in hyper.ListInstances():
1161         return
1162
1163       try:
1164         hyper.StopInstance(instance, retry=self.tried_once)
1165       except errors.HypervisorError, err:
1166         if iname not in hyper.ListInstances():
1167           # if the instance is no longer existing, consider this a
1168           # success and go to cleanup
1169           return
1170
1171         _Fail("Failed to stop instance %s: %s", iname, err)
1172
1173       self.tried_once = True
1174
1175       raise utils.RetryAgain()
1176
1177   try:
1178     utils.Retry(_TryShutdown(), 5, timeout)
1179   except utils.RetryTimeout:
1180     # the shutdown did not succeed
1181     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1182
1183     try:
1184       hyper.StopInstance(instance, force=True)
1185     except errors.HypervisorError, err:
1186       if iname in hyper.ListInstances():
1187         # only raise an error if the instance still exists, otherwise
1188         # the error could simply be "instance ... unknown"!
1189         _Fail("Failed to force stop instance %s: %s", iname, err)
1190
1191     time.sleep(1)
1192
1193     if iname in hyper.ListInstances():
1194       _Fail("Could not shutdown instance %s even by destroy", iname)
1195
1196   try:
1197     hyper.CleanupInstance(instance.name)
1198   except errors.HypervisorError, err:
1199     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1200
1201   _RemoveBlockDevLinks(iname, instance.disks)
1202
1203
1204 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1205   """Reboot an instance.
1206
1207   @type instance: L{objects.Instance}
1208   @param instance: the instance object to reboot
1209   @type reboot_type: str
1210   @param reboot_type: the type of reboot, one the following
1211     constants:
1212       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1213         instance OS, do not recreate the VM
1214       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1215         restart the VM (at the hypervisor level)
1216       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1217         not accepted here, since that mode is handled differently, in
1218         cmdlib, and translates into full stop and start of the
1219         instance (instead of a call_instance_reboot RPC)
1220   @type shutdown_timeout: integer
1221   @param shutdown_timeout: maximum timeout for soft shutdown
1222   @rtype: None
1223
1224   """
1225   running_instances = GetInstanceList([instance.hypervisor])
1226
1227   if instance.name not in running_instances:
1228     _Fail("Cannot reboot instance %s that is not running", instance.name)
1229
1230   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1231   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1232     try:
1233       hyper.RebootInstance(instance)
1234     except errors.HypervisorError, err:
1235       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1236   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1237     try:
1238       InstanceShutdown(instance, shutdown_timeout)
1239       return StartInstance(instance, False)
1240     except errors.HypervisorError, err:
1241       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1242   else:
1243     _Fail("Invalid reboot_type received: %s", reboot_type)
1244
1245
1246 def MigrationInfo(instance):
1247   """Gather information about an instance to be migrated.
1248
1249   @type instance: L{objects.Instance}
1250   @param instance: the instance definition
1251
1252   """
1253   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1254   try:
1255     info = hyper.MigrationInfo(instance)
1256   except errors.HypervisorError, err:
1257     _Fail("Failed to fetch migration information: %s", err, exc=True)
1258   return info
1259
1260
1261 def AcceptInstance(instance, info, target):
1262   """Prepare the node to accept an instance.
1263
1264   @type instance: L{objects.Instance}
1265   @param instance: the instance definition
1266   @type info: string/data (opaque)
1267   @param info: migration information, from the source node
1268   @type target: string
1269   @param target: target host (usually ip), on this node
1270
1271   """
1272   # TODO: why is this required only for DTS_EXT_MIRROR?
1273   if instance.disk_template in constants.DTS_EXT_MIRROR:
1274     # Create the symlinks, as the disks are not active
1275     # in any way
1276     try:
1277       _GatherAndLinkBlockDevs(instance)
1278     except errors.BlockDeviceError, err:
1279       _Fail("Block device error: %s", err, exc=True)
1280
1281   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1282   try:
1283     hyper.AcceptInstance(instance, info, target)
1284   except errors.HypervisorError, err:
1285     if instance.disk_template in constants.DTS_EXT_MIRROR:
1286       _RemoveBlockDevLinks(instance.name, instance.disks)
1287     _Fail("Failed to accept instance: %s", err, exc=True)
1288
1289
1290 def FinalizeMigration(instance, info, success):
1291   """Finalize any preparation to accept an instance.
1292
1293   @type instance: L{objects.Instance}
1294   @param instance: the instance definition
1295   @type info: string/data (opaque)
1296   @param info: migration information, from the source node
1297   @type success: boolean
1298   @param success: whether the migration was a success or a failure
1299
1300   """
1301   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1302   try:
1303     hyper.FinalizeMigration(instance, info, success)
1304   except errors.HypervisorError, err:
1305     _Fail("Failed to finalize migration: %s", err, exc=True)
1306
1307
1308 def MigrateInstance(instance, target, live):
1309   """Migrates an instance to another node.
1310
1311   @type instance: L{objects.Instance}
1312   @param instance: the instance definition
1313   @type target: string
1314   @param target: the target node name
1315   @type live: boolean
1316   @param live: whether the migration should be done live or not (the
1317       interpretation of this parameter is left to the hypervisor)
1318   @rtype: tuple
1319   @return: a tuple of (success, msg) where:
1320       - succes is a boolean denoting the success/failure of the operation
1321       - msg is a string with details in case of failure
1322
1323   """
1324   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1325
1326   try:
1327     hyper.MigrateInstance(instance, target, live)
1328   except errors.HypervisorError, err:
1329     _Fail("Failed to migrate instance: %s", err, exc=True)
1330
1331
1332 def BlockdevCreate(disk, size, owner, on_primary, info):
1333   """Creates a block device for an instance.
1334
1335   @type disk: L{objects.Disk}
1336   @param disk: the object describing the disk we should create
1337   @type size: int
1338   @param size: the size of the physical underlying device, in MiB
1339   @type owner: str
1340   @param owner: the name of the instance for which disk is created,
1341       used for device cache data
1342   @type on_primary: boolean
1343   @param on_primary:  indicates if it is the primary node or not
1344   @type info: string
1345   @param info: string that will be sent to the physical device
1346       creation, used for example to set (LVM) tags on LVs
1347
1348   @return: the new unique_id of the device (this can sometime be
1349       computed only after creation), or None. On secondary nodes,
1350       it's not required to return anything.
1351
1352   """
1353   # TODO: remove the obsolete "size" argument
1354   # pylint: disable=W0613
1355   clist = []
1356   if disk.children:
1357     for child in disk.children:
1358       try:
1359         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1360       except errors.BlockDeviceError, err:
1361         _Fail("Can't assemble device %s: %s", child, err)
1362       if on_primary or disk.AssembleOnSecondary():
1363         # we need the children open in case the device itself has to
1364         # be assembled
1365         try:
1366           # pylint: disable=E1103
1367           crdev.Open()
1368         except errors.BlockDeviceError, err:
1369           _Fail("Can't make child '%s' read-write: %s", child, err)
1370       clist.append(crdev)
1371
1372   try:
1373     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1374   except errors.BlockDeviceError, err:
1375     _Fail("Can't create block device: %s", err)
1376
1377   if on_primary or disk.AssembleOnSecondary():
1378     try:
1379       device.Assemble()
1380     except errors.BlockDeviceError, err:
1381       _Fail("Can't assemble device after creation, unusual event: %s", err)
1382     device.SetSyncSpeed(constants.SYNC_SPEED)
1383     if on_primary or disk.OpenOnSecondary():
1384       try:
1385         device.Open(force=True)
1386       except errors.BlockDeviceError, err:
1387         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1388     DevCacheManager.UpdateCache(device.dev_path, owner,
1389                                 on_primary, disk.iv_name)
1390
1391   device.SetInfo(info)
1392
1393   return device.unique_id
1394
1395
1396 def _WipeDevice(path, offset, size):
1397   """This function actually wipes the device.
1398
1399   @param path: The path to the device to wipe
1400   @param offset: The offset in MiB in the file
1401   @param size: The size in MiB to write
1402
1403   """
1404   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1405          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1406          "count=%d" % size]
1407   result = utils.RunCmd(cmd)
1408
1409   if result.failed:
1410     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1411           result.fail_reason, result.output)
1412
1413
1414 def BlockdevWipe(disk, offset, size):
1415   """Wipes a block device.
1416
1417   @type disk: L{objects.Disk}
1418   @param disk: the disk object we want to wipe
1419   @type offset: int
1420   @param offset: The offset in MiB in the file
1421   @type size: int
1422   @param size: The size in MiB to write
1423
1424   """
1425   try:
1426     rdev = _RecursiveFindBD(disk)
1427   except errors.BlockDeviceError:
1428     rdev = None
1429
1430   if not rdev:
1431     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1432
1433   # Do cross verify some of the parameters
1434   if offset > rdev.size:
1435     _Fail("Offset is bigger than device size")
1436   if (offset + size) > rdev.size:
1437     _Fail("The provided offset and size to wipe is bigger than device size")
1438
1439   _WipeDevice(rdev.dev_path, offset, size)
1440
1441
1442 def BlockdevPauseResumeSync(disks, pause):
1443   """Pause or resume the sync of the block device.
1444
1445   @type disks: list of L{objects.Disk}
1446   @param disks: the disks object we want to pause/resume
1447   @type pause: bool
1448   @param pause: Wheater to pause or resume
1449
1450   """
1451   success = []
1452   for disk in disks:
1453     try:
1454       rdev = _RecursiveFindBD(disk)
1455     except errors.BlockDeviceError:
1456       rdev = None
1457
1458     if not rdev:
1459       success.append((False, ("Cannot change sync for device %s:"
1460                               " device not found" % disk.iv_name)))
1461       continue
1462
1463     result = rdev.PauseResumeSync(pause)
1464
1465     if result:
1466       success.append((result, None))
1467     else:
1468       if pause:
1469         msg = "Pause"
1470       else:
1471         msg = "Resume"
1472       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1473
1474   return success
1475
1476
1477 def BlockdevRemove(disk):
1478   """Remove a block device.
1479
1480   @note: This is intended to be called recursively.
1481
1482   @type disk: L{objects.Disk}
1483   @param disk: the disk object we should remove
1484   @rtype: boolean
1485   @return: the success of the operation
1486
1487   """
1488   msgs = []
1489   try:
1490     rdev = _RecursiveFindBD(disk)
1491   except errors.BlockDeviceError, err:
1492     # probably can't attach
1493     logging.info("Can't attach to device %s in remove", disk)
1494     rdev = None
1495   if rdev is not None:
1496     r_path = rdev.dev_path
1497     try:
1498       rdev.Remove()
1499     except errors.BlockDeviceError, err:
1500       msgs.append(str(err))
1501     if not msgs:
1502       DevCacheManager.RemoveCache(r_path)
1503
1504   if disk.children:
1505     for child in disk.children:
1506       try:
1507         BlockdevRemove(child)
1508       except RPCFail, err:
1509         msgs.append(str(err))
1510
1511   if msgs:
1512     _Fail("; ".join(msgs))
1513
1514
1515 def _RecursiveAssembleBD(disk, owner, as_primary):
1516   """Activate a block device for an instance.
1517
1518   This is run on the primary and secondary nodes for an instance.
1519
1520   @note: this function is called recursively.
1521
1522   @type disk: L{objects.Disk}
1523   @param disk: the disk we try to assemble
1524   @type owner: str
1525   @param owner: the name of the instance which owns the disk
1526   @type as_primary: boolean
1527   @param as_primary: if we should make the block device
1528       read/write
1529
1530   @return: the assembled device or None (in case no device
1531       was assembled)
1532   @raise errors.BlockDeviceError: in case there is an error
1533       during the activation of the children or the device
1534       itself
1535
1536   """
1537   children = []
1538   if disk.children:
1539     mcn = disk.ChildrenNeeded()
1540     if mcn == -1:
1541       mcn = 0 # max number of Nones allowed
1542     else:
1543       mcn = len(disk.children) - mcn # max number of Nones
1544     for chld_disk in disk.children:
1545       try:
1546         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1547       except errors.BlockDeviceError, err:
1548         if children.count(None) >= mcn:
1549           raise
1550         cdev = None
1551         logging.error("Error in child activation (but continuing): %s",
1552                       str(err))
1553       children.append(cdev)
1554
1555   if as_primary or disk.AssembleOnSecondary():
1556     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1557     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1558     result = r_dev
1559     if as_primary or disk.OpenOnSecondary():
1560       r_dev.Open()
1561     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1562                                 as_primary, disk.iv_name)
1563
1564   else:
1565     result = True
1566   return result
1567
1568
1569 def BlockdevAssemble(disk, owner, as_primary, idx):
1570   """Activate a block device for an instance.
1571
1572   This is a wrapper over _RecursiveAssembleBD.
1573
1574   @rtype: str or boolean
1575   @return: a C{/dev/...} path for primary nodes, and
1576       C{True} for secondary nodes
1577
1578   """
1579   try:
1580     result = _RecursiveAssembleBD(disk, owner, as_primary)
1581     if isinstance(result, bdev.BlockDev):
1582       # pylint: disable=E1103
1583       result = result.dev_path
1584       if as_primary:
1585         _SymlinkBlockDev(owner, result, idx)
1586   except errors.BlockDeviceError, err:
1587     _Fail("Error while assembling disk: %s", err, exc=True)
1588   except OSError, err:
1589     _Fail("Error while symlinking disk: %s", err, exc=True)
1590
1591   return result
1592
1593
1594 def BlockdevShutdown(disk):
1595   """Shut down a block device.
1596
1597   First, if the device is assembled (Attach() is successful), then
1598   the device is shutdown. Then the children of the device are
1599   shutdown.
1600
1601   This function is called recursively. Note that we don't cache the
1602   children or such, as oppossed to assemble, shutdown of different
1603   devices doesn't require that the upper device was active.
1604
1605   @type disk: L{objects.Disk}
1606   @param disk: the description of the disk we should
1607       shutdown
1608   @rtype: None
1609
1610   """
1611   msgs = []
1612   r_dev = _RecursiveFindBD(disk)
1613   if r_dev is not None:
1614     r_path = r_dev.dev_path
1615     try:
1616       r_dev.Shutdown()
1617       DevCacheManager.RemoveCache(r_path)
1618     except errors.BlockDeviceError, err:
1619       msgs.append(str(err))
1620
1621   if disk.children:
1622     for child in disk.children:
1623       try:
1624         BlockdevShutdown(child)
1625       except RPCFail, err:
1626         msgs.append(str(err))
1627
1628   if msgs:
1629     _Fail("; ".join(msgs))
1630
1631
1632 def BlockdevAddchildren(parent_cdev, new_cdevs):
1633   """Extend a mirrored block device.
1634
1635   @type parent_cdev: L{objects.Disk}
1636   @param parent_cdev: the disk to which we should add children
1637   @type new_cdevs: list of L{objects.Disk}
1638   @param new_cdevs: the list of children which we should add
1639   @rtype: None
1640
1641   """
1642   parent_bdev = _RecursiveFindBD(parent_cdev)
1643   if parent_bdev is None:
1644     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1645   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1646   if new_bdevs.count(None) > 0:
1647     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1648   parent_bdev.AddChildren(new_bdevs)
1649
1650
1651 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1652   """Shrink a mirrored block device.
1653
1654   @type parent_cdev: L{objects.Disk}
1655   @param parent_cdev: the disk from which we should remove children
1656   @type new_cdevs: list of L{objects.Disk}
1657   @param new_cdevs: the list of children which we should remove
1658   @rtype: None
1659
1660   """
1661   parent_bdev = _RecursiveFindBD(parent_cdev)
1662   if parent_bdev is None:
1663     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1664   devs = []
1665   for disk in new_cdevs:
1666     rpath = disk.StaticDevPath()
1667     if rpath is None:
1668       bd = _RecursiveFindBD(disk)
1669       if bd is None:
1670         _Fail("Can't find device %s while removing children", disk)
1671       else:
1672         devs.append(bd.dev_path)
1673     else:
1674       if not utils.IsNormAbsPath(rpath):
1675         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1676       devs.append(rpath)
1677   parent_bdev.RemoveChildren(devs)
1678
1679
1680 def BlockdevGetmirrorstatus(disks):
1681   """Get the mirroring status of a list of devices.
1682
1683   @type disks: list of L{objects.Disk}
1684   @param disks: the list of disks which we should query
1685   @rtype: disk
1686   @return: List of L{objects.BlockDevStatus}, one for each disk
1687   @raise errors.BlockDeviceError: if any of the disks cannot be
1688       found
1689
1690   """
1691   stats = []
1692   for dsk in disks:
1693     rbd = _RecursiveFindBD(dsk)
1694     if rbd is None:
1695       _Fail("Can't find device %s", dsk)
1696
1697     stats.append(rbd.CombinedSyncStatus())
1698
1699   return stats
1700
1701
1702 def BlockdevGetmirrorstatusMulti(disks):
1703   """Get the mirroring status of a list of devices.
1704
1705   @type disks: list of L{objects.Disk}
1706   @param disks: the list of disks which we should query
1707   @rtype: disk
1708   @return: List of tuples, (bool, status), one for each disk; bool denotes
1709     success/failure, status is L{objects.BlockDevStatus} on success, string
1710     otherwise
1711
1712   """
1713   result = []
1714   for disk in disks:
1715     try:
1716       rbd = _RecursiveFindBD(disk)
1717       if rbd is None:
1718         result.append((False, "Can't find device %s" % disk))
1719         continue
1720
1721       status = rbd.CombinedSyncStatus()
1722     except errors.BlockDeviceError, err:
1723       logging.exception("Error while getting disk status")
1724       result.append((False, str(err)))
1725     else:
1726       result.append((True, status))
1727
1728   assert len(disks) == len(result)
1729
1730   return result
1731
1732
1733 def _RecursiveFindBD(disk):
1734   """Check if a device is activated.
1735
1736   If so, return information about the real device.
1737
1738   @type disk: L{objects.Disk}
1739   @param disk: the disk object we need to find
1740
1741   @return: None if the device can't be found,
1742       otherwise the device instance
1743
1744   """
1745   children = []
1746   if disk.children:
1747     for chdisk in disk.children:
1748       children.append(_RecursiveFindBD(chdisk))
1749
1750   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1751
1752
1753 def _OpenRealBD(disk):
1754   """Opens the underlying block device of a disk.
1755
1756   @type disk: L{objects.Disk}
1757   @param disk: the disk object we want to open
1758
1759   """
1760   real_disk = _RecursiveFindBD(disk)
1761   if real_disk is None:
1762     _Fail("Block device '%s' is not set up", disk)
1763
1764   real_disk.Open()
1765
1766   return real_disk
1767
1768
1769 def BlockdevFind(disk):
1770   """Check if a device is activated.
1771
1772   If it is, return information about the real device.
1773
1774   @type disk: L{objects.Disk}
1775   @param disk: the disk to find
1776   @rtype: None or objects.BlockDevStatus
1777   @return: None if the disk cannot be found, otherwise a the current
1778            information
1779
1780   """
1781   try:
1782     rbd = _RecursiveFindBD(disk)
1783   except errors.BlockDeviceError, err:
1784     _Fail("Failed to find device: %s", err, exc=True)
1785
1786   if rbd is None:
1787     return None
1788
1789   return rbd.GetSyncStatus()
1790
1791
1792 def BlockdevGetsize(disks):
1793   """Computes the size of the given disks.
1794
1795   If a disk is not found, returns None instead.
1796
1797   @type disks: list of L{objects.Disk}
1798   @param disks: the list of disk to compute the size for
1799   @rtype: list
1800   @return: list with elements None if the disk cannot be found,
1801       otherwise the size
1802
1803   """
1804   result = []
1805   for cf in disks:
1806     try:
1807       rbd = _RecursiveFindBD(cf)
1808     except errors.BlockDeviceError:
1809       result.append(None)
1810       continue
1811     if rbd is None:
1812       result.append(None)
1813     else:
1814       result.append(rbd.GetActualSize())
1815   return result
1816
1817
1818 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1819   """Export a block device to a remote node.
1820
1821   @type disk: L{objects.Disk}
1822   @param disk: the description of the disk to export
1823   @type dest_node: str
1824   @param dest_node: the destination node to export to
1825   @type dest_path: str
1826   @param dest_path: the destination path on the target node
1827   @type cluster_name: str
1828   @param cluster_name: the cluster name, needed for SSH hostalias
1829   @rtype: None
1830
1831   """
1832   real_disk = _OpenRealBD(disk)
1833
1834   # the block size on the read dd is 1MiB to match our units
1835   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1836                                "dd if=%s bs=1048576 count=%s",
1837                                real_disk.dev_path, str(disk.size))
1838
1839   # we set here a smaller block size as, due to ssh buffering, more
1840   # than 64-128k will mostly ignored; we use nocreat to fail if the
1841   # device is not already there or we pass a wrong path; we use
1842   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1843   # to not buffer too much memory; this means that at best, we flush
1844   # every 64k, which will not be very fast
1845   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1846                                 " oflag=dsync", dest_path)
1847
1848   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1849                                                    constants.GANETI_RUNAS,
1850                                                    destcmd)
1851
1852   # all commands have been checked, so we're safe to combine them
1853   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1854
1855   result = utils.RunCmd(["bash", "-c", command])
1856
1857   if result.failed:
1858     _Fail("Disk copy command '%s' returned error: %s"
1859           " output: %s", command, result.fail_reason, result.output)
1860
1861
1862 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1863   """Write a file to the filesystem.
1864
1865   This allows the master to overwrite(!) a file. It will only perform
1866   the operation if the file belongs to a list of configuration files.
1867
1868   @type file_name: str
1869   @param file_name: the target file name
1870   @type data: str
1871   @param data: the new contents of the file
1872   @type mode: int
1873   @param mode: the mode to give the file (can be None)
1874   @type uid: string
1875   @param uid: the owner of the file
1876   @type gid: string
1877   @param gid: the group of the file
1878   @type atime: float
1879   @param atime: the atime to set on the file (can be None)
1880   @type mtime: float
1881   @param mtime: the mtime to set on the file (can be None)
1882   @rtype: None
1883
1884   """
1885   if not os.path.isabs(file_name):
1886     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1887
1888   if file_name not in _ALLOWED_UPLOAD_FILES:
1889     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1890           file_name)
1891
1892   raw_data = _Decompress(data)
1893
1894   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
1895     _Fail("Invalid username/groupname type")
1896
1897   getents = runtime.GetEnts()
1898   uid = getents.LookupUser(uid)
1899   gid = getents.LookupGroup(gid)
1900
1901   utils.SafeWriteFile(file_name, None,
1902                       data=raw_data, mode=mode, uid=uid, gid=gid,
1903                       atime=atime, mtime=mtime)
1904
1905
1906 def RunOob(oob_program, command, node, timeout):
1907   """Executes oob_program with given command on given node.
1908
1909   @param oob_program: The path to the executable oob_program
1910   @param command: The command to invoke on oob_program
1911   @param node: The node given as an argument to the program
1912   @param timeout: Timeout after which we kill the oob program
1913
1914   @return: stdout
1915   @raise RPCFail: If execution fails for some reason
1916
1917   """
1918   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1919
1920   if result.failed:
1921     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1922           result.fail_reason, result.output)
1923
1924   return result.stdout
1925
1926
1927 def WriteSsconfFiles(values):
1928   """Update all ssconf files.
1929
1930   Wrapper around the SimpleStore.WriteFiles.
1931
1932   """
1933   ssconf.SimpleStore().WriteFiles(values)
1934
1935
1936 def _ErrnoOrStr(err):
1937   """Format an EnvironmentError exception.
1938
1939   If the L{err} argument has an errno attribute, it will be looked up
1940   and converted into a textual C{E...} description. Otherwise the
1941   string representation of the error will be returned.
1942
1943   @type err: L{EnvironmentError}
1944   @param err: the exception to format
1945
1946   """
1947   if hasattr(err, "errno"):
1948     detail = errno.errorcode[err.errno]
1949   else:
1950     detail = str(err)
1951   return detail
1952
1953
1954 def _OSOndiskAPIVersion(os_dir):
1955   """Compute and return the API version of a given OS.
1956
1957   This function will try to read the API version of the OS residing in
1958   the 'os_dir' directory.
1959
1960   @type os_dir: str
1961   @param os_dir: the directory in which we should look for the OS
1962   @rtype: tuple
1963   @return: tuple (status, data) with status denoting the validity and
1964       data holding either the vaid versions or an error message
1965
1966   """
1967   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1968
1969   try:
1970     st = os.stat(api_file)
1971   except EnvironmentError, err:
1972     return False, ("Required file '%s' not found under path %s: %s" %
1973                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1974
1975   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1976     return False, ("File '%s' in %s is not a regular file" %
1977                    (constants.OS_API_FILE, os_dir))
1978
1979   try:
1980     api_versions = utils.ReadFile(api_file).splitlines()
1981   except EnvironmentError, err:
1982     return False, ("Error while reading the API version file at %s: %s" %
1983                    (api_file, _ErrnoOrStr(err)))
1984
1985   try:
1986     api_versions = [int(version.strip()) for version in api_versions]
1987   except (TypeError, ValueError), err:
1988     return False, ("API version(s) can't be converted to integer: %s" %
1989                    str(err))
1990
1991   return True, api_versions
1992
1993
1994 def DiagnoseOS(top_dirs=None):
1995   """Compute the validity for all OSes.
1996
1997   @type top_dirs: list
1998   @param top_dirs: the list of directories in which to
1999       search (if not given defaults to
2000       L{constants.OS_SEARCH_PATH})
2001   @rtype: list of L{objects.OS}
2002   @return: a list of tuples (name, path, status, diagnose, variants,
2003       parameters, api_version) for all (potential) OSes under all
2004       search paths, where:
2005           - name is the (potential) OS name
2006           - path is the full path to the OS
2007           - status True/False is the validity of the OS
2008           - diagnose is the error message for an invalid OS, otherwise empty
2009           - variants is a list of supported OS variants, if any
2010           - parameters is a list of (name, help) parameters, if any
2011           - api_version is a list of support OS API versions
2012
2013   """
2014   if top_dirs is None:
2015     top_dirs = constants.OS_SEARCH_PATH
2016
2017   result = []
2018   for dir_name in top_dirs:
2019     if os.path.isdir(dir_name):
2020       try:
2021         f_names = utils.ListVisibleFiles(dir_name)
2022       except EnvironmentError, err:
2023         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2024         break
2025       for name in f_names:
2026         os_path = utils.PathJoin(dir_name, name)
2027         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2028         if status:
2029           diagnose = ""
2030           variants = os_inst.supported_variants
2031           parameters = os_inst.supported_parameters
2032           api_versions = os_inst.api_versions
2033         else:
2034           diagnose = os_inst
2035           variants = parameters = api_versions = []
2036         result.append((name, os_path, status, diagnose, variants,
2037                        parameters, api_versions))
2038
2039   return result
2040
2041
2042 def _TryOSFromDisk(name, base_dir=None):
2043   """Create an OS instance from disk.
2044
2045   This function will return an OS instance if the given name is a
2046   valid OS name.
2047
2048   @type base_dir: string
2049   @keyword base_dir: Base directory containing OS installations.
2050                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2051   @rtype: tuple
2052   @return: success and either the OS instance if we find a valid one,
2053       or error message
2054
2055   """
2056   if base_dir is None:
2057     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
2058   else:
2059     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2060
2061   if os_dir is None:
2062     return False, "Directory for OS %s not found in search path" % name
2063
2064   status, api_versions = _OSOndiskAPIVersion(os_dir)
2065   if not status:
2066     # push the error up
2067     return status, api_versions
2068
2069   if not constants.OS_API_VERSIONS.intersection(api_versions):
2070     return False, ("API version mismatch for path '%s': found %s, want %s." %
2071                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2072
2073   # OS Files dictionary, we will populate it with the absolute path
2074   # names; if the value is True, then it is a required file, otherwise
2075   # an optional one
2076   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2077
2078   if max(api_versions) >= constants.OS_API_V15:
2079     os_files[constants.OS_VARIANTS_FILE] = False
2080
2081   if max(api_versions) >= constants.OS_API_V20:
2082     os_files[constants.OS_PARAMETERS_FILE] = True
2083   else:
2084     del os_files[constants.OS_SCRIPT_VERIFY]
2085
2086   for (filename, required) in os_files.items():
2087     os_files[filename] = utils.PathJoin(os_dir, filename)
2088
2089     try:
2090       st = os.stat(os_files[filename])
2091     except EnvironmentError, err:
2092       if err.errno == errno.ENOENT and not required:
2093         del os_files[filename]
2094         continue
2095       return False, ("File '%s' under path '%s' is missing (%s)" %
2096                      (filename, os_dir, _ErrnoOrStr(err)))
2097
2098     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2099       return False, ("File '%s' under path '%s' is not a regular file" %
2100                      (filename, os_dir))
2101
2102     if filename in constants.OS_SCRIPTS:
2103       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2104         return False, ("File '%s' under path '%s' is not executable" %
2105                        (filename, os_dir))
2106
2107   variants = []
2108   if constants.OS_VARIANTS_FILE in os_files:
2109     variants_file = os_files[constants.OS_VARIANTS_FILE]
2110     try:
2111       variants = utils.ReadFile(variants_file).splitlines()
2112     except EnvironmentError, err:
2113       # we accept missing files, but not other errors
2114       if err.errno != errno.ENOENT:
2115         return False, ("Error while reading the OS variants file at %s: %s" %
2116                        (variants_file, _ErrnoOrStr(err)))
2117
2118   parameters = []
2119   if constants.OS_PARAMETERS_FILE in os_files:
2120     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2121     try:
2122       parameters = utils.ReadFile(parameters_file).splitlines()
2123     except EnvironmentError, err:
2124       return False, ("Error while reading the OS parameters file at %s: %s" %
2125                      (parameters_file, _ErrnoOrStr(err)))
2126     parameters = [v.split(None, 1) for v in parameters]
2127
2128   os_obj = objects.OS(name=name, path=os_dir,
2129                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2130                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2131                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2132                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2133                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2134                                                  None),
2135                       supported_variants=variants,
2136                       supported_parameters=parameters,
2137                       api_versions=api_versions)
2138   return True, os_obj
2139
2140
2141 def OSFromDisk(name, base_dir=None):
2142   """Create an OS instance from disk.
2143
2144   This function will return an OS instance if the given name is a
2145   valid OS name. Otherwise, it will raise an appropriate
2146   L{RPCFail} exception, detailing why this is not a valid OS.
2147
2148   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2149   an exception but returns true/false status data.
2150
2151   @type base_dir: string
2152   @keyword base_dir: Base directory containing OS installations.
2153                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2154   @rtype: L{objects.OS}
2155   @return: the OS instance if we find a valid one
2156   @raise RPCFail: if we don't find a valid OS
2157
2158   """
2159   name_only = objects.OS.GetName(name)
2160   status, payload = _TryOSFromDisk(name_only, base_dir)
2161
2162   if not status:
2163     _Fail(payload)
2164
2165   return payload
2166
2167
2168 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2169   """Calculate the basic environment for an os script.
2170
2171   @type os_name: str
2172   @param os_name: full operating system name (including variant)
2173   @type inst_os: L{objects.OS}
2174   @param inst_os: operating system for which the environment is being built
2175   @type os_params: dict
2176   @param os_params: the OS parameters
2177   @type debug: integer
2178   @param debug: debug level (0 or 1, for OS Api 10)
2179   @rtype: dict
2180   @return: dict of environment variables
2181   @raise errors.BlockDeviceError: if the block device
2182       cannot be found
2183
2184   """
2185   result = {}
2186   api_version = \
2187     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2188   result["OS_API_VERSION"] = "%d" % api_version
2189   result["OS_NAME"] = inst_os.name
2190   result["DEBUG_LEVEL"] = "%d" % debug
2191
2192   # OS variants
2193   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2194     variant = objects.OS.GetVariant(os_name)
2195     if not variant:
2196       variant = inst_os.supported_variants[0]
2197   else:
2198     variant = ""
2199   result["OS_VARIANT"] = variant
2200
2201   # OS params
2202   for pname, pvalue in os_params.items():
2203     result["OSP_%s" % pname.upper()] = pvalue
2204
2205   return result
2206
2207
2208 def OSEnvironment(instance, inst_os, debug=0):
2209   """Calculate the environment for an os script.
2210
2211   @type instance: L{objects.Instance}
2212   @param instance: target instance for the os script run
2213   @type inst_os: L{objects.OS}
2214   @param inst_os: operating system for which the environment is being built
2215   @type debug: integer
2216   @param debug: debug level (0 or 1, for OS Api 10)
2217   @rtype: dict
2218   @return: dict of environment variables
2219   @raise errors.BlockDeviceError: if the block device
2220       cannot be found
2221
2222   """
2223   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2224
2225   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2226     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2227
2228   result["HYPERVISOR"] = instance.hypervisor
2229   result["DISK_COUNT"] = "%d" % len(instance.disks)
2230   result["NIC_COUNT"] = "%d" % len(instance.nics)
2231   result["INSTANCE_SECONDARY_NODES"] = \
2232       ("%s" % " ".join(instance.secondary_nodes))
2233
2234   # Disks
2235   for idx, disk in enumerate(instance.disks):
2236     real_disk = _OpenRealBD(disk)
2237     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2238     result["DISK_%d_ACCESS" % idx] = disk.mode
2239     if constants.HV_DISK_TYPE in instance.hvparams:
2240       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2241         instance.hvparams[constants.HV_DISK_TYPE]
2242     if disk.dev_type in constants.LDS_BLOCK:
2243       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2244     elif disk.dev_type == constants.LD_FILE:
2245       result["DISK_%d_BACKEND_TYPE" % idx] = \
2246         "file:%s" % disk.physical_id[0]
2247
2248   # NICs
2249   for idx, nic in enumerate(instance.nics):
2250     result["NIC_%d_MAC" % idx] = nic.mac
2251     if nic.ip:
2252       result["NIC_%d_IP" % idx] = nic.ip
2253     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2254     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2255       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2256     if nic.nicparams[constants.NIC_LINK]:
2257       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2258     if constants.HV_NIC_TYPE in instance.hvparams:
2259       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2260         instance.hvparams[constants.HV_NIC_TYPE]
2261
2262   # HV/BE params
2263   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2264     for key, value in source.items():
2265       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2266
2267   return result
2268
2269
2270 def BlockdevGrow(disk, amount, dryrun):
2271   """Grow a stack of block devices.
2272
2273   This function is called recursively, with the childrens being the
2274   first ones to resize.
2275
2276   @type disk: L{objects.Disk}
2277   @param disk: the disk to be grown
2278   @type amount: integer
2279   @param amount: the amount (in mebibytes) to grow with
2280   @type dryrun: boolean
2281   @param dryrun: whether to execute the operation in simulation mode
2282       only, without actually increasing the size
2283   @rtype: (status, result)
2284   @return: a tuple with the status of the operation (True/False), and
2285       the errors message if status is False
2286
2287   """
2288   r_dev = _RecursiveFindBD(disk)
2289   if r_dev is None:
2290     _Fail("Cannot find block device %s", disk)
2291
2292   try:
2293     r_dev.Grow(amount, dryrun)
2294   except errors.BlockDeviceError, err:
2295     _Fail("Failed to grow block device: %s", err, exc=True)
2296
2297
2298 def BlockdevSnapshot(disk):
2299   """Create a snapshot copy of a block device.
2300
2301   This function is called recursively, and the snapshot is actually created
2302   just for the leaf lvm backend device.
2303
2304   @type disk: L{objects.Disk}
2305   @param disk: the disk to be snapshotted
2306   @rtype: string
2307   @return: snapshot disk ID as (vg, lv)
2308
2309   """
2310   if disk.dev_type == constants.LD_DRBD8:
2311     if not disk.children:
2312       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2313             disk.unique_id)
2314     return BlockdevSnapshot(disk.children[0])
2315   elif disk.dev_type == constants.LD_LV:
2316     r_dev = _RecursiveFindBD(disk)
2317     if r_dev is not None:
2318       # FIXME: choose a saner value for the snapshot size
2319       # let's stay on the safe side and ask for the full size, for now
2320       return r_dev.Snapshot(disk.size)
2321     else:
2322       _Fail("Cannot find block device %s", disk)
2323   else:
2324     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2325           disk.unique_id, disk.dev_type)
2326
2327
2328 def FinalizeExport(instance, snap_disks):
2329   """Write out the export configuration information.
2330
2331   @type instance: L{objects.Instance}
2332   @param instance: the instance which we export, used for
2333       saving configuration
2334   @type snap_disks: list of L{objects.Disk}
2335   @param snap_disks: list of snapshot block devices, which
2336       will be used to get the actual name of the dump file
2337
2338   @rtype: None
2339
2340   """
2341   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2342   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2343
2344   config = objects.SerializableConfigParser()
2345
2346   config.add_section(constants.INISECT_EXP)
2347   config.set(constants.INISECT_EXP, "version", "0")
2348   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2349   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2350   config.set(constants.INISECT_EXP, "os", instance.os)
2351   config.set(constants.INISECT_EXP, "compression", "none")
2352
2353   config.add_section(constants.INISECT_INS)
2354   config.set(constants.INISECT_INS, "name", instance.name)
2355   config.set(constants.INISECT_INS, "memory", "%d" %
2356              instance.beparams[constants.BE_MEMORY])
2357   config.set(constants.INISECT_INS, "vcpus", "%d" %
2358              instance.beparams[constants.BE_VCPUS])
2359   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2360   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2361   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2362
2363   nic_total = 0
2364   for nic_count, nic in enumerate(instance.nics):
2365     nic_total += 1
2366     config.set(constants.INISECT_INS, "nic%d_mac" %
2367                nic_count, "%s" % nic.mac)
2368     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2369     for param in constants.NICS_PARAMETER_TYPES:
2370       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2371                  "%s" % nic.nicparams.get(param, None))
2372   # TODO: redundant: on load can read nics until it doesn't exist
2373   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2374
2375   disk_total = 0
2376   for disk_count, disk in enumerate(snap_disks):
2377     if disk:
2378       disk_total += 1
2379       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2380                  ("%s" % disk.iv_name))
2381       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2382                  ("%s" % disk.physical_id[1]))
2383       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2384                  ("%d" % disk.size))
2385
2386   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2387
2388   # New-style hypervisor/backend parameters
2389
2390   config.add_section(constants.INISECT_HYP)
2391   for name, value in instance.hvparams.items():
2392     if name not in constants.HVC_GLOBALS:
2393       config.set(constants.INISECT_HYP, name, str(value))
2394
2395   config.add_section(constants.INISECT_BEP)
2396   for name, value in instance.beparams.items():
2397     config.set(constants.INISECT_BEP, name, str(value))
2398
2399   config.add_section(constants.INISECT_OSP)
2400   for name, value in instance.osparams.items():
2401     config.set(constants.INISECT_OSP, name, str(value))
2402
2403   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2404                   data=config.Dumps())
2405   shutil.rmtree(finaldestdir, ignore_errors=True)
2406   shutil.move(destdir, finaldestdir)
2407
2408
2409 def ExportInfo(dest):
2410   """Get export configuration information.
2411
2412   @type dest: str
2413   @param dest: directory containing the export
2414
2415   @rtype: L{objects.SerializableConfigParser}
2416   @return: a serializable config file containing the
2417       export info
2418
2419   """
2420   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2421
2422   config = objects.SerializableConfigParser()
2423   config.read(cff)
2424
2425   if (not config.has_section(constants.INISECT_EXP) or
2426       not config.has_section(constants.INISECT_INS)):
2427     _Fail("Export info file doesn't have the required fields")
2428
2429   return config.Dumps()
2430
2431
2432 def ListExports():
2433   """Return a list of exports currently available on this machine.
2434
2435   @rtype: list
2436   @return: list of the exports
2437
2438   """
2439   if os.path.isdir(constants.EXPORT_DIR):
2440     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2441   else:
2442     _Fail("No exports directory")
2443
2444
2445 def RemoveExport(export):
2446   """Remove an existing export from the node.
2447
2448   @type export: str
2449   @param export: the name of the export to remove
2450   @rtype: None
2451
2452   """
2453   target = utils.PathJoin(constants.EXPORT_DIR, export)
2454
2455   try:
2456     shutil.rmtree(target)
2457   except EnvironmentError, err:
2458     _Fail("Error while removing the export: %s", err, exc=True)
2459
2460
2461 def BlockdevRename(devlist):
2462   """Rename a list of block devices.
2463
2464   @type devlist: list of tuples
2465   @param devlist: list of tuples of the form  (disk,
2466       new_logical_id, new_physical_id); disk is an
2467       L{objects.Disk} object describing the current disk,
2468       and new logical_id/physical_id is the name we
2469       rename it to
2470   @rtype: boolean
2471   @return: True if all renames succeeded, False otherwise
2472
2473   """
2474   msgs = []
2475   result = True
2476   for disk, unique_id in devlist:
2477     dev = _RecursiveFindBD(disk)
2478     if dev is None:
2479       msgs.append("Can't find device %s in rename" % str(disk))
2480       result = False
2481       continue
2482     try:
2483       old_rpath = dev.dev_path
2484       dev.Rename(unique_id)
2485       new_rpath = dev.dev_path
2486       if old_rpath != new_rpath:
2487         DevCacheManager.RemoveCache(old_rpath)
2488         # FIXME: we should add the new cache information here, like:
2489         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2490         # but we don't have the owner here - maybe parse from existing
2491         # cache? for now, we only lose lvm data when we rename, which
2492         # is less critical than DRBD or MD
2493     except errors.BlockDeviceError, err:
2494       msgs.append("Can't rename device '%s' to '%s': %s" %
2495                   (dev, unique_id, err))
2496       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2497       result = False
2498   if not result:
2499     _Fail("; ".join(msgs))
2500
2501
2502 def _TransformFileStorageDir(fs_dir):
2503   """Checks whether given file_storage_dir is valid.
2504
2505   Checks wheter the given fs_dir is within the cluster-wide default
2506   file_storage_dir or the shared_file_storage_dir, which are stored in
2507   SimpleStore. Only paths under those directories are allowed.
2508
2509   @type fs_dir: str
2510   @param fs_dir: the path to check
2511
2512   @return: the normalized path if valid, None otherwise
2513
2514   """
2515   if not constants.ENABLE_FILE_STORAGE:
2516     _Fail("File storage disabled at configure time")
2517   cfg = _GetConfig()
2518   fs_dir = os.path.normpath(fs_dir)
2519   base_fstore = cfg.GetFileStorageDir()
2520   base_shared = cfg.GetSharedFileStorageDir()
2521   if not (utils.IsBelowDir(base_fstore, fs_dir) or
2522           utils.IsBelowDir(base_shared, fs_dir)):
2523     _Fail("File storage directory '%s' is not under base file"
2524           " storage directory '%s' or shared storage directory '%s'",
2525           fs_dir, base_fstore, base_shared)
2526   return fs_dir
2527
2528
2529 def CreateFileStorageDir(file_storage_dir):
2530   """Create file storage directory.
2531
2532   @type file_storage_dir: str
2533   @param file_storage_dir: directory to create
2534
2535   @rtype: tuple
2536   @return: tuple with first element a boolean indicating wheter dir
2537       creation was successful or not
2538
2539   """
2540   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2541   if os.path.exists(file_storage_dir):
2542     if not os.path.isdir(file_storage_dir):
2543       _Fail("Specified storage dir '%s' is not a directory",
2544             file_storage_dir)
2545   else:
2546     try:
2547       os.makedirs(file_storage_dir, 0750)
2548     except OSError, err:
2549       _Fail("Cannot create file storage directory '%s': %s",
2550             file_storage_dir, err, exc=True)
2551
2552
2553 def RemoveFileStorageDir(file_storage_dir):
2554   """Remove file storage directory.
2555
2556   Remove it only if it's empty. If not log an error and return.
2557
2558   @type file_storage_dir: str
2559   @param file_storage_dir: the directory we should cleanup
2560   @rtype: tuple (success,)
2561   @return: tuple of one element, C{success}, denoting
2562       whether the operation was successful
2563
2564   """
2565   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2566   if os.path.exists(file_storage_dir):
2567     if not os.path.isdir(file_storage_dir):
2568       _Fail("Specified Storage directory '%s' is not a directory",
2569             file_storage_dir)
2570     # deletes dir only if empty, otherwise we want to fail the rpc call
2571     try:
2572       os.rmdir(file_storage_dir)
2573     except OSError, err:
2574       _Fail("Cannot remove file storage directory '%s': %s",
2575             file_storage_dir, err)
2576
2577
2578 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2579   """Rename the file storage directory.
2580
2581   @type old_file_storage_dir: str
2582   @param old_file_storage_dir: the current path
2583   @type new_file_storage_dir: str
2584   @param new_file_storage_dir: the name we should rename to
2585   @rtype: tuple (success,)
2586   @return: tuple of one element, C{success}, denoting
2587       whether the operation was successful
2588
2589   """
2590   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2591   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2592   if not os.path.exists(new_file_storage_dir):
2593     if os.path.isdir(old_file_storage_dir):
2594       try:
2595         os.rename(old_file_storage_dir, new_file_storage_dir)
2596       except OSError, err:
2597         _Fail("Cannot rename '%s' to '%s': %s",
2598               old_file_storage_dir, new_file_storage_dir, err)
2599     else:
2600       _Fail("Specified storage dir '%s' is not a directory",
2601             old_file_storage_dir)
2602   else:
2603     if os.path.exists(old_file_storage_dir):
2604       _Fail("Cannot rename '%s' to '%s': both locations exist",
2605             old_file_storage_dir, new_file_storage_dir)
2606
2607
2608 def _EnsureJobQueueFile(file_name):
2609   """Checks whether the given filename is in the queue directory.
2610
2611   @type file_name: str
2612   @param file_name: the file name we should check
2613   @rtype: None
2614   @raises RPCFail: if the file is not valid
2615
2616   """
2617   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2618   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2619
2620   if not result:
2621     _Fail("Passed job queue file '%s' does not belong to"
2622           " the queue directory '%s'", file_name, queue_dir)
2623
2624
2625 def JobQueueUpdate(file_name, content):
2626   """Updates a file in the queue directory.
2627
2628   This is just a wrapper over L{utils.io.WriteFile}, with proper
2629   checking.
2630
2631   @type file_name: str
2632   @param file_name: the job file name
2633   @type content: str
2634   @param content: the new job contents
2635   @rtype: boolean
2636   @return: the success of the operation
2637
2638   """
2639   _EnsureJobQueueFile(file_name)
2640   getents = runtime.GetEnts()
2641
2642   # Write and replace the file atomically
2643   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2644                   gid=getents.masterd_gid)
2645
2646
2647 def JobQueueRename(old, new):
2648   """Renames a job queue file.
2649
2650   This is just a wrapper over os.rename with proper checking.
2651
2652   @type old: str
2653   @param old: the old (actual) file name
2654   @type new: str
2655   @param new: the desired file name
2656   @rtype: tuple
2657   @return: the success of the operation and payload
2658
2659   """
2660   _EnsureJobQueueFile(old)
2661   _EnsureJobQueueFile(new)
2662
2663   utils.RenameFile(old, new, mkdir=True)
2664
2665
2666 def BlockdevClose(instance_name, disks):
2667   """Closes the given block devices.
2668
2669   This means they will be switched to secondary mode (in case of
2670   DRBD).
2671
2672   @param instance_name: if the argument is not empty, the symlinks
2673       of this instance will be removed
2674   @type disks: list of L{objects.Disk}
2675   @param disks: the list of disks to be closed
2676   @rtype: tuple (success, message)
2677   @return: a tuple of success and message, where success
2678       indicates the succes of the operation, and message
2679       which will contain the error details in case we
2680       failed
2681
2682   """
2683   bdevs = []
2684   for cf in disks:
2685     rd = _RecursiveFindBD(cf)
2686     if rd is None:
2687       _Fail("Can't find device %s", cf)
2688     bdevs.append(rd)
2689
2690   msg = []
2691   for rd in bdevs:
2692     try:
2693       rd.Close()
2694     except errors.BlockDeviceError, err:
2695       msg.append(str(err))
2696   if msg:
2697     _Fail("Can't make devices secondary: %s", ",".join(msg))
2698   else:
2699     if instance_name:
2700       _RemoveBlockDevLinks(instance_name, disks)
2701
2702
2703 def ValidateHVParams(hvname, hvparams):
2704   """Validates the given hypervisor parameters.
2705
2706   @type hvname: string
2707   @param hvname: the hypervisor name
2708   @type hvparams: dict
2709   @param hvparams: the hypervisor parameters to be validated
2710   @rtype: None
2711
2712   """
2713   try:
2714     hv_type = hypervisor.GetHypervisor(hvname)
2715     hv_type.ValidateParameters(hvparams)
2716   except errors.HypervisorError, err:
2717     _Fail(str(err), log=False)
2718
2719
2720 def _CheckOSPList(os_obj, parameters):
2721   """Check whether a list of parameters is supported by the OS.
2722
2723   @type os_obj: L{objects.OS}
2724   @param os_obj: OS object to check
2725   @type parameters: list
2726   @param parameters: the list of parameters to check
2727
2728   """
2729   supported = [v[0] for v in os_obj.supported_parameters]
2730   delta = frozenset(parameters).difference(supported)
2731   if delta:
2732     _Fail("The following parameters are not supported"
2733           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2734
2735
2736 def ValidateOS(required, osname, checks, osparams):
2737   """Validate the given OS' parameters.
2738
2739   @type required: boolean
2740   @param required: whether absence of the OS should translate into
2741       failure or not
2742   @type osname: string
2743   @param osname: the OS to be validated
2744   @type checks: list
2745   @param checks: list of the checks to run (currently only 'parameters')
2746   @type osparams: dict
2747   @param osparams: dictionary with OS parameters
2748   @rtype: boolean
2749   @return: True if the validation passed, or False if the OS was not
2750       found and L{required} was false
2751
2752   """
2753   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2754     _Fail("Unknown checks required for OS %s: %s", osname,
2755           set(checks).difference(constants.OS_VALIDATE_CALLS))
2756
2757   name_only = objects.OS.GetName(osname)
2758   status, tbv = _TryOSFromDisk(name_only, None)
2759
2760   if not status:
2761     if required:
2762       _Fail(tbv)
2763     else:
2764       return False
2765
2766   if max(tbv.api_versions) < constants.OS_API_V20:
2767     return True
2768
2769   if constants.OS_VALIDATE_PARAMETERS in checks:
2770     _CheckOSPList(tbv, osparams.keys())
2771
2772   validate_env = OSCoreEnv(osname, tbv, osparams)
2773   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2774                         cwd=tbv.path, reset_env=True)
2775   if result.failed:
2776     logging.error("os validate command '%s' returned error: %s output: %s",
2777                   result.cmd, result.fail_reason, result.output)
2778     _Fail("OS validation script failed (%s), output: %s",
2779           result.fail_reason, result.output, log=False)
2780
2781   return True
2782
2783
2784 def DemoteFromMC():
2785   """Demotes the current node from master candidate role.
2786
2787   """
2788   # try to ensure we're not the master by mistake
2789   master, myself = ssconf.GetMasterAndMyself()
2790   if master == myself:
2791     _Fail("ssconf status shows I'm the master node, will not demote")
2792
2793   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2794   if not result.failed:
2795     _Fail("The master daemon is running, will not demote")
2796
2797   try:
2798     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2799       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2800   except EnvironmentError, err:
2801     if err.errno != errno.ENOENT:
2802       _Fail("Error while backing up cluster file: %s", err, exc=True)
2803
2804   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2805
2806
2807 def _GetX509Filenames(cryptodir, name):
2808   """Returns the full paths for the private key and certificate.
2809
2810   """
2811   return (utils.PathJoin(cryptodir, name),
2812           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2813           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2814
2815
2816 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2817   """Creates a new X509 certificate for SSL/TLS.
2818
2819   @type validity: int
2820   @param validity: Validity in seconds
2821   @rtype: tuple; (string, string)
2822   @return: Certificate name and public part
2823
2824   """
2825   (key_pem, cert_pem) = \
2826     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2827                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2828
2829   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2830                               prefix="x509-%s-" % utils.TimestampForFilename())
2831   try:
2832     name = os.path.basename(cert_dir)
2833     assert len(name) > 5
2834
2835     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2836
2837     utils.WriteFile(key_file, mode=0400, data=key_pem)
2838     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2839
2840     # Never return private key as it shouldn't leave the node
2841     return (name, cert_pem)
2842   except Exception:
2843     shutil.rmtree(cert_dir, ignore_errors=True)
2844     raise
2845
2846
2847 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2848   """Removes a X509 certificate.
2849
2850   @type name: string
2851   @param name: Certificate name
2852
2853   """
2854   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2855
2856   utils.RemoveFile(key_file)
2857   utils.RemoveFile(cert_file)
2858
2859   try:
2860     os.rmdir(cert_dir)
2861   except EnvironmentError, err:
2862     _Fail("Cannot remove certificate directory '%s': %s",
2863           cert_dir, err)
2864
2865
2866 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2867   """Returns the command for the requested input/output.
2868
2869   @type instance: L{objects.Instance}
2870   @param instance: The instance object
2871   @param mode: Import/export mode
2872   @param ieio: Input/output type
2873   @param ieargs: Input/output arguments
2874
2875   """
2876   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2877
2878   env = None
2879   prefix = None
2880   suffix = None
2881   exp_size = None
2882
2883   if ieio == constants.IEIO_FILE:
2884     (filename, ) = ieargs
2885
2886     if not utils.IsNormAbsPath(filename):
2887       _Fail("Path '%s' is not normalized or absolute", filename)
2888
2889     real_filename = os.path.realpath(filename)
2890     directory = os.path.dirname(real_filename)
2891
2892     if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
2893       _Fail("File '%s' is not under exports directory '%s': %s",
2894             filename, constants.EXPORT_DIR, real_filename)
2895
2896     # Create directory
2897     utils.Makedirs(directory, mode=0750)
2898
2899     quoted_filename = utils.ShellQuote(filename)
2900
2901     if mode == constants.IEM_IMPORT:
2902       suffix = "> %s" % quoted_filename
2903     elif mode == constants.IEM_EXPORT:
2904       suffix = "< %s" % quoted_filename
2905
2906       # Retrieve file size
2907       try:
2908         st = os.stat(filename)
2909       except EnvironmentError, err:
2910         logging.error("Can't stat(2) %s: %s", filename, err)
2911       else:
2912         exp_size = utils.BytesToMebibyte(st.st_size)
2913
2914   elif ieio == constants.IEIO_RAW_DISK:
2915     (disk, ) = ieargs
2916
2917     real_disk = _OpenRealBD(disk)
2918
2919     if mode == constants.IEM_IMPORT:
2920       # we set here a smaller block size as, due to transport buffering, more
2921       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2922       # is not already there or we pass a wrong path; we use notrunc to no
2923       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2924       # much memory; this means that at best, we flush every 64k, which will
2925       # not be very fast
2926       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2927                                     " bs=%s oflag=dsync"),
2928                                     real_disk.dev_path,
2929                                     str(64 * 1024))
2930
2931     elif mode == constants.IEM_EXPORT:
2932       # the block size on the read dd is 1MiB to match our units
2933       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2934                                    real_disk.dev_path,
2935                                    str(1024 * 1024), # 1 MB
2936                                    str(disk.size))
2937       exp_size = disk.size
2938
2939   elif ieio == constants.IEIO_SCRIPT:
2940     (disk, disk_index, ) = ieargs
2941
2942     assert isinstance(disk_index, (int, long))
2943
2944     real_disk = _OpenRealBD(disk)
2945
2946     inst_os = OSFromDisk(instance.os)
2947     env = OSEnvironment(instance, inst_os)
2948
2949     if mode == constants.IEM_IMPORT:
2950       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2951       env["IMPORT_INDEX"] = str(disk_index)
2952       script = inst_os.import_script
2953
2954     elif mode == constants.IEM_EXPORT:
2955       env["EXPORT_DEVICE"] = real_disk.dev_path
2956       env["EXPORT_INDEX"] = str(disk_index)
2957       script = inst_os.export_script
2958
2959     # TODO: Pass special environment only to script
2960     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2961
2962     if mode == constants.IEM_IMPORT:
2963       suffix = "| %s" % script_cmd
2964
2965     elif mode == constants.IEM_EXPORT:
2966       prefix = "%s |" % script_cmd
2967
2968     # Let script predict size
2969     exp_size = constants.IE_CUSTOM_SIZE
2970
2971   else:
2972     _Fail("Invalid %s I/O mode %r", mode, ieio)
2973
2974   return (env, prefix, suffix, exp_size)
2975
2976
2977 def _CreateImportExportStatusDir(prefix):
2978   """Creates status directory for import/export.
2979
2980   """
2981   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2982                           prefix=("%s-%s-" %
2983                                   (prefix, utils.TimestampForFilename())))
2984
2985
2986 def StartImportExportDaemon(mode, opts, host, port, instance, component,
2987                             ieio, ieioargs):
2988   """Starts an import or export daemon.
2989
2990   @param mode: Import/output mode
2991   @type opts: L{objects.ImportExportOptions}
2992   @param opts: Daemon options
2993   @type host: string
2994   @param host: Remote host for export (None for import)
2995   @type port: int
2996   @param port: Remote port for export (None for import)
2997   @type instance: L{objects.Instance}
2998   @param instance: Instance object
2999   @type component: string
3000   @param component: which part of the instance is transferred now,
3001       e.g. 'disk/0'
3002   @param ieio: Input/output type
3003   @param ieioargs: Input/output arguments
3004
3005   """
3006   if mode == constants.IEM_IMPORT:
3007     prefix = "import"
3008
3009     if not (host is None and port is None):
3010       _Fail("Can not specify host or port on import")
3011
3012   elif mode == constants.IEM_EXPORT:
3013     prefix = "export"
3014
3015     if host is None or port is None:
3016       _Fail("Host and port must be specified for an export")
3017
3018   else:
3019     _Fail("Invalid mode %r", mode)
3020
3021   if (opts.key_name is None) ^ (opts.ca_pem is None):
3022     _Fail("Cluster certificate can only be used for both key and CA")
3023
3024   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3025     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3026
3027   if opts.key_name is None:
3028     # Use server.pem
3029     key_path = constants.NODED_CERT_FILE
3030     cert_path = constants.NODED_CERT_FILE
3031     assert opts.ca_pem is None
3032   else:
3033     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3034                                                  opts.key_name)
3035     assert opts.ca_pem is not None
3036
3037   for i in [key_path, cert_path]:
3038     if not os.path.exists(i):
3039       _Fail("File '%s' does not exist" % i)
3040
3041   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3042   try:
3043     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3044     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3045     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3046
3047     if opts.ca_pem is None:
3048       # Use server.pem
3049       ca = utils.ReadFile(constants.NODED_CERT_FILE)
3050     else:
3051       ca = opts.ca_pem
3052
3053     # Write CA file
3054     utils.WriteFile(ca_file, data=ca, mode=0400)
3055
3056     cmd = [
3057       constants.IMPORT_EXPORT_DAEMON,
3058       status_file, mode,
3059       "--key=%s" % key_path,
3060       "--cert=%s" % cert_path,
3061       "--ca=%s" % ca_file,
3062       ]
3063
3064     if host:
3065       cmd.append("--host=%s" % host)
3066
3067     if port:
3068       cmd.append("--port=%s" % port)
3069
3070     if opts.ipv6:
3071       cmd.append("--ipv6")
3072     else:
3073       cmd.append("--ipv4")
3074
3075     if opts.compress:
3076       cmd.append("--compress=%s" % opts.compress)
3077
3078     if opts.magic:
3079       cmd.append("--magic=%s" % opts.magic)
3080
3081     if exp_size is not None:
3082       cmd.append("--expected-size=%s" % exp_size)
3083
3084     if cmd_prefix:
3085       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3086
3087     if cmd_suffix:
3088       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3089
3090     if mode == constants.IEM_EXPORT:
3091       # Retry connection a few times when connecting to remote peer
3092       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3093       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3094     elif opts.connect_timeout is not None:
3095       assert mode == constants.IEM_IMPORT
3096       # Overall timeout for establishing connection while listening
3097       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3098
3099     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3100
3101     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3102     # support for receiving a file descriptor for output
3103     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3104                       output=logfile)
3105
3106     # The import/export name is simply the status directory name
3107     return os.path.basename(status_dir)
3108
3109   except Exception:
3110     shutil.rmtree(status_dir, ignore_errors=True)
3111     raise
3112
3113
3114 def GetImportExportStatus(names):
3115   """Returns import/export daemon status.
3116
3117   @type names: sequence
3118   @param names: List of names
3119   @rtype: List of dicts
3120   @return: Returns a list of the state of each named import/export or None if a
3121            status couldn't be read
3122
3123   """
3124   result = []
3125
3126   for name in names:
3127     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3128                                  _IES_STATUS_FILE)
3129
3130     try:
3131       data = utils.ReadFile(status_file)
3132     except EnvironmentError, err:
3133       if err.errno != errno.ENOENT:
3134         raise
3135       data = None
3136
3137     if not data:
3138       result.append(None)
3139       continue
3140
3141     result.append(serializer.LoadJson(data))
3142
3143   return result
3144
3145
3146 def AbortImportExport(name):
3147   """Sends SIGTERM to a running import/export daemon.
3148
3149   """
3150   logging.info("Abort import/export %s", name)
3151
3152   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3153   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3154
3155   if pid:
3156     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3157                  name, pid)
3158     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3159
3160
3161 def CleanupImportExport(name):
3162   """Cleanup after an import or export.
3163
3164   If the import/export daemon is still running it's killed. Afterwards the
3165   whole status directory is removed.
3166
3167   """
3168   logging.info("Finalizing import/export %s", name)
3169
3170   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3171
3172   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3173
3174   if pid:
3175     logging.info("Import/export %s is still running with PID %s",
3176                  name, pid)
3177     utils.KillProcess(pid, waitpid=False)
3178
3179   shutil.rmtree(status_dir, ignore_errors=True)
3180
3181
3182 def _FindDisks(nodes_ip, disks):
3183   """Sets the physical ID on disks and returns the block devices.
3184
3185   """
3186   # set the correct physical ID
3187   my_name = netutils.Hostname.GetSysName()
3188   for cf in disks:
3189     cf.SetPhysicalID(my_name, nodes_ip)
3190
3191   bdevs = []
3192
3193   for cf in disks:
3194     rd = _RecursiveFindBD(cf)
3195     if rd is None:
3196       _Fail("Can't find device %s", cf)
3197     bdevs.append(rd)
3198   return bdevs
3199
3200
3201 def DrbdDisconnectNet(nodes_ip, disks):
3202   """Disconnects the network on a list of drbd devices.
3203
3204   """
3205   bdevs = _FindDisks(nodes_ip, disks)
3206
3207   # disconnect disks
3208   for rd in bdevs:
3209     try:
3210       rd.DisconnectNet()
3211     except errors.BlockDeviceError, err:
3212       _Fail("Can't change network configuration to standalone mode: %s",
3213             err, exc=True)
3214
3215
3216 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3217   """Attaches the network on a list of drbd devices.
3218
3219   """
3220   bdevs = _FindDisks(nodes_ip, disks)
3221
3222   if multimaster:
3223     for idx, rd in enumerate(bdevs):
3224       try:
3225         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3226       except EnvironmentError, err:
3227         _Fail("Can't create symlink: %s", err)
3228   # reconnect disks, switch to new master configuration and if
3229   # needed primary mode
3230   for rd in bdevs:
3231     try:
3232       rd.AttachNet(multimaster)
3233     except errors.BlockDeviceError, err:
3234       _Fail("Can't change network configuration: %s", err)
3235
3236   # wait until the disks are connected; we need to retry the re-attach
3237   # if the device becomes standalone, as this might happen if the one
3238   # node disconnects and reconnects in a different mode before the
3239   # other node reconnects; in this case, one or both of the nodes will
3240   # decide it has wrong configuration and switch to standalone
3241
3242   def _Attach():
3243     all_connected = True
3244
3245     for rd in bdevs:
3246       stats = rd.GetProcStatus()
3247
3248       all_connected = (all_connected and
3249                        (stats.is_connected or stats.is_in_resync))
3250
3251       if stats.is_standalone:
3252         # peer had different config info and this node became
3253         # standalone, even though this should not happen with the
3254         # new staged way of changing disk configs
3255         try:
3256           rd.AttachNet(multimaster)
3257         except errors.BlockDeviceError, err:
3258           _Fail("Can't change network configuration: %s", err)
3259
3260     if not all_connected:
3261       raise utils.RetryAgain()
3262
3263   try:
3264     # Start with a delay of 100 miliseconds and go up to 5 seconds
3265     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3266   except utils.RetryTimeout:
3267     _Fail("Timeout in disk reconnecting")
3268
3269   if multimaster:
3270     # change to primary mode
3271     for rd in bdevs:
3272       try:
3273         rd.Open()
3274       except errors.BlockDeviceError, err:
3275         _Fail("Can't change to primary mode: %s", err)
3276
3277
3278 def DrbdWaitSync(nodes_ip, disks):
3279   """Wait until DRBDs have synchronized.
3280
3281   """
3282   def _helper(rd):
3283     stats = rd.GetProcStatus()
3284     if not (stats.is_connected or stats.is_in_resync):
3285       raise utils.RetryAgain()
3286     return stats
3287
3288   bdevs = _FindDisks(nodes_ip, disks)
3289
3290   min_resync = 100
3291   alldone = True
3292   for rd in bdevs:
3293     try:
3294       # poll each second for 15 seconds
3295       stats = utils.Retry(_helper, 1, 15, args=[rd])
3296     except utils.RetryTimeout:
3297       stats = rd.GetProcStatus()
3298       # last check
3299       if not (stats.is_connected or stats.is_in_resync):
3300         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3301     alldone = alldone and (not stats.is_in_resync)
3302     if stats.sync_percent is not None:
3303       min_resync = min(min_resync, stats.sync_percent)
3304
3305   return (alldone, min_resync)
3306
3307
3308 def GetDrbdUsermodeHelper():
3309   """Returns DRBD usermode helper currently configured.
3310
3311   """
3312   try:
3313     return bdev.BaseDRBD.GetUsermodeHelper()
3314   except errors.BlockDeviceError, err:
3315     _Fail(str(err))
3316
3317
3318 def PowercycleNode(hypervisor_type):
3319   """Hard-powercycle the node.
3320
3321   Because we need to return first, and schedule the powercycle in the
3322   background, we won't be able to report failures nicely.
3323
3324   """
3325   hyper = hypervisor.GetHypervisor(hypervisor_type)
3326   try:
3327     pid = os.fork()
3328   except OSError:
3329     # if we can't fork, we'll pretend that we're in the child process
3330     pid = 0
3331   if pid > 0:
3332     return "Reboot scheduled in 5 seconds"
3333   # ensure the child is running on ram
3334   try:
3335     utils.Mlockall()
3336   except Exception: # pylint: disable=W0703
3337     pass
3338   time.sleep(5)
3339   hyper.PowercycleNode()
3340
3341
3342 class HooksRunner(object):
3343   """Hook runner.
3344
3345   This class is instantiated on the node side (ganeti-noded) and not
3346   on the master side.
3347
3348   """
3349   def __init__(self, hooks_base_dir=None):
3350     """Constructor for hooks runner.
3351
3352     @type hooks_base_dir: str or None
3353     @param hooks_base_dir: if not None, this overrides the
3354         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3355
3356     """
3357     if hooks_base_dir is None:
3358       hooks_base_dir = constants.HOOKS_BASE_DIR
3359     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3360     # constant
3361     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3362
3363   def RunHooks(self, hpath, phase, env):
3364     """Run the scripts in the hooks directory.
3365
3366     @type hpath: str
3367     @param hpath: the path to the hooks directory which
3368         holds the scripts
3369     @type phase: str
3370     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3371         L{constants.HOOKS_PHASE_POST}
3372     @type env: dict
3373     @param env: dictionary with the environment for the hook
3374     @rtype: list
3375     @return: list of 3-element tuples:
3376       - script path
3377       - script result, either L{constants.HKR_SUCCESS} or
3378         L{constants.HKR_FAIL}
3379       - output of the script
3380
3381     @raise errors.ProgrammerError: for invalid input
3382         parameters
3383
3384     """
3385     if phase == constants.HOOKS_PHASE_PRE:
3386       suffix = "pre"
3387     elif phase == constants.HOOKS_PHASE_POST:
3388       suffix = "post"
3389     else:
3390       _Fail("Unknown hooks phase '%s'", phase)
3391
3392     subdir = "%s-%s.d" % (hpath, suffix)
3393     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3394
3395     results = []
3396
3397     if not os.path.isdir(dir_name):
3398       # for non-existing/non-dirs, we simply exit instead of logging a
3399       # warning at every operation
3400       return results
3401
3402     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3403
3404     for (relname, relstatus, runresult)  in runparts_results:
3405       if relstatus == constants.RUNPARTS_SKIP:
3406         rrval = constants.HKR_SKIP
3407         output = ""
3408       elif relstatus == constants.RUNPARTS_ERR:
3409         rrval = constants.HKR_FAIL
3410         output = "Hook script execution error: %s" % runresult
3411       elif relstatus == constants.RUNPARTS_RUN:
3412         if runresult.failed:
3413           rrval = constants.HKR_FAIL
3414         else:
3415           rrval = constants.HKR_SUCCESS
3416         output = utils.SafeEncode(runresult.output.strip())
3417       results.append(("%s/%s" % (subdir, relname), rrval, output))
3418
3419     return results
3420
3421
3422 class IAllocatorRunner(object):
3423   """IAllocator runner.
3424
3425   This class is instantiated on the node side (ganeti-noded) and not on
3426   the master side.
3427
3428   """
3429   @staticmethod
3430   def Run(name, idata):
3431     """Run an iallocator script.
3432
3433     @type name: str
3434     @param name: the iallocator script name
3435     @type idata: str
3436     @param idata: the allocator input data
3437
3438     @rtype: tuple
3439     @return: two element tuple of:
3440        - status
3441        - either error message or stdout of allocator (for success)
3442
3443     """
3444     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3445                                   os.path.isfile)
3446     if alloc_script is None:
3447       _Fail("iallocator module '%s' not found in the search path", name)
3448
3449     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3450     try:
3451       os.write(fd, idata)
3452       os.close(fd)
3453       result = utils.RunCmd([alloc_script, fin_name])
3454       if result.failed:
3455         _Fail("iallocator module '%s' failed: %s, output '%s'",
3456               name, result.fail_reason, result.output)
3457     finally:
3458       os.unlink(fin_name)
3459
3460     return result.stdout
3461
3462
3463 class DevCacheManager(object):
3464   """Simple class for managing a cache of block device information.
3465
3466   """
3467   _DEV_PREFIX = "/dev/"
3468   _ROOT_DIR = constants.BDEV_CACHE_DIR
3469
3470   @classmethod
3471   def _ConvertPath(cls, dev_path):
3472     """Converts a /dev/name path to the cache file name.
3473
3474     This replaces slashes with underscores and strips the /dev
3475     prefix. It then returns the full path to the cache file.
3476
3477     @type dev_path: str
3478     @param dev_path: the C{/dev/} path name
3479     @rtype: str
3480     @return: the converted path name
3481
3482     """
3483     if dev_path.startswith(cls._DEV_PREFIX):
3484       dev_path = dev_path[len(cls._DEV_PREFIX):]
3485     dev_path = dev_path.replace("/", "_")
3486     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3487     return fpath
3488
3489   @classmethod
3490   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3491     """Updates the cache information for a given device.
3492
3493     @type dev_path: str
3494     @param dev_path: the pathname of the device
3495     @type owner: str
3496     @param owner: the owner (instance name) of the device
3497     @type on_primary: bool
3498     @param on_primary: whether this is the primary
3499         node nor not
3500     @type iv_name: str
3501     @param iv_name: the instance-visible name of the
3502         device, as in objects.Disk.iv_name
3503
3504     @rtype: None
3505
3506     """
3507     if dev_path is None:
3508       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3509       return
3510     fpath = cls._ConvertPath(dev_path)
3511     if on_primary:
3512       state = "primary"
3513     else:
3514       state = "secondary"
3515     if iv_name is None:
3516       iv_name = "not_visible"
3517     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3518     try:
3519       utils.WriteFile(fpath, data=fdata)
3520     except EnvironmentError, err:
3521       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3522
3523   @classmethod
3524   def RemoveCache(cls, dev_path):
3525     """Remove data for a dev_path.
3526
3527     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3528     path name and logging.
3529
3530     @type dev_path: str
3531     @param dev_path: the pathname of the device
3532
3533     @rtype: None
3534
3535     """
3536     if dev_path is None:
3537       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3538       return
3539     fpath = cls._ConvertPath(dev_path)
3540     try:
3541       utils.RemoveFile(fpath)
3542     except EnvironmentError, err:
3543       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)