KVM: Check instances for actual liveness
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50
51 from ganeti import errors
52 from ganeti import utils
53 from ganeti import ssh
54 from ganeti import hypervisor
55 from ganeti import constants
56 from ganeti import bdev
57 from ganeti import objects
58 from ganeti import ssconf
59
60
61 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
62 _ALLOWED_CLEAN_DIRS = frozenset([
63   constants.DATA_DIR,
64   constants.JOB_QUEUE_ARCHIVE_DIR,
65   constants.QUEUE_DIR,
66   ])
67
68
69 class RPCFail(Exception):
70   """Class denoting RPC failure.
71
72   Its argument is the error message.
73
74   """
75
76
77 def _Fail(msg, *args, **kwargs):
78   """Log an error and the raise an RPCFail exception.
79
80   This exception is then handled specially in the ganeti daemon and
81   turned into a 'failed' return type. As such, this function is a
82   useful shortcut for logging the error and returning it to the master
83   daemon.
84
85   @type msg: string
86   @param msg: the text of the exception
87   @raise RPCFail
88
89   """
90   if args:
91     msg = msg % args
92   if "log" not in kwargs or kwargs["log"]: # if we should log this error
93     if "exc" in kwargs and kwargs["exc"]:
94       logging.exception(msg)
95     else:
96       logging.error(msg)
97   raise RPCFail(msg)
98
99
100 def _GetConfig():
101   """Simple wrapper to return a SimpleStore.
102
103   @rtype: L{ssconf.SimpleStore}
104   @return: a SimpleStore instance
105
106   """
107   return ssconf.SimpleStore()
108
109
110 def _GetSshRunner(cluster_name):
111   """Simple wrapper to return an SshRunner.
112
113   @type cluster_name: str
114   @param cluster_name: the cluster name, which is needed
115       by the SshRunner constructor
116   @rtype: L{ssh.SshRunner}
117   @return: an SshRunner instance
118
119   """
120   return ssh.SshRunner(cluster_name)
121
122
123 def _Decompress(data):
124   """Unpacks data compressed by the RPC client.
125
126   @type data: list or tuple
127   @param data: Data sent by RPC client
128   @rtype: str
129   @return: Decompressed data
130
131   """
132   assert isinstance(data, (list, tuple))
133   assert len(data) == 2
134   (encoding, content) = data
135   if encoding == constants.RPC_ENCODING_NONE:
136     return content
137   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
138     return zlib.decompress(base64.b64decode(content))
139   else:
140     raise AssertionError("Unknown data encoding")
141
142
143 def _CleanDirectory(path, exclude=None):
144   """Removes all regular files in a directory.
145
146   @type path: str
147   @param path: the directory to clean
148   @type exclude: list
149   @param exclude: list of files to be excluded, defaults
150       to the empty list
151
152   """
153   if path not in _ALLOWED_CLEAN_DIRS:
154     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
155           path)
156
157   if not os.path.isdir(path):
158     return
159   if exclude is None:
160     exclude = []
161   else:
162     # Normalize excluded paths
163     exclude = [os.path.normpath(i) for i in exclude]
164
165   for rel_name in utils.ListVisibleFiles(path):
166     full_name = utils.PathJoin(path, rel_name)
167     if full_name in exclude:
168       continue
169     if os.path.isfile(full_name) and not os.path.islink(full_name):
170       utils.RemoveFile(full_name)
171
172
173 def _BuildUploadFileList():
174   """Build the list of allowed upload files.
175
176   This is abstracted so that it's built only once at module import time.
177
178   """
179   allowed_files = set([
180     constants.CLUSTER_CONF_FILE,
181     constants.ETC_HOSTS,
182     constants.SSH_KNOWN_HOSTS_FILE,
183     constants.VNC_PASSWORD_FILE,
184     constants.RAPI_CERT_FILE,
185     constants.RAPI_USERS_FILE,
186     constants.CONFD_HMAC_KEY,
187     ])
188
189   for hv_name in constants.HYPER_TYPES:
190     hv_class = hypervisor.GetHypervisorClass(hv_name)
191     allowed_files.update(hv_class.GetAncillaryFiles())
192
193   return frozenset(allowed_files)
194
195
196 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
197
198
199 def JobQueuePurge():
200   """Removes job queue files and archived jobs.
201
202   @rtype: tuple
203   @return: True, None
204
205   """
206   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
207   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
208
209
210 def GetMasterInfo():
211   """Returns master information.
212
213   This is an utility function to compute master information, either
214   for consumption here or from the node daemon.
215
216   @rtype: tuple
217   @return: master_netdev, master_ip, master_name
218   @raise RPCFail: in case of errors
219
220   """
221   try:
222     cfg = _GetConfig()
223     master_netdev = cfg.GetMasterNetdev()
224     master_ip = cfg.GetMasterIP()
225     master_node = cfg.GetMasterNode()
226   except errors.ConfigurationError, err:
227     _Fail("Cluster configuration incomplete: %s", err, exc=True)
228   return (master_netdev, master_ip, master_node)
229
230
231 def StartMaster(start_daemons, no_voting):
232   """Activate local node as master node.
233
234   The function will always try activate the IP address of the master
235   (unless someone else has it). It will also start the master daemons,
236   based on the start_daemons parameter.
237
238   @type start_daemons: boolean
239   @param start_daemons: whether to also start the master
240       daemons (ganeti-masterd and ganeti-rapi)
241   @type no_voting: boolean
242   @param no_voting: whether to start ganeti-masterd without a node vote
243       (if start_daemons is True), but still non-interactively
244   @rtype: None
245
246   """
247   # GetMasterInfo will raise an exception if not able to return data
248   master_netdev, master_ip, _ = GetMasterInfo()
249
250   err_msgs = []
251   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
252     if utils.OwnIpAddress(master_ip):
253       # we already have the ip:
254       logging.debug("Master IP already configured, doing nothing")
255     else:
256       msg = "Someone else has the master ip, not activating"
257       logging.error(msg)
258       err_msgs.append(msg)
259   else:
260     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
261                            "dev", master_netdev, "label",
262                            "%s:0" % master_netdev])
263     if result.failed:
264       msg = "Can't activate master IP: %s" % result.output
265       logging.error(msg)
266       err_msgs.append(msg)
267
268     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
269                            "-s", master_ip, master_ip])
270     # we'll ignore the exit code of arping
271
272   # and now start the master and rapi daemons
273   if start_daemons:
274     if no_voting:
275       masterd_args = "--no-voting --yes-do-it"
276     else:
277       masterd_args = ""
278
279     env = {
280       "EXTRA_MASTERD_ARGS": masterd_args,
281       }
282
283     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
284     if result.failed:
285       msg = "Can't start Ganeti master: %s" % result.output
286       logging.error(msg)
287       err_msgs.append(msg)
288
289   if err_msgs:
290     _Fail("; ".join(err_msgs))
291
292
293 def StopMaster(stop_daemons):
294   """Deactivate this node as master.
295
296   The function will always try to deactivate the IP address of the
297   master. It will also stop the master daemons depending on the
298   stop_daemons parameter.
299
300   @type stop_daemons: boolean
301   @param stop_daemons: whether to also stop the master daemons
302       (ganeti-masterd and ganeti-rapi)
303   @rtype: None
304
305   """
306   # TODO: log and report back to the caller the error failures; we
307   # need to decide in which case we fail the RPC for this
308
309   # GetMasterInfo will raise an exception if not able to return data
310   master_netdev, master_ip, _ = GetMasterInfo()
311
312   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
313                          "dev", master_netdev])
314   if result.failed:
315     logging.error("Can't remove the master IP, error: %s", result.output)
316     # but otherwise ignore the failure
317
318   if stop_daemons:
319     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
320     if result.failed:
321       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
322                     " and error %s",
323                     result.cmd, result.exit_code, result.output)
324
325
326 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
327   """Joins this node to the cluster.
328
329   This does the following:
330       - updates the hostkeys of the machine (rsa and dsa)
331       - adds the ssh private key to the user
332       - adds the ssh public key to the users' authorized_keys file
333
334   @type dsa: str
335   @param dsa: the DSA private key to write
336   @type dsapub: str
337   @param dsapub: the DSA public key to write
338   @type rsa: str
339   @param rsa: the RSA private key to write
340   @type rsapub: str
341   @param rsapub: the RSA public key to write
342   @type sshkey: str
343   @param sshkey: the SSH private key to write
344   @type sshpub: str
345   @param sshpub: the SSH public key to write
346   @rtype: boolean
347   @return: the success of the operation
348
349   """
350   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
351                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
352                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
353                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
354   for name, content, mode in sshd_keys:
355     utils.WriteFile(name, data=content, mode=mode)
356
357   try:
358     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
359                                                     mkdir=True)
360   except errors.OpExecError, err:
361     _Fail("Error while processing user ssh files: %s", err, exc=True)
362
363   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
364     utils.WriteFile(name, data=content, mode=0600)
365
366   utils.AddAuthorizedKey(auth_keys, sshpub)
367
368   result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
369   if result.failed:
370     _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
371           result.cmd, result.exit_code, result.output)
372
373
374 def LeaveCluster(modify_ssh_setup):
375   """Cleans up and remove the current node.
376
377   This function cleans up and prepares the current node to be removed
378   from the cluster.
379
380   If processing is successful, then it raises an
381   L{errors.QuitGanetiException} which is used as a special case to
382   shutdown the node daemon.
383
384   @param modify_ssh_setup: boolean
385
386   """
387   _CleanDirectory(constants.DATA_DIR)
388   JobQueuePurge()
389
390   if modify_ssh_setup:
391     try:
392       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
393
394       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
395
396       utils.RemoveFile(priv_key)
397       utils.RemoveFile(pub_key)
398     except errors.OpExecError:
399       logging.exception("Error while processing ssh files")
400
401   try:
402     utils.RemoveFile(constants.CONFD_HMAC_KEY)
403     utils.RemoveFile(constants.RAPI_CERT_FILE)
404     utils.RemoveFile(constants.NODED_CERT_FILE)
405   except: # pylint: disable-msg=W0702
406     logging.exception("Error while removing cluster secrets")
407
408   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
409   if result.failed:
410     logging.error("Command %s failed with exitcode %s and error %s",
411                   result.cmd, result.exit_code, result.output)
412
413   # Raise a custom exception (handled in ganeti-noded)
414   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
415
416
417 def GetNodeInfo(vgname, hypervisor_type):
418   """Gives back a hash with different information about the node.
419
420   @type vgname: C{string}
421   @param vgname: the name of the volume group to ask for disk space information
422   @type hypervisor_type: C{str}
423   @param hypervisor_type: the name of the hypervisor to ask for
424       memory information
425   @rtype: C{dict}
426   @return: dictionary with the following keys:
427       - vg_size is the size of the configured volume group in MiB
428       - vg_free is the free size of the volume group in MiB
429       - memory_dom0 is the memory allocated for domain0 in MiB
430       - memory_free is the currently available (free) ram in MiB
431       - memory_total is the total number of ram in MiB
432
433   """
434   outputarray = {}
435   vginfo = _GetVGInfo(vgname)
436   outputarray['vg_size'] = vginfo['vg_size']
437   outputarray['vg_free'] = vginfo['vg_free']
438
439   hyper = hypervisor.GetHypervisor(hypervisor_type)
440   hyp_info = hyper.GetNodeInfo()
441   if hyp_info is not None:
442     outputarray.update(hyp_info)
443
444   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
445
446   return outputarray
447
448
449 def VerifyNode(what, cluster_name):
450   """Verify the status of the local node.
451
452   Based on the input L{what} parameter, various checks are done on the
453   local node.
454
455   If the I{filelist} key is present, this list of
456   files is checksummed and the file/checksum pairs are returned.
457
458   If the I{nodelist} key is present, we check that we have
459   connectivity via ssh with the target nodes (and check the hostname
460   report).
461
462   If the I{node-net-test} key is present, we check that we have
463   connectivity to the given nodes via both primary IP and, if
464   applicable, secondary IPs.
465
466   @type what: C{dict}
467   @param what: a dictionary of things to check:
468       - filelist: list of files for which to compute checksums
469       - nodelist: list of nodes we should check ssh communication with
470       - node-net-test: list of nodes we should check node daemon port
471         connectivity with
472       - hypervisor: list with hypervisors to run the verify for
473   @rtype: dict
474   @return: a dictionary with the same keys as the input dict, and
475       values representing the result of the checks
476
477   """
478   result = {}
479
480   if constants.NV_HYPERVISOR in what:
481     result[constants.NV_HYPERVISOR] = tmp = {}
482     for hv_name in what[constants.NV_HYPERVISOR]:
483       try:
484         val = hypervisor.GetHypervisor(hv_name).Verify()
485       except errors.HypervisorError, err:
486         val = "Error while checking hypervisor: %s" % str(err)
487       tmp[hv_name] = val
488
489   if constants.NV_FILELIST in what:
490     result[constants.NV_FILELIST] = utils.FingerprintFiles(
491       what[constants.NV_FILELIST])
492
493   if constants.NV_NODELIST in what:
494     result[constants.NV_NODELIST] = tmp = {}
495     random.shuffle(what[constants.NV_NODELIST])
496     for node in what[constants.NV_NODELIST]:
497       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
498       if not success:
499         tmp[node] = message
500
501   if constants.NV_NODENETTEST in what:
502     result[constants.NV_NODENETTEST] = tmp = {}
503     my_name = utils.HostInfo().name
504     my_pip = my_sip = None
505     for name, pip, sip in what[constants.NV_NODENETTEST]:
506       if name == my_name:
507         my_pip = pip
508         my_sip = sip
509         break
510     if not my_pip:
511       tmp[my_name] = ("Can't find my own primary/secondary IP"
512                       " in the node list")
513     else:
514       port = utils.GetDaemonPort(constants.NODED)
515       for name, pip, sip in what[constants.NV_NODENETTEST]:
516         fail = []
517         if not utils.TcpPing(pip, port, source=my_pip):
518           fail.append("primary")
519         if sip != pip:
520           if not utils.TcpPing(sip, port, source=my_sip):
521             fail.append("secondary")
522         if fail:
523           tmp[name] = ("failure using the %s interface(s)" %
524                        " and ".join(fail))
525
526   if constants.NV_LVLIST in what:
527     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
528
529   if constants.NV_INSTANCELIST in what:
530     # GetInstanceList can fail
531     try:
532       val = GetInstanceList(what[constants.NV_INSTANCELIST])
533     except RPCFail, err:
534       val = str(err)
535     result[constants.NV_INSTANCELIST] = val
536
537   if constants.NV_VGLIST in what:
538     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
539
540   if constants.NV_PVLIST in what:
541     result[constants.NV_PVLIST] = \
542       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
543                                    filter_allocatable=False)
544
545   if constants.NV_VERSION in what:
546     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
547                                     constants.RELEASE_VERSION)
548
549   if constants.NV_HVINFO in what:
550     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
551     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
552
553   if constants.NV_DRBDLIST in what:
554     try:
555       used_minors = bdev.DRBD8.GetUsedDevs().keys()
556     except errors.BlockDeviceError, err:
557       logging.warning("Can't get used minors list", exc_info=True)
558       used_minors = str(err)
559     result[constants.NV_DRBDLIST] = used_minors
560
561   if constants.NV_NODESETUP in what:
562     result[constants.NV_NODESETUP] = tmpr = []
563     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
564       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
565                   " under /sys, missing required directories /sys/block"
566                   " and /sys/class/net")
567     if (not os.path.isdir("/proc/sys") or
568         not os.path.isfile("/proc/sysrq-trigger")):
569       tmpr.append("The procfs filesystem doesn't seem to be mounted"
570                   " under /proc, missing required directory /proc/sys and"
571                   " the file /proc/sysrq-trigger")
572
573   if constants.NV_TIME in what:
574     result[constants.NV_TIME] = utils.SplitTime(time.time())
575
576   return result
577
578
579 def GetVolumeList(vg_name):
580   """Compute list of logical volumes and their size.
581
582   @type vg_name: str
583   @param vg_name: the volume group whose LVs we should list
584   @rtype: dict
585   @return:
586       dictionary of all partions (key) with value being a tuple of
587       their size (in MiB), inactive and online status::
588
589         {'test1': ('20.06', True, True)}
590
591       in case of errors, a string is returned with the error
592       details.
593
594   """
595   lvs = {}
596   sep = '|'
597   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
598                          "--separator=%s" % sep,
599                          "-olv_name,lv_size,lv_attr", vg_name])
600   if result.failed:
601     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
602
603   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
604   for line in result.stdout.splitlines():
605     line = line.strip()
606     match = valid_line_re.match(line)
607     if not match:
608       logging.error("Invalid line returned from lvs output: '%s'", line)
609       continue
610     name, size, attr = match.groups()
611     inactive = attr[4] == '-'
612     online = attr[5] == 'o'
613     virtual = attr[0] == 'v'
614     if virtual:
615       # we don't want to report such volumes as existing, since they
616       # don't really hold data
617       continue
618     lvs[name] = (size, inactive, online)
619
620   return lvs
621
622
623 def ListVolumeGroups():
624   """List the volume groups and their size.
625
626   @rtype: dict
627   @return: dictionary with keys volume name and values the
628       size of the volume
629
630   """
631   return utils.ListVolumeGroups()
632
633
634 def NodeVolumes():
635   """List all volumes on this node.
636
637   @rtype: list
638   @return:
639     A list of dictionaries, each having four keys:
640       - name: the logical volume name,
641       - size: the size of the logical volume
642       - dev: the physical device on which the LV lives
643       - vg: the volume group to which it belongs
644
645     In case of errors, we return an empty list and log the
646     error.
647
648     Note that since a logical volume can live on multiple physical
649     volumes, the resulting list might include a logical volume
650     multiple times.
651
652   """
653   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
654                          "--separator=|",
655                          "--options=lv_name,lv_size,devices,vg_name"])
656   if result.failed:
657     _Fail("Failed to list logical volumes, lvs output: %s",
658           result.output)
659
660   def parse_dev(dev):
661     return dev.split('(')[0]
662
663   def handle_dev(dev):
664     return [parse_dev(x) for x in dev.split(",")]
665
666   def map_line(line):
667     line = [v.strip() for v in line]
668     return [{'name': line[0], 'size': line[1],
669              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
670
671   all_devs = []
672   for line in result.stdout.splitlines():
673     if line.count('|') >= 3:
674       all_devs.extend(map_line(line.split('|')))
675     else:
676       logging.warning("Strange line in the output from lvs: '%s'", line)
677   return all_devs
678
679
680 def BridgesExist(bridges_list):
681   """Check if a list of bridges exist on the current node.
682
683   @rtype: boolean
684   @return: C{True} if all of them exist, C{False} otherwise
685
686   """
687   missing = []
688   for bridge in bridges_list:
689     if not utils.BridgeExists(bridge):
690       missing.append(bridge)
691
692   if missing:
693     _Fail("Missing bridges %s", utils.CommaJoin(missing))
694
695
696 def GetInstanceList(hypervisor_list):
697   """Provides a list of instances.
698
699   @type hypervisor_list: list
700   @param hypervisor_list: the list of hypervisors to query information
701
702   @rtype: list
703   @return: a list of all running instances on the current node
704     - instance1.example.com
705     - instance2.example.com
706
707   """
708   results = []
709   for hname in hypervisor_list:
710     try:
711       names = hypervisor.GetHypervisor(hname).ListInstances()
712       results.extend(names)
713     except errors.HypervisorError, err:
714       _Fail("Error enumerating instances (hypervisor %s): %s",
715             hname, err, exc=True)
716
717   return results
718
719
720 def GetInstanceInfo(instance, hname):
721   """Gives back the information about an instance as a dictionary.
722
723   @type instance: string
724   @param instance: the instance name
725   @type hname: string
726   @param hname: the hypervisor type of the instance
727
728   @rtype: dict
729   @return: dictionary with the following keys:
730       - memory: memory size of instance (int)
731       - state: xen state of instance (string)
732       - time: cpu time of instance (float)
733
734   """
735   output = {}
736
737   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
738   if iinfo is not None:
739     output['memory'] = iinfo[2]
740     output['state'] = iinfo[4]
741     output['time'] = iinfo[5]
742
743   return output
744
745
746 def GetInstanceMigratable(instance):
747   """Gives whether an instance can be migrated.
748
749   @type instance: L{objects.Instance}
750   @param instance: object representing the instance to be checked.
751
752   @rtype: tuple
753   @return: tuple of (result, description) where:
754       - result: whether the instance can be migrated or not
755       - description: a description of the issue, if relevant
756
757   """
758   hyper = hypervisor.GetHypervisor(instance.hypervisor)
759   iname = instance.name
760   if iname not in hyper.ListInstances():
761     _Fail("Instance %s is not running", iname)
762
763   for idx in range(len(instance.disks)):
764     link_name = _GetBlockDevSymlinkPath(iname, idx)
765     if not os.path.islink(link_name):
766       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
767
768
769 def GetAllInstancesInfo(hypervisor_list):
770   """Gather data about all instances.
771
772   This is the equivalent of L{GetInstanceInfo}, except that it
773   computes data for all instances at once, thus being faster if one
774   needs data about more than one instance.
775
776   @type hypervisor_list: list
777   @param hypervisor_list: list of hypervisors to query for instance data
778
779   @rtype: dict
780   @return: dictionary of instance: data, with data having the following keys:
781       - memory: memory size of instance (int)
782       - state: xen state of instance (string)
783       - time: cpu time of instance (float)
784       - vcpus: the number of vcpus
785
786   """
787   output = {}
788
789   for hname in hypervisor_list:
790     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
791     if iinfo:
792       for name, _, memory, vcpus, state, times in iinfo:
793         value = {
794           'memory': memory,
795           'vcpus': vcpus,
796           'state': state,
797           'time': times,
798           }
799         if name in output:
800           # we only check static parameters, like memory and vcpus,
801           # and not state and time which can change between the
802           # invocations of the different hypervisors
803           for key in 'memory', 'vcpus':
804             if value[key] != output[name][key]:
805               _Fail("Instance %s is running twice"
806                     " with different parameters", name)
807         output[name] = value
808
809   return output
810
811
812 def _InstanceLogName(kind, os_name, instance):
813   """Compute the OS log filename for a given instance and operation.
814
815   The instance name and os name are passed in as strings since not all
816   operations have these as part of an instance object.
817
818   @type kind: string
819   @param kind: the operation type (e.g. add, import, etc.)
820   @type os_name: string
821   @param os_name: the os name
822   @type instance: string
823   @param instance: the name of the instance being imported/added/etc.
824
825   """
826   base = ("%s-%s-%s-%s.log" %
827           (kind, os_name, instance, utils.TimestampForFilename()))
828   return utils.PathJoin(constants.LOG_OS_DIR, base)
829
830
831 def InstanceOsAdd(instance, reinstall, debug):
832   """Add an OS to an instance.
833
834   @type instance: L{objects.Instance}
835   @param instance: Instance whose OS is to be installed
836   @type reinstall: boolean
837   @param reinstall: whether this is an instance reinstall
838   @type debug: integer
839   @param debug: debug level, passed to the OS scripts
840   @rtype: None
841
842   """
843   inst_os = OSFromDisk(instance.os)
844
845   create_env = OSEnvironment(instance, inst_os, debug)
846   if reinstall:
847     create_env['INSTANCE_REINSTALL'] = "1"
848
849   logfile = _InstanceLogName("add", instance.os, instance.name)
850
851   result = utils.RunCmd([inst_os.create_script], env=create_env,
852                         cwd=inst_os.path, output=logfile,)
853   if result.failed:
854     logging.error("os create command '%s' returned error: %s, logfile: %s,"
855                   " output: %s", result.cmd, result.fail_reason, logfile,
856                   result.output)
857     lines = [utils.SafeEncode(val)
858              for val in utils.TailFile(logfile, lines=20)]
859     _Fail("OS create script failed (%s), last lines in the"
860           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
861
862
863 def RunRenameInstance(instance, old_name, debug):
864   """Run the OS rename script for an instance.
865
866   @type instance: L{objects.Instance}
867   @param instance: Instance whose OS is to be installed
868   @type old_name: string
869   @param old_name: previous instance name
870   @type debug: integer
871   @param debug: debug level, passed to the OS scripts
872   @rtype: boolean
873   @return: the success of the operation
874
875   """
876   inst_os = OSFromDisk(instance.os)
877
878   rename_env = OSEnvironment(instance, inst_os, debug)
879   rename_env['OLD_INSTANCE_NAME'] = old_name
880
881   logfile = _InstanceLogName("rename", instance.os,
882                              "%s-%s" % (old_name, instance.name))
883
884   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
885                         cwd=inst_os.path, output=logfile)
886
887   if result.failed:
888     logging.error("os create command '%s' returned error: %s output: %s",
889                   result.cmd, result.fail_reason, result.output)
890     lines = [utils.SafeEncode(val)
891              for val in utils.TailFile(logfile, lines=20)]
892     _Fail("OS rename script failed (%s), last lines in the"
893           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
894
895
896 def _GetVGInfo(vg_name):
897   """Get information about the volume group.
898
899   @type vg_name: str
900   @param vg_name: the volume group which we query
901   @rtype: dict
902   @return:
903     A dictionary with the following keys:
904       - C{vg_size} is the total size of the volume group in MiB
905       - C{vg_free} is the free size of the volume group in MiB
906       - C{pv_count} are the number of physical disks in that VG
907
908     If an error occurs during gathering of data, we return the same dict
909     with keys all set to None.
910
911   """
912   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
913
914   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
915                          "--nosuffix", "--units=m", "--separator=:", vg_name])
916
917   if retval.failed:
918     logging.error("volume group %s not present", vg_name)
919     return retdic
920   valarr = retval.stdout.strip().rstrip(':').split(':')
921   if len(valarr) == 3:
922     try:
923       retdic = {
924         "vg_size": int(round(float(valarr[0]), 0)),
925         "vg_free": int(round(float(valarr[1]), 0)),
926         "pv_count": int(valarr[2]),
927         }
928     except (TypeError, ValueError), err:
929       logging.exception("Fail to parse vgs output: %s", err)
930   else:
931     logging.error("vgs output has the wrong number of fields (expected"
932                   " three): %s", str(valarr))
933   return retdic
934
935
936 def _GetBlockDevSymlinkPath(instance_name, idx):
937   return utils.PathJoin(constants.DISK_LINKS_DIR,
938                         "%s:%d" % (instance_name, idx))
939
940
941 def _SymlinkBlockDev(instance_name, device_path, idx):
942   """Set up symlinks to a instance's block device.
943
944   This is an auxiliary function run when an instance is start (on the primary
945   node) or when an instance is migrated (on the target node).
946
947
948   @param instance_name: the name of the target instance
949   @param device_path: path of the physical block device, on the node
950   @param idx: the disk index
951   @return: absolute path to the disk's symlink
952
953   """
954   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
955   try:
956     os.symlink(device_path, link_name)
957   except OSError, err:
958     if err.errno == errno.EEXIST:
959       if (not os.path.islink(link_name) or
960           os.readlink(link_name) != device_path):
961         os.remove(link_name)
962         os.symlink(device_path, link_name)
963     else:
964       raise
965
966   return link_name
967
968
969 def _RemoveBlockDevLinks(instance_name, disks):
970   """Remove the block device symlinks belonging to the given instance.
971
972   """
973   for idx, _ in enumerate(disks):
974     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
975     if os.path.islink(link_name):
976       try:
977         os.remove(link_name)
978       except OSError:
979         logging.exception("Can't remove symlink '%s'", link_name)
980
981
982 def _GatherAndLinkBlockDevs(instance):
983   """Set up an instance's block device(s).
984
985   This is run on the primary node at instance startup. The block
986   devices must be already assembled.
987
988   @type instance: L{objects.Instance}
989   @param instance: the instance whose disks we shoul assemble
990   @rtype: list
991   @return: list of (disk_object, device_path)
992
993   """
994   block_devices = []
995   for idx, disk in enumerate(instance.disks):
996     device = _RecursiveFindBD(disk)
997     if device is None:
998       raise errors.BlockDeviceError("Block device '%s' is not set up." %
999                                     str(disk))
1000     device.Open()
1001     try:
1002       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1003     except OSError, e:
1004       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1005                                     e.strerror)
1006
1007     block_devices.append((disk, link_name))
1008
1009   return block_devices
1010
1011
1012 def StartInstance(instance):
1013   """Start an instance.
1014
1015   @type instance: L{objects.Instance}
1016   @param instance: the instance object
1017   @rtype: None
1018
1019   """
1020   running_instances = GetInstanceList([instance.hypervisor])
1021
1022   if instance.name in running_instances:
1023     logging.info("Instance %s already running, not starting", instance.name)
1024     return
1025
1026   try:
1027     block_devices = _GatherAndLinkBlockDevs(instance)
1028     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1029     hyper.StartInstance(instance, block_devices)
1030   except errors.BlockDeviceError, err:
1031     _Fail("Block device error: %s", err, exc=True)
1032   except errors.HypervisorError, err:
1033     _RemoveBlockDevLinks(instance.name, instance.disks)
1034     _Fail("Hypervisor error: %s", err, exc=True)
1035
1036
1037 def InstanceShutdown(instance, timeout):
1038   """Shut an instance down.
1039
1040   @note: this functions uses polling with a hardcoded timeout.
1041
1042   @type instance: L{objects.Instance}
1043   @param instance: the instance object
1044   @type timeout: integer
1045   @param timeout: maximum timeout for soft shutdown
1046   @rtype: None
1047
1048   """
1049   hv_name = instance.hypervisor
1050   hyper = hypervisor.GetHypervisor(hv_name)
1051   iname = instance.name
1052
1053   if instance.name not in hyper.ListInstances():
1054     logging.info("Instance %s not running, doing nothing", iname)
1055     return
1056
1057   class _TryShutdown:
1058     def __init__(self):
1059       self.tried_once = False
1060
1061     def __call__(self):
1062       if iname not in hyper.ListInstances():
1063         return
1064
1065       try:
1066         hyper.StopInstance(instance, retry=self.tried_once)
1067       except errors.HypervisorError, err:
1068         if iname not in hyper.ListInstances():
1069           # if the instance is no longer existing, consider this a
1070           # success and go to cleanup
1071           return
1072
1073         _Fail("Failed to stop instance %s: %s", iname, err)
1074
1075       self.tried_once = True
1076
1077       raise utils.RetryAgain()
1078
1079   try:
1080     utils.Retry(_TryShutdown(), 5, timeout)
1081   except utils.RetryTimeout:
1082     # the shutdown did not succeed
1083     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1084
1085     try:
1086       hyper.StopInstance(instance, force=True)
1087     except errors.HypervisorError, err:
1088       if iname in hyper.ListInstances():
1089         # only raise an error if the instance still exists, otherwise
1090         # the error could simply be "instance ... unknown"!
1091         _Fail("Failed to force stop instance %s: %s", iname, err)
1092
1093     time.sleep(1)
1094
1095     if iname in hyper.ListInstances():
1096       _Fail("Could not shutdown instance %s even by destroy", iname)
1097
1098   _RemoveBlockDevLinks(iname, instance.disks)
1099
1100
1101 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1102   """Reboot an instance.
1103
1104   @type instance: L{objects.Instance}
1105   @param instance: the instance object to reboot
1106   @type reboot_type: str
1107   @param reboot_type: the type of reboot, one the following
1108     constants:
1109       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1110         instance OS, do not recreate the VM
1111       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1112         restart the VM (at the hypervisor level)
1113       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1114         not accepted here, since that mode is handled differently, in
1115         cmdlib, and translates into full stop and start of the
1116         instance (instead of a call_instance_reboot RPC)
1117   @type shutdown_timeout: integer
1118   @param shutdown_timeout: maximum timeout for soft shutdown
1119   @rtype: None
1120
1121   """
1122   running_instances = GetInstanceList([instance.hypervisor])
1123
1124   if instance.name not in running_instances:
1125     _Fail("Cannot reboot instance %s that is not running", instance.name)
1126
1127   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1128   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1129     try:
1130       hyper.RebootInstance(instance)
1131     except errors.HypervisorError, err:
1132       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1133   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1134     try:
1135       InstanceShutdown(instance, shutdown_timeout)
1136       return StartInstance(instance)
1137     except errors.HypervisorError, err:
1138       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1139   else:
1140     _Fail("Invalid reboot_type received: %s", reboot_type)
1141
1142
1143 def MigrationInfo(instance):
1144   """Gather information about an instance to be migrated.
1145
1146   @type instance: L{objects.Instance}
1147   @param instance: the instance definition
1148
1149   """
1150   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1151   try:
1152     info = hyper.MigrationInfo(instance)
1153   except errors.HypervisorError, err:
1154     _Fail("Failed to fetch migration information: %s", err, exc=True)
1155   return info
1156
1157
1158 def AcceptInstance(instance, info, target):
1159   """Prepare the node to accept an instance.
1160
1161   @type instance: L{objects.Instance}
1162   @param instance: the instance definition
1163   @type info: string/data (opaque)
1164   @param info: migration information, from the source node
1165   @type target: string
1166   @param target: target host (usually ip), on this node
1167
1168   """
1169   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1170   try:
1171     hyper.AcceptInstance(instance, info, target)
1172   except errors.HypervisorError, err:
1173     _Fail("Failed to accept instance: %s", err, exc=True)
1174
1175
1176 def FinalizeMigration(instance, info, success):
1177   """Finalize any preparation to accept an instance.
1178
1179   @type instance: L{objects.Instance}
1180   @param instance: the instance definition
1181   @type info: string/data (opaque)
1182   @param info: migration information, from the source node
1183   @type success: boolean
1184   @param success: whether the migration was a success or a failure
1185
1186   """
1187   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1188   try:
1189     hyper.FinalizeMigration(instance, info, success)
1190   except errors.HypervisorError, err:
1191     _Fail("Failed to finalize migration: %s", err, exc=True)
1192
1193
1194 def MigrateInstance(instance, target, live):
1195   """Migrates an instance to another node.
1196
1197   @type instance: L{objects.Instance}
1198   @param instance: the instance definition
1199   @type target: string
1200   @param target: the target node name
1201   @type live: boolean
1202   @param live: whether the migration should be done live or not (the
1203       interpretation of this parameter is left to the hypervisor)
1204   @rtype: tuple
1205   @return: a tuple of (success, msg) where:
1206       - succes is a boolean denoting the success/failure of the operation
1207       - msg is a string with details in case of failure
1208
1209   """
1210   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1211
1212   try:
1213     hyper.MigrateInstance(instance, target, live)
1214   except errors.HypervisorError, err:
1215     _Fail("Failed to migrate instance: %s", err, exc=True)
1216
1217
1218 def BlockdevCreate(disk, size, owner, on_primary, info):
1219   """Creates a block device for an instance.
1220
1221   @type disk: L{objects.Disk}
1222   @param disk: the object describing the disk we should create
1223   @type size: int
1224   @param size: the size of the physical underlying device, in MiB
1225   @type owner: str
1226   @param owner: the name of the instance for which disk is created,
1227       used for device cache data
1228   @type on_primary: boolean
1229   @param on_primary:  indicates if it is the primary node or not
1230   @type info: string
1231   @param info: string that will be sent to the physical device
1232       creation, used for example to set (LVM) tags on LVs
1233
1234   @return: the new unique_id of the device (this can sometime be
1235       computed only after creation), or None. On secondary nodes,
1236       it's not required to return anything.
1237
1238   """
1239   # TODO: remove the obsolete 'size' argument
1240   # pylint: disable-msg=W0613
1241   clist = []
1242   if disk.children:
1243     for child in disk.children:
1244       try:
1245         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1246       except errors.BlockDeviceError, err:
1247         _Fail("Can't assemble device %s: %s", child, err)
1248       if on_primary or disk.AssembleOnSecondary():
1249         # we need the children open in case the device itself has to
1250         # be assembled
1251         try:
1252           # pylint: disable-msg=E1103
1253           crdev.Open()
1254         except errors.BlockDeviceError, err:
1255           _Fail("Can't make child '%s' read-write: %s", child, err)
1256       clist.append(crdev)
1257
1258   try:
1259     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1260   except errors.BlockDeviceError, err:
1261     _Fail("Can't create block device: %s", err)
1262
1263   if on_primary or disk.AssembleOnSecondary():
1264     try:
1265       device.Assemble()
1266     except errors.BlockDeviceError, err:
1267       _Fail("Can't assemble device after creation, unusual event: %s", err)
1268     device.SetSyncSpeed(constants.SYNC_SPEED)
1269     if on_primary or disk.OpenOnSecondary():
1270       try:
1271         device.Open(force=True)
1272       except errors.BlockDeviceError, err:
1273         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1274     DevCacheManager.UpdateCache(device.dev_path, owner,
1275                                 on_primary, disk.iv_name)
1276
1277   device.SetInfo(info)
1278
1279   return device.unique_id
1280
1281
1282 def BlockdevRemove(disk):
1283   """Remove a block device.
1284
1285   @note: This is intended to be called recursively.
1286
1287   @type disk: L{objects.Disk}
1288   @param disk: the disk object we should remove
1289   @rtype: boolean
1290   @return: the success of the operation
1291
1292   """
1293   msgs = []
1294   try:
1295     rdev = _RecursiveFindBD(disk)
1296   except errors.BlockDeviceError, err:
1297     # probably can't attach
1298     logging.info("Can't attach to device %s in remove", disk)
1299     rdev = None
1300   if rdev is not None:
1301     r_path = rdev.dev_path
1302     try:
1303       rdev.Remove()
1304     except errors.BlockDeviceError, err:
1305       msgs.append(str(err))
1306     if not msgs:
1307       DevCacheManager.RemoveCache(r_path)
1308
1309   if disk.children:
1310     for child in disk.children:
1311       try:
1312         BlockdevRemove(child)
1313       except RPCFail, err:
1314         msgs.append(str(err))
1315
1316   if msgs:
1317     _Fail("; ".join(msgs))
1318
1319
1320 def _RecursiveAssembleBD(disk, owner, as_primary):
1321   """Activate a block device for an instance.
1322
1323   This is run on the primary and secondary nodes for an instance.
1324
1325   @note: this function is called recursively.
1326
1327   @type disk: L{objects.Disk}
1328   @param disk: the disk we try to assemble
1329   @type owner: str
1330   @param owner: the name of the instance which owns the disk
1331   @type as_primary: boolean
1332   @param as_primary: if we should make the block device
1333       read/write
1334
1335   @return: the assembled device or None (in case no device
1336       was assembled)
1337   @raise errors.BlockDeviceError: in case there is an error
1338       during the activation of the children or the device
1339       itself
1340
1341   """
1342   children = []
1343   if disk.children:
1344     mcn = disk.ChildrenNeeded()
1345     if mcn == -1:
1346       mcn = 0 # max number of Nones allowed
1347     else:
1348       mcn = len(disk.children) - mcn # max number of Nones
1349     for chld_disk in disk.children:
1350       try:
1351         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1352       except errors.BlockDeviceError, err:
1353         if children.count(None) >= mcn:
1354           raise
1355         cdev = None
1356         logging.error("Error in child activation (but continuing): %s",
1357                       str(err))
1358       children.append(cdev)
1359
1360   if as_primary or disk.AssembleOnSecondary():
1361     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1362     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1363     result = r_dev
1364     if as_primary or disk.OpenOnSecondary():
1365       r_dev.Open()
1366     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1367                                 as_primary, disk.iv_name)
1368
1369   else:
1370     result = True
1371   return result
1372
1373
1374 def BlockdevAssemble(disk, owner, as_primary):
1375   """Activate a block device for an instance.
1376
1377   This is a wrapper over _RecursiveAssembleBD.
1378
1379   @rtype: str or boolean
1380   @return: a C{/dev/...} path for primary nodes, and
1381       C{True} for secondary nodes
1382
1383   """
1384   try:
1385     result = _RecursiveAssembleBD(disk, owner, as_primary)
1386     if isinstance(result, bdev.BlockDev):
1387       # pylint: disable-msg=E1103
1388       result = result.dev_path
1389   except errors.BlockDeviceError, err:
1390     _Fail("Error while assembling disk: %s", err, exc=True)
1391
1392   return result
1393
1394
1395 def BlockdevShutdown(disk):
1396   """Shut down a block device.
1397
1398   First, if the device is assembled (Attach() is successful), then
1399   the device is shutdown. Then the children of the device are
1400   shutdown.
1401
1402   This function is called recursively. Note that we don't cache the
1403   children or such, as oppossed to assemble, shutdown of different
1404   devices doesn't require that the upper device was active.
1405
1406   @type disk: L{objects.Disk}
1407   @param disk: the description of the disk we should
1408       shutdown
1409   @rtype: None
1410
1411   """
1412   msgs = []
1413   r_dev = _RecursiveFindBD(disk)
1414   if r_dev is not None:
1415     r_path = r_dev.dev_path
1416     try:
1417       r_dev.Shutdown()
1418       DevCacheManager.RemoveCache(r_path)
1419     except errors.BlockDeviceError, err:
1420       msgs.append(str(err))
1421
1422   if disk.children:
1423     for child in disk.children:
1424       try:
1425         BlockdevShutdown(child)
1426       except RPCFail, err:
1427         msgs.append(str(err))
1428
1429   if msgs:
1430     _Fail("; ".join(msgs))
1431
1432
1433 def BlockdevAddchildren(parent_cdev, new_cdevs):
1434   """Extend a mirrored block device.
1435
1436   @type parent_cdev: L{objects.Disk}
1437   @param parent_cdev: the disk to which we should add children
1438   @type new_cdevs: list of L{objects.Disk}
1439   @param new_cdevs: the list of children which we should add
1440   @rtype: None
1441
1442   """
1443   parent_bdev = _RecursiveFindBD(parent_cdev)
1444   if parent_bdev is None:
1445     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1446   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1447   if new_bdevs.count(None) > 0:
1448     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1449   parent_bdev.AddChildren(new_bdevs)
1450
1451
1452 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1453   """Shrink a mirrored block device.
1454
1455   @type parent_cdev: L{objects.Disk}
1456   @param parent_cdev: the disk from which we should remove children
1457   @type new_cdevs: list of L{objects.Disk}
1458   @param new_cdevs: the list of children which we should remove
1459   @rtype: None
1460
1461   """
1462   parent_bdev = _RecursiveFindBD(parent_cdev)
1463   if parent_bdev is None:
1464     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1465   devs = []
1466   for disk in new_cdevs:
1467     rpath = disk.StaticDevPath()
1468     if rpath is None:
1469       bd = _RecursiveFindBD(disk)
1470       if bd is None:
1471         _Fail("Can't find device %s while removing children", disk)
1472       else:
1473         devs.append(bd.dev_path)
1474     else:
1475       if not utils.IsNormAbsPath(rpath):
1476         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1477       devs.append(rpath)
1478   parent_bdev.RemoveChildren(devs)
1479
1480
1481 def BlockdevGetmirrorstatus(disks):
1482   """Get the mirroring status of a list of devices.
1483
1484   @type disks: list of L{objects.Disk}
1485   @param disks: the list of disks which we should query
1486   @rtype: disk
1487   @return:
1488       a list of (mirror_done, estimated_time) tuples, which
1489       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1490   @raise errors.BlockDeviceError: if any of the disks cannot be
1491       found
1492
1493   """
1494   stats = []
1495   for dsk in disks:
1496     rbd = _RecursiveFindBD(dsk)
1497     if rbd is None:
1498       _Fail("Can't find device %s", dsk)
1499
1500     stats.append(rbd.CombinedSyncStatus())
1501
1502   return stats
1503
1504
1505 def _RecursiveFindBD(disk):
1506   """Check if a device is activated.
1507
1508   If so, return information about the real device.
1509
1510   @type disk: L{objects.Disk}
1511   @param disk: the disk object we need to find
1512
1513   @return: None if the device can't be found,
1514       otherwise the device instance
1515
1516   """
1517   children = []
1518   if disk.children:
1519     for chdisk in disk.children:
1520       children.append(_RecursiveFindBD(chdisk))
1521
1522   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1523
1524
1525 def BlockdevFind(disk):
1526   """Check if a device is activated.
1527
1528   If it is, return information about the real device.
1529
1530   @type disk: L{objects.Disk}
1531   @param disk: the disk to find
1532   @rtype: None or objects.BlockDevStatus
1533   @return: None if the disk cannot be found, otherwise a the current
1534            information
1535
1536   """
1537   try:
1538     rbd = _RecursiveFindBD(disk)
1539   except errors.BlockDeviceError, err:
1540     _Fail("Failed to find device: %s", err, exc=True)
1541
1542   if rbd is None:
1543     return None
1544
1545   return rbd.GetSyncStatus()
1546
1547
1548 def BlockdevGetsize(disks):
1549   """Computes the size of the given disks.
1550
1551   If a disk is not found, returns None instead.
1552
1553   @type disks: list of L{objects.Disk}
1554   @param disks: the list of disk to compute the size for
1555   @rtype: list
1556   @return: list with elements None if the disk cannot be found,
1557       otherwise the size
1558
1559   """
1560   result = []
1561   for cf in disks:
1562     try:
1563       rbd = _RecursiveFindBD(cf)
1564     except errors.BlockDeviceError:
1565       result.append(None)
1566       continue
1567     if rbd is None:
1568       result.append(None)
1569     else:
1570       result.append(rbd.GetActualSize())
1571   return result
1572
1573
1574 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1575   """Export a block device to a remote node.
1576
1577   @type disk: L{objects.Disk}
1578   @param disk: the description of the disk to export
1579   @type dest_node: str
1580   @param dest_node: the destination node to export to
1581   @type dest_path: str
1582   @param dest_path: the destination path on the target node
1583   @type cluster_name: str
1584   @param cluster_name: the cluster name, needed for SSH hostalias
1585   @rtype: None
1586
1587   """
1588   real_disk = _RecursiveFindBD(disk)
1589   if real_disk is None:
1590     _Fail("Block device '%s' is not set up", disk)
1591
1592   real_disk.Open()
1593
1594   # the block size on the read dd is 1MiB to match our units
1595   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1596                                "dd if=%s bs=1048576 count=%s",
1597                                real_disk.dev_path, str(disk.size))
1598
1599   # we set here a smaller block size as, due to ssh buffering, more
1600   # than 64-128k will mostly ignored; we use nocreat to fail if the
1601   # device is not already there or we pass a wrong path; we use
1602   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1603   # to not buffer too much memory; this means that at best, we flush
1604   # every 64k, which will not be very fast
1605   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1606                                 " oflag=dsync", dest_path)
1607
1608   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1609                                                    constants.GANETI_RUNAS,
1610                                                    destcmd)
1611
1612   # all commands have been checked, so we're safe to combine them
1613   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1614
1615   result = utils.RunCmd(["bash", "-c", command])
1616
1617   if result.failed:
1618     _Fail("Disk copy command '%s' returned error: %s"
1619           " output: %s", command, result.fail_reason, result.output)
1620
1621
1622 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1623   """Write a file to the filesystem.
1624
1625   This allows the master to overwrite(!) a file. It will only perform
1626   the operation if the file belongs to a list of configuration files.
1627
1628   @type file_name: str
1629   @param file_name: the target file name
1630   @type data: str
1631   @param data: the new contents of the file
1632   @type mode: int
1633   @param mode: the mode to give the file (can be None)
1634   @type uid: int
1635   @param uid: the owner of the file (can be -1 for default)
1636   @type gid: int
1637   @param gid: the group of the file (can be -1 for default)
1638   @type atime: float
1639   @param atime: the atime to set on the file (can be None)
1640   @type mtime: float
1641   @param mtime: the mtime to set on the file (can be None)
1642   @rtype: None
1643
1644   """
1645   if not os.path.isabs(file_name):
1646     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1647
1648   if file_name not in _ALLOWED_UPLOAD_FILES:
1649     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1650           file_name)
1651
1652   raw_data = _Decompress(data)
1653
1654   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1655                   atime=atime, mtime=mtime)
1656
1657
1658 def WriteSsconfFiles(values):
1659   """Update all ssconf files.
1660
1661   Wrapper around the SimpleStore.WriteFiles.
1662
1663   """
1664   ssconf.SimpleStore().WriteFiles(values)
1665
1666
1667 def _ErrnoOrStr(err):
1668   """Format an EnvironmentError exception.
1669
1670   If the L{err} argument has an errno attribute, it will be looked up
1671   and converted into a textual C{E...} description. Otherwise the
1672   string representation of the error will be returned.
1673
1674   @type err: L{EnvironmentError}
1675   @param err: the exception to format
1676
1677   """
1678   if hasattr(err, 'errno'):
1679     detail = errno.errorcode[err.errno]
1680   else:
1681     detail = str(err)
1682   return detail
1683
1684
1685 def _OSOndiskAPIVersion(os_dir):
1686   """Compute and return the API version of a given OS.
1687
1688   This function will try to read the API version of the OS residing in
1689   the 'os_dir' directory.
1690
1691   @type os_dir: str
1692   @param os_dir: the directory in which we should look for the OS
1693   @rtype: tuple
1694   @return: tuple (status, data) with status denoting the validity and
1695       data holding either the vaid versions or an error message
1696
1697   """
1698   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1699
1700   try:
1701     st = os.stat(api_file)
1702   except EnvironmentError, err:
1703     return False, ("Required file '%s' not found under path %s: %s" %
1704                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1705
1706   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1707     return False, ("File '%s' in %s is not a regular file" %
1708                    (constants.OS_API_FILE, os_dir))
1709
1710   try:
1711     api_versions = utils.ReadFile(api_file).splitlines()
1712   except EnvironmentError, err:
1713     return False, ("Error while reading the API version file at %s: %s" %
1714                    (api_file, _ErrnoOrStr(err)))
1715
1716   try:
1717     api_versions = [int(version.strip()) for version in api_versions]
1718   except (TypeError, ValueError), err:
1719     return False, ("API version(s) can't be converted to integer: %s" %
1720                    str(err))
1721
1722   return True, api_versions
1723
1724
1725 def DiagnoseOS(top_dirs=None):
1726   """Compute the validity for all OSes.
1727
1728   @type top_dirs: list
1729   @param top_dirs: the list of directories in which to
1730       search (if not given defaults to
1731       L{constants.OS_SEARCH_PATH})
1732   @rtype: list of L{objects.OS}
1733   @return: a list of tuples (name, path, status, diagnose, variants)
1734       for all (potential) OSes under all search paths, where:
1735           - name is the (potential) OS name
1736           - path is the full path to the OS
1737           - status True/False is the validity of the OS
1738           - diagnose is the error message for an invalid OS, otherwise empty
1739           - variants is a list of supported OS variants, if any
1740
1741   """
1742   if top_dirs is None:
1743     top_dirs = constants.OS_SEARCH_PATH
1744
1745   result = []
1746   for dir_name in top_dirs:
1747     if os.path.isdir(dir_name):
1748       try:
1749         f_names = utils.ListVisibleFiles(dir_name)
1750       except EnvironmentError, err:
1751         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1752         break
1753       for name in f_names:
1754         os_path = utils.PathJoin(dir_name, name)
1755         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1756         if status:
1757           diagnose = ""
1758           variants = os_inst.supported_variants
1759         else:
1760           diagnose = os_inst
1761           variants = []
1762         result.append((name, os_path, status, diagnose, variants))
1763
1764   return result
1765
1766
1767 def _TryOSFromDisk(name, base_dir=None):
1768   """Create an OS instance from disk.
1769
1770   This function will return an OS instance if the given name is a
1771   valid OS name.
1772
1773   @type base_dir: string
1774   @keyword base_dir: Base directory containing OS installations.
1775                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1776   @rtype: tuple
1777   @return: success and either the OS instance if we find a valid one,
1778       or error message
1779
1780   """
1781   if base_dir is None:
1782     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1783   else:
1784     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1785
1786   if os_dir is None:
1787     return False, "Directory for OS %s not found in search path" % name
1788
1789   status, api_versions = _OSOndiskAPIVersion(os_dir)
1790   if not status:
1791     # push the error up
1792     return status, api_versions
1793
1794   if not constants.OS_API_VERSIONS.intersection(api_versions):
1795     return False, ("API version mismatch for path '%s': found %s, want %s." %
1796                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1797
1798   # OS Files dictionary, we will populate it with the absolute path names
1799   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1800
1801   if max(api_versions) >= constants.OS_API_V15:
1802     os_files[constants.OS_VARIANTS_FILE] = ''
1803
1804   for filename in os_files:
1805     os_files[filename] = utils.PathJoin(os_dir, filename)
1806
1807     try:
1808       st = os.stat(os_files[filename])
1809     except EnvironmentError, err:
1810       return False, ("File '%s' under path '%s' is missing (%s)" %
1811                      (filename, os_dir, _ErrnoOrStr(err)))
1812
1813     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1814       return False, ("File '%s' under path '%s' is not a regular file" %
1815                      (filename, os_dir))
1816
1817     if filename in constants.OS_SCRIPTS:
1818       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1819         return False, ("File '%s' under path '%s' is not executable" %
1820                        (filename, os_dir))
1821
1822   variants = None
1823   if constants.OS_VARIANTS_FILE in os_files:
1824     variants_file = os_files[constants.OS_VARIANTS_FILE]
1825     try:
1826       variants = utils.ReadFile(variants_file).splitlines()
1827     except EnvironmentError, err:
1828       return False, ("Error while reading the OS variants file at %s: %s" %
1829                      (variants_file, _ErrnoOrStr(err)))
1830     if not variants:
1831       return False, ("No supported os variant found")
1832
1833   os_obj = objects.OS(name=name, path=os_dir,
1834                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1835                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1836                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1837                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1838                       supported_variants=variants,
1839                       api_versions=api_versions)
1840   return True, os_obj
1841
1842
1843 def OSFromDisk(name, base_dir=None):
1844   """Create an OS instance from disk.
1845
1846   This function will return an OS instance if the given name is a
1847   valid OS name. Otherwise, it will raise an appropriate
1848   L{RPCFail} exception, detailing why this is not a valid OS.
1849
1850   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1851   an exception but returns true/false status data.
1852
1853   @type base_dir: string
1854   @keyword base_dir: Base directory containing OS installations.
1855                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1856   @rtype: L{objects.OS}
1857   @return: the OS instance if we find a valid one
1858   @raise RPCFail: if we don't find a valid OS
1859
1860   """
1861   name_only = name.split("+", 1)[0]
1862   status, payload = _TryOSFromDisk(name_only, base_dir)
1863
1864   if not status:
1865     _Fail(payload)
1866
1867   return payload
1868
1869
1870 def OSEnvironment(instance, inst_os, debug=0):
1871   """Calculate the environment for an os script.
1872
1873   @type instance: L{objects.Instance}
1874   @param instance: target instance for the os script run
1875   @type inst_os: L{objects.OS}
1876   @param inst_os: operating system for which the environment is being built
1877   @type debug: integer
1878   @param debug: debug level (0 or 1, for OS Api 10)
1879   @rtype: dict
1880   @return: dict of environment variables
1881   @raise errors.BlockDeviceError: if the block device
1882       cannot be found
1883
1884   """
1885   result = {}
1886   api_version = \
1887     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1888   result['OS_API_VERSION'] = '%d' % api_version
1889   result['INSTANCE_NAME'] = instance.name
1890   result['INSTANCE_OS'] = instance.os
1891   result['HYPERVISOR'] = instance.hypervisor
1892   result['DISK_COUNT'] = '%d' % len(instance.disks)
1893   result['NIC_COUNT'] = '%d' % len(instance.nics)
1894   result['DEBUG_LEVEL'] = '%d' % debug
1895   if api_version >= constants.OS_API_V15:
1896     try:
1897       variant = instance.os.split('+', 1)[1]
1898     except IndexError:
1899       variant = inst_os.supported_variants[0]
1900     result['OS_VARIANT'] = variant
1901   for idx, disk in enumerate(instance.disks):
1902     real_disk = _RecursiveFindBD(disk)
1903     if real_disk is None:
1904       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1905                                     str(disk))
1906     real_disk.Open()
1907     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1908     result['DISK_%d_ACCESS' % idx] = disk.mode
1909     if constants.HV_DISK_TYPE in instance.hvparams:
1910       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1911         instance.hvparams[constants.HV_DISK_TYPE]
1912     if disk.dev_type in constants.LDS_BLOCK:
1913       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1914     elif disk.dev_type == constants.LD_FILE:
1915       result['DISK_%d_BACKEND_TYPE' % idx] = \
1916         'file:%s' % disk.physical_id[0]
1917   for idx, nic in enumerate(instance.nics):
1918     result['NIC_%d_MAC' % idx] = nic.mac
1919     if nic.ip:
1920       result['NIC_%d_IP' % idx] = nic.ip
1921     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1922     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1923       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1924     if nic.nicparams[constants.NIC_LINK]:
1925       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1926     if constants.HV_NIC_TYPE in instance.hvparams:
1927       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1928         instance.hvparams[constants.HV_NIC_TYPE]
1929
1930   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1931     for key, value in source.items():
1932       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1933
1934   return result
1935
1936 def BlockdevGrow(disk, amount):
1937   """Grow a stack of block devices.
1938
1939   This function is called recursively, with the childrens being the
1940   first ones to resize.
1941
1942   @type disk: L{objects.Disk}
1943   @param disk: the disk to be grown
1944   @rtype: (status, result)
1945   @return: a tuple with the status of the operation
1946       (True/False), and the errors message if status
1947       is False
1948
1949   """
1950   r_dev = _RecursiveFindBD(disk)
1951   if r_dev is None:
1952     _Fail("Cannot find block device %s", disk)
1953
1954   try:
1955     r_dev.Grow(amount)
1956   except errors.BlockDeviceError, err:
1957     _Fail("Failed to grow block device: %s", err, exc=True)
1958
1959
1960 def BlockdevSnapshot(disk):
1961   """Create a snapshot copy of a block device.
1962
1963   This function is called recursively, and the snapshot is actually created
1964   just for the leaf lvm backend device.
1965
1966   @type disk: L{objects.Disk}
1967   @param disk: the disk to be snapshotted
1968   @rtype: string
1969   @return: snapshot disk path
1970
1971   """
1972   if disk.dev_type == constants.LD_DRBD8:
1973     if not disk.children:
1974       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
1975             disk.unique_id)
1976     return BlockdevSnapshot(disk.children[0])
1977   elif disk.dev_type == constants.LD_LV:
1978     r_dev = _RecursiveFindBD(disk)
1979     if r_dev is not None:
1980       # FIXME: choose a saner value for the snapshot size
1981       # let's stay on the safe side and ask for the full size, for now
1982       return r_dev.Snapshot(disk.size)
1983     else:
1984       _Fail("Cannot find block device %s", disk)
1985   else:
1986     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1987           disk.unique_id, disk.dev_type)
1988
1989
1990 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
1991   """Export a block device snapshot to a remote node.
1992
1993   @type disk: L{objects.Disk}
1994   @param disk: the description of the disk to export
1995   @type dest_node: str
1996   @param dest_node: the destination node to export to
1997   @type instance: L{objects.Instance}
1998   @param instance: the instance object to whom the disk belongs
1999   @type cluster_name: str
2000   @param cluster_name: the cluster name, needed for SSH hostalias
2001   @type idx: int
2002   @param idx: the index of the disk in the instance's disk list,
2003       used to export to the OS scripts environment
2004   @type debug: integer
2005   @param debug: debug level, passed to the OS scripts
2006   @rtype: None
2007
2008   """
2009   inst_os = OSFromDisk(instance.os)
2010   export_env = OSEnvironment(instance, inst_os, debug)
2011
2012   export_script = inst_os.export_script
2013
2014   logfile = _InstanceLogName("export", inst_os.name, instance.name)
2015   if not os.path.exists(constants.LOG_OS_DIR):
2016     os.mkdir(constants.LOG_OS_DIR, 0750)
2017   real_disk = _RecursiveFindBD(disk)
2018   if real_disk is None:
2019     _Fail("Block device '%s' is not set up", disk)
2020
2021   real_disk.Open()
2022
2023   export_env['EXPORT_DEVICE'] = real_disk.dev_path
2024   export_env['EXPORT_INDEX'] = str(idx)
2025
2026   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2027   destfile = disk.physical_id[1]
2028
2029   # the target command is built out of three individual commands,
2030   # which are joined by pipes; we check each individual command for
2031   # valid parameters
2032   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
2033                                inst_os.path, export_script, logfile)
2034
2035   comprcmd = "gzip"
2036
2037   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
2038                                 destdir, utils.PathJoin(destdir, destfile))
2039   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2040                                                    constants.GANETI_RUNAS,
2041                                                    destcmd)
2042
2043   # all commands have been checked, so we're safe to combine them
2044   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
2045
2046   result = utils.RunCmd(["bash", "-c", command], env=export_env)
2047
2048   if result.failed:
2049     _Fail("OS snapshot export command '%s' returned error: %s"
2050           " output: %s", command, result.fail_reason, result.output)
2051
2052
2053 def FinalizeExport(instance, snap_disks):
2054   """Write out the export configuration information.
2055
2056   @type instance: L{objects.Instance}
2057   @param instance: the instance which we export, used for
2058       saving configuration
2059   @type snap_disks: list of L{objects.Disk}
2060   @param snap_disks: list of snapshot block devices, which
2061       will be used to get the actual name of the dump file
2062
2063   @rtype: None
2064
2065   """
2066   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2067   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2068
2069   config = objects.SerializableConfigParser()
2070
2071   config.add_section(constants.INISECT_EXP)
2072   config.set(constants.INISECT_EXP, 'version', '0')
2073   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2074   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2075   config.set(constants.INISECT_EXP, 'os', instance.os)
2076   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2077
2078   config.add_section(constants.INISECT_INS)
2079   config.set(constants.INISECT_INS, 'name', instance.name)
2080   config.set(constants.INISECT_INS, 'memory', '%d' %
2081              instance.beparams[constants.BE_MEMORY])
2082   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2083              instance.beparams[constants.BE_VCPUS])
2084   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2085
2086   nic_total = 0
2087   for nic_count, nic in enumerate(instance.nics):
2088     nic_total += 1
2089     config.set(constants.INISECT_INS, 'nic%d_mac' %
2090                nic_count, '%s' % nic.mac)
2091     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2092     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
2093                '%s' % nic.bridge)
2094   # TODO: redundant: on load can read nics until it doesn't exist
2095   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2096
2097   disk_total = 0
2098   for disk_count, disk in enumerate(snap_disks):
2099     if disk:
2100       disk_total += 1
2101       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2102                  ('%s' % disk.iv_name))
2103       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2104                  ('%s' % disk.physical_id[1]))
2105       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2106                  ('%d' % disk.size))
2107
2108   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2109
2110   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2111                   data=config.Dumps())
2112   shutil.rmtree(finaldestdir, ignore_errors=True)
2113   shutil.move(destdir, finaldestdir)
2114
2115
2116 def ExportInfo(dest):
2117   """Get export configuration information.
2118
2119   @type dest: str
2120   @param dest: directory containing the export
2121
2122   @rtype: L{objects.SerializableConfigParser}
2123   @return: a serializable config file containing the
2124       export info
2125
2126   """
2127   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2128
2129   config = objects.SerializableConfigParser()
2130   config.read(cff)
2131
2132   if (not config.has_section(constants.INISECT_EXP) or
2133       not config.has_section(constants.INISECT_INS)):
2134     _Fail("Export info file doesn't have the required fields")
2135
2136   return config.Dumps()
2137
2138
2139 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
2140   """Import an os image into an instance.
2141
2142   @type instance: L{objects.Instance}
2143   @param instance: instance to import the disks into
2144   @type src_node: string
2145   @param src_node: source node for the disk images
2146   @type src_images: list of string
2147   @param src_images: absolute paths of the disk images
2148   @type debug: integer
2149   @param debug: debug level, passed to the OS scripts
2150   @rtype: list of boolean
2151   @return: each boolean represent the success of importing the n-th disk
2152
2153   """
2154   inst_os = OSFromDisk(instance.os)
2155   import_env = OSEnvironment(instance, inst_os, debug)
2156   import_script = inst_os.import_script
2157
2158   logfile = _InstanceLogName("import", instance.os, instance.name)
2159   if not os.path.exists(constants.LOG_OS_DIR):
2160     os.mkdir(constants.LOG_OS_DIR, 0750)
2161
2162   comprcmd = "gunzip"
2163   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2164                                import_script, logfile)
2165
2166   final_result = []
2167   for idx, image in enumerate(src_images):
2168     if image:
2169       destcmd = utils.BuildShellCmd('cat %s', image)
2170       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2171                                                        constants.GANETI_RUNAS,
2172                                                        destcmd)
2173       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2174       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2175       import_env['IMPORT_INDEX'] = str(idx)
2176       result = utils.RunCmd(command, env=import_env)
2177       if result.failed:
2178         logging.error("Disk import command '%s' returned error: %s"
2179                       " output: %s", command, result.fail_reason,
2180                       result.output)
2181         final_result.append("error importing disk %d: %s, %s" %
2182                             (idx, result.fail_reason, result.output[-100]))
2183
2184   if final_result:
2185     _Fail("; ".join(final_result), log=False)
2186
2187
2188 def ListExports():
2189   """Return a list of exports currently available on this machine.
2190
2191   @rtype: list
2192   @return: list of the exports
2193
2194   """
2195   if os.path.isdir(constants.EXPORT_DIR):
2196     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2197   else:
2198     _Fail("No exports directory")
2199
2200
2201 def RemoveExport(export):
2202   """Remove an existing export from the node.
2203
2204   @type export: str
2205   @param export: the name of the export to remove
2206   @rtype: None
2207
2208   """
2209   target = utils.PathJoin(constants.EXPORT_DIR, export)
2210
2211   try:
2212     shutil.rmtree(target)
2213   except EnvironmentError, err:
2214     _Fail("Error while removing the export: %s", err, exc=True)
2215
2216
2217 def BlockdevRename(devlist):
2218   """Rename a list of block devices.
2219
2220   @type devlist: list of tuples
2221   @param devlist: list of tuples of the form  (disk,
2222       new_logical_id, new_physical_id); disk is an
2223       L{objects.Disk} object describing the current disk,
2224       and new logical_id/physical_id is the name we
2225       rename it to
2226   @rtype: boolean
2227   @return: True if all renames succeeded, False otherwise
2228
2229   """
2230   msgs = []
2231   result = True
2232   for disk, unique_id in devlist:
2233     dev = _RecursiveFindBD(disk)
2234     if dev is None:
2235       msgs.append("Can't find device %s in rename" % str(disk))
2236       result = False
2237       continue
2238     try:
2239       old_rpath = dev.dev_path
2240       dev.Rename(unique_id)
2241       new_rpath = dev.dev_path
2242       if old_rpath != new_rpath:
2243         DevCacheManager.RemoveCache(old_rpath)
2244         # FIXME: we should add the new cache information here, like:
2245         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2246         # but we don't have the owner here - maybe parse from existing
2247         # cache? for now, we only lose lvm data when we rename, which
2248         # is less critical than DRBD or MD
2249     except errors.BlockDeviceError, err:
2250       msgs.append("Can't rename device '%s' to '%s': %s" %
2251                   (dev, unique_id, err))
2252       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2253       result = False
2254   if not result:
2255     _Fail("; ".join(msgs))
2256
2257
2258 def _TransformFileStorageDir(file_storage_dir):
2259   """Checks whether given file_storage_dir is valid.
2260
2261   Checks wheter the given file_storage_dir is within the cluster-wide
2262   default file_storage_dir stored in SimpleStore. Only paths under that
2263   directory are allowed.
2264
2265   @type file_storage_dir: str
2266   @param file_storage_dir: the path to check
2267
2268   @return: the normalized path if valid, None otherwise
2269
2270   """
2271   if not constants.ENABLE_FILE_STORAGE:
2272     _Fail("File storage disabled at configure time")
2273   cfg = _GetConfig()
2274   file_storage_dir = os.path.normpath(file_storage_dir)
2275   base_file_storage_dir = cfg.GetFileStorageDir()
2276   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2277       base_file_storage_dir):
2278     _Fail("File storage directory '%s' is not under base file"
2279           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2280   return file_storage_dir
2281
2282
2283 def CreateFileStorageDir(file_storage_dir):
2284   """Create file storage directory.
2285
2286   @type file_storage_dir: str
2287   @param file_storage_dir: directory to create
2288
2289   @rtype: tuple
2290   @return: tuple with first element a boolean indicating wheter dir
2291       creation was successful or not
2292
2293   """
2294   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2295   if os.path.exists(file_storage_dir):
2296     if not os.path.isdir(file_storage_dir):
2297       _Fail("Specified storage dir '%s' is not a directory",
2298             file_storage_dir)
2299   else:
2300     try:
2301       os.makedirs(file_storage_dir, 0750)
2302     except OSError, err:
2303       _Fail("Cannot create file storage directory '%s': %s",
2304             file_storage_dir, err, exc=True)
2305
2306
2307 def RemoveFileStorageDir(file_storage_dir):
2308   """Remove file storage directory.
2309
2310   Remove it only if it's empty. If not log an error and return.
2311
2312   @type file_storage_dir: str
2313   @param file_storage_dir: the directory we should cleanup
2314   @rtype: tuple (success,)
2315   @return: tuple of one element, C{success}, denoting
2316       whether the operation was successful
2317
2318   """
2319   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2320   if os.path.exists(file_storage_dir):
2321     if not os.path.isdir(file_storage_dir):
2322       _Fail("Specified Storage directory '%s' is not a directory",
2323             file_storage_dir)
2324     # deletes dir only if empty, otherwise we want to fail the rpc call
2325     try:
2326       os.rmdir(file_storage_dir)
2327     except OSError, err:
2328       _Fail("Cannot remove file storage directory '%s': %s",
2329             file_storage_dir, err)
2330
2331
2332 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2333   """Rename the file storage directory.
2334
2335   @type old_file_storage_dir: str
2336   @param old_file_storage_dir: the current path
2337   @type new_file_storage_dir: str
2338   @param new_file_storage_dir: the name we should rename to
2339   @rtype: tuple (success,)
2340   @return: tuple of one element, C{success}, denoting
2341       whether the operation was successful
2342
2343   """
2344   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2345   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2346   if not os.path.exists(new_file_storage_dir):
2347     if os.path.isdir(old_file_storage_dir):
2348       try:
2349         os.rename(old_file_storage_dir, new_file_storage_dir)
2350       except OSError, err:
2351         _Fail("Cannot rename '%s' to '%s': %s",
2352               old_file_storage_dir, new_file_storage_dir, err)
2353     else:
2354       _Fail("Specified storage dir '%s' is not a directory",
2355             old_file_storage_dir)
2356   else:
2357     if os.path.exists(old_file_storage_dir):
2358       _Fail("Cannot rename '%s' to '%s': both locations exist",
2359             old_file_storage_dir, new_file_storage_dir)
2360
2361
2362 def _EnsureJobQueueFile(file_name):
2363   """Checks whether the given filename is in the queue directory.
2364
2365   @type file_name: str
2366   @param file_name: the file name we should check
2367   @rtype: None
2368   @raises RPCFail: if the file is not valid
2369
2370   """
2371   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2372   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2373
2374   if not result:
2375     _Fail("Passed job queue file '%s' does not belong to"
2376           " the queue directory '%s'", file_name, queue_dir)
2377
2378
2379 def JobQueueUpdate(file_name, content):
2380   """Updates a file in the queue directory.
2381
2382   This is just a wrapper over L{utils.WriteFile}, with proper
2383   checking.
2384
2385   @type file_name: str
2386   @param file_name: the job file name
2387   @type content: str
2388   @param content: the new job contents
2389   @rtype: boolean
2390   @return: the success of the operation
2391
2392   """
2393   _EnsureJobQueueFile(file_name)
2394
2395   # Write and replace the file atomically
2396   utils.WriteFile(file_name, data=_Decompress(content))
2397
2398
2399 def JobQueueRename(old, new):
2400   """Renames a job queue file.
2401
2402   This is just a wrapper over os.rename with proper checking.
2403
2404   @type old: str
2405   @param old: the old (actual) file name
2406   @type new: str
2407   @param new: the desired file name
2408   @rtype: tuple
2409   @return: the success of the operation and payload
2410
2411   """
2412   _EnsureJobQueueFile(old)
2413   _EnsureJobQueueFile(new)
2414
2415   utils.RenameFile(old, new, mkdir=True)
2416
2417
2418 def JobQueueSetDrainFlag(drain_flag):
2419   """Set the drain flag for the queue.
2420
2421   This will set or unset the queue drain flag.
2422
2423   @type drain_flag: boolean
2424   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2425   @rtype: truple
2426   @return: always True, None
2427   @warning: the function always returns True
2428
2429   """
2430   if drain_flag:
2431     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2432   else:
2433     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2434
2435
2436 def BlockdevClose(instance_name, disks):
2437   """Closes the given block devices.
2438
2439   This means they will be switched to secondary mode (in case of
2440   DRBD).
2441
2442   @param instance_name: if the argument is not empty, the symlinks
2443       of this instance will be removed
2444   @type disks: list of L{objects.Disk}
2445   @param disks: the list of disks to be closed
2446   @rtype: tuple (success, message)
2447   @return: a tuple of success and message, where success
2448       indicates the succes of the operation, and message
2449       which will contain the error details in case we
2450       failed
2451
2452   """
2453   bdevs = []
2454   for cf in disks:
2455     rd = _RecursiveFindBD(cf)
2456     if rd is None:
2457       _Fail("Can't find device %s", cf)
2458     bdevs.append(rd)
2459
2460   msg = []
2461   for rd in bdevs:
2462     try:
2463       rd.Close()
2464     except errors.BlockDeviceError, err:
2465       msg.append(str(err))
2466   if msg:
2467     _Fail("Can't make devices secondary: %s", ",".join(msg))
2468   else:
2469     if instance_name:
2470       _RemoveBlockDevLinks(instance_name, disks)
2471
2472
2473 def ValidateHVParams(hvname, hvparams):
2474   """Validates the given hypervisor parameters.
2475
2476   @type hvname: string
2477   @param hvname: the hypervisor name
2478   @type hvparams: dict
2479   @param hvparams: the hypervisor parameters to be validated
2480   @rtype: None
2481
2482   """
2483   try:
2484     hv_type = hypervisor.GetHypervisor(hvname)
2485     hv_type.ValidateParameters(hvparams)
2486   except errors.HypervisorError, err:
2487     _Fail(str(err), log=False)
2488
2489
2490 def DemoteFromMC():
2491   """Demotes the current node from master candidate role.
2492
2493   """
2494   # try to ensure we're not the master by mistake
2495   master, myself = ssconf.GetMasterAndMyself()
2496   if master == myself:
2497     _Fail("ssconf status shows I'm the master node, will not demote")
2498
2499   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2500   if not result.failed:
2501     _Fail("The master daemon is running, will not demote")
2502
2503   try:
2504     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2505       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2506   except EnvironmentError, err:
2507     if err.errno != errno.ENOENT:
2508       _Fail("Error while backing up cluster file: %s", err, exc=True)
2509
2510   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2511
2512
2513 def _FindDisks(nodes_ip, disks):
2514   """Sets the physical ID on disks and returns the block devices.
2515
2516   """
2517   # set the correct physical ID
2518   my_name = utils.HostInfo().name
2519   for cf in disks:
2520     cf.SetPhysicalID(my_name, nodes_ip)
2521
2522   bdevs = []
2523
2524   for cf in disks:
2525     rd = _RecursiveFindBD(cf)
2526     if rd is None:
2527       _Fail("Can't find device %s", cf)
2528     bdevs.append(rd)
2529   return bdevs
2530
2531
2532 def DrbdDisconnectNet(nodes_ip, disks):
2533   """Disconnects the network on a list of drbd devices.
2534
2535   """
2536   bdevs = _FindDisks(nodes_ip, disks)
2537
2538   # disconnect disks
2539   for rd in bdevs:
2540     try:
2541       rd.DisconnectNet()
2542     except errors.BlockDeviceError, err:
2543       _Fail("Can't change network configuration to standalone mode: %s",
2544             err, exc=True)
2545
2546
2547 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2548   """Attaches the network on a list of drbd devices.
2549
2550   """
2551   bdevs = _FindDisks(nodes_ip, disks)
2552
2553   if multimaster:
2554     for idx, rd in enumerate(bdevs):
2555       try:
2556         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2557       except EnvironmentError, err:
2558         _Fail("Can't create symlink: %s", err)
2559   # reconnect disks, switch to new master configuration and if
2560   # needed primary mode
2561   for rd in bdevs:
2562     try:
2563       rd.AttachNet(multimaster)
2564     except errors.BlockDeviceError, err:
2565       _Fail("Can't change network configuration: %s", err)
2566
2567   # wait until the disks are connected; we need to retry the re-attach
2568   # if the device becomes standalone, as this might happen if the one
2569   # node disconnects and reconnects in a different mode before the
2570   # other node reconnects; in this case, one or both of the nodes will
2571   # decide it has wrong configuration and switch to standalone
2572
2573   def _Attach():
2574     all_connected = True
2575
2576     for rd in bdevs:
2577       stats = rd.GetProcStatus()
2578
2579       all_connected = (all_connected and
2580                        (stats.is_connected or stats.is_in_resync))
2581
2582       if stats.is_standalone:
2583         # peer had different config info and this node became
2584         # standalone, even though this should not happen with the
2585         # new staged way of changing disk configs
2586         try:
2587           rd.AttachNet(multimaster)
2588         except errors.BlockDeviceError, err:
2589           _Fail("Can't change network configuration: %s", err)
2590
2591     if not all_connected:
2592       raise utils.RetryAgain()
2593
2594   try:
2595     # Start with a delay of 100 miliseconds and go up to 5 seconds
2596     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2597   except utils.RetryTimeout:
2598     _Fail("Timeout in disk reconnecting")
2599
2600   if multimaster:
2601     # change to primary mode
2602     for rd in bdevs:
2603       try:
2604         rd.Open()
2605       except errors.BlockDeviceError, err:
2606         _Fail("Can't change to primary mode: %s", err)
2607
2608
2609 def DrbdWaitSync(nodes_ip, disks):
2610   """Wait until DRBDs have synchronized.
2611
2612   """
2613   def _helper(rd):
2614     stats = rd.GetProcStatus()
2615     if not (stats.is_connected or stats.is_in_resync):
2616       raise utils.RetryAgain()
2617     return stats
2618
2619   bdevs = _FindDisks(nodes_ip, disks)
2620
2621   min_resync = 100
2622   alldone = True
2623   for rd in bdevs:
2624     try:
2625       # poll each second for 15 seconds
2626       stats = utils.Retry(_helper, 1, 15, args=[rd])
2627     except utils.RetryTimeout:
2628       stats = rd.GetProcStatus()
2629       # last check
2630       if not (stats.is_connected or stats.is_in_resync):
2631         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2632     alldone = alldone and (not stats.is_in_resync)
2633     if stats.sync_percent is not None:
2634       min_resync = min(min_resync, stats.sync_percent)
2635
2636   return (alldone, min_resync)
2637
2638
2639 def PowercycleNode(hypervisor_type):
2640   """Hard-powercycle the node.
2641
2642   Because we need to return first, and schedule the powercycle in the
2643   background, we won't be able to report failures nicely.
2644
2645   """
2646   hyper = hypervisor.GetHypervisor(hypervisor_type)
2647   try:
2648     pid = os.fork()
2649   except OSError:
2650     # if we can't fork, we'll pretend that we're in the child process
2651     pid = 0
2652   if pid > 0:
2653     return "Reboot scheduled in 5 seconds"
2654   time.sleep(5)
2655   hyper.PowercycleNode()
2656
2657
2658 class HooksRunner(object):
2659   """Hook runner.
2660
2661   This class is instantiated on the node side (ganeti-noded) and not
2662   on the master side.
2663
2664   """
2665   def __init__(self, hooks_base_dir=None):
2666     """Constructor for hooks runner.
2667
2668     @type hooks_base_dir: str or None
2669     @param hooks_base_dir: if not None, this overrides the
2670         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2671
2672     """
2673     if hooks_base_dir is None:
2674       hooks_base_dir = constants.HOOKS_BASE_DIR
2675     # yeah, _BASE_DIR is not valid for attributes, we use it like a
2676     # constant
2677     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
2678
2679   def RunHooks(self, hpath, phase, env):
2680     """Run the scripts in the hooks directory.
2681
2682     @type hpath: str
2683     @param hpath: the path to the hooks directory which
2684         holds the scripts
2685     @type phase: str
2686     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2687         L{constants.HOOKS_PHASE_POST}
2688     @type env: dict
2689     @param env: dictionary with the environment for the hook
2690     @rtype: list
2691     @return: list of 3-element tuples:
2692       - script path
2693       - script result, either L{constants.HKR_SUCCESS} or
2694         L{constants.HKR_FAIL}
2695       - output of the script
2696
2697     @raise errors.ProgrammerError: for invalid input
2698         parameters
2699
2700     """
2701     if phase == constants.HOOKS_PHASE_PRE:
2702       suffix = "pre"
2703     elif phase == constants.HOOKS_PHASE_POST:
2704       suffix = "post"
2705     else:
2706       _Fail("Unknown hooks phase '%s'", phase)
2707
2708
2709     subdir = "%s-%s.d" % (hpath, suffix)
2710     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
2711
2712     results = []
2713
2714     if not os.path.isdir(dir_name):
2715       # for non-existing/non-dirs, we simply exit instead of logging a
2716       # warning at every operation
2717       return results
2718
2719     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
2720
2721     for (relname, relstatus, runresult)  in runparts_results:
2722       if relstatus == constants.RUNPARTS_SKIP:
2723         rrval = constants.HKR_SKIP
2724         output = ""
2725       elif relstatus == constants.RUNPARTS_ERR:
2726         rrval = constants.HKR_FAIL
2727         output = "Hook script execution error: %s" % runresult
2728       elif relstatus == constants.RUNPARTS_RUN:
2729         if runresult.failed:
2730           rrval = constants.HKR_FAIL
2731         else:
2732           rrval = constants.HKR_SUCCESS
2733         output = utils.SafeEncode(runresult.output.strip())
2734       results.append(("%s/%s" % (subdir, relname), rrval, output))
2735
2736     return results
2737
2738
2739 class IAllocatorRunner(object):
2740   """IAllocator runner.
2741
2742   This class is instantiated on the node side (ganeti-noded) and not on
2743   the master side.
2744
2745   """
2746   @staticmethod
2747   def Run(name, idata):
2748     """Run an iallocator script.
2749
2750     @type name: str
2751     @param name: the iallocator script name
2752     @type idata: str
2753     @param idata: the allocator input data
2754
2755     @rtype: tuple
2756     @return: two element tuple of:
2757        - status
2758        - either error message or stdout of allocator (for success)
2759
2760     """
2761     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2762                                   os.path.isfile)
2763     if alloc_script is None:
2764       _Fail("iallocator module '%s' not found in the search path", name)
2765
2766     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2767     try:
2768       os.write(fd, idata)
2769       os.close(fd)
2770       result = utils.RunCmd([alloc_script, fin_name])
2771       if result.failed:
2772         _Fail("iallocator module '%s' failed: %s, output '%s'",
2773               name, result.fail_reason, result.output)
2774     finally:
2775       os.unlink(fin_name)
2776
2777     return result.stdout
2778
2779
2780 class DevCacheManager(object):
2781   """Simple class for managing a cache of block device information.
2782
2783   """
2784   _DEV_PREFIX = "/dev/"
2785   _ROOT_DIR = constants.BDEV_CACHE_DIR
2786
2787   @classmethod
2788   def _ConvertPath(cls, dev_path):
2789     """Converts a /dev/name path to the cache file name.
2790
2791     This replaces slashes with underscores and strips the /dev
2792     prefix. It then returns the full path to the cache file.
2793
2794     @type dev_path: str
2795     @param dev_path: the C{/dev/} path name
2796     @rtype: str
2797     @return: the converted path name
2798
2799     """
2800     if dev_path.startswith(cls._DEV_PREFIX):
2801       dev_path = dev_path[len(cls._DEV_PREFIX):]
2802     dev_path = dev_path.replace("/", "_")
2803     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
2804     return fpath
2805
2806   @classmethod
2807   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2808     """Updates the cache information for a given device.
2809
2810     @type dev_path: str
2811     @param dev_path: the pathname of the device
2812     @type owner: str
2813     @param owner: the owner (instance name) of the device
2814     @type on_primary: bool
2815     @param on_primary: whether this is the primary
2816         node nor not
2817     @type iv_name: str
2818     @param iv_name: the instance-visible name of the
2819         device, as in objects.Disk.iv_name
2820
2821     @rtype: None
2822
2823     """
2824     if dev_path is None:
2825       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2826       return
2827     fpath = cls._ConvertPath(dev_path)
2828     if on_primary:
2829       state = "primary"
2830     else:
2831       state = "secondary"
2832     if iv_name is None:
2833       iv_name = "not_visible"
2834     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2835     try:
2836       utils.WriteFile(fpath, data=fdata)
2837     except EnvironmentError, err:
2838       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2839
2840   @classmethod
2841   def RemoveCache(cls, dev_path):
2842     """Remove data for a dev_path.
2843
2844     This is just a wrapper over L{utils.RemoveFile} with a converted
2845     path name and logging.
2846
2847     @type dev_path: str
2848     @param dev_path: the pathname of the device
2849
2850     @rtype: None
2851
2852     """
2853     if dev_path is None:
2854       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2855       return
2856     fpath = cls._ConvertPath(dev_path)
2857     try:
2858       utils.RemoveFile(fpath)
2859     except EnvironmentError, err:
2860       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)