Status change reason support for Reboot
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
67 from ganeti import ht
68
69
70 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
71 _ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
72   pathutils.DATA_DIR,
73   pathutils.JOB_QUEUE_ARCHIVE_DIR,
74   pathutils.QUEUE_DIR,
75   pathutils.CRYPTO_KEYS_DIR,
76   ])
77 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
78 _X509_KEY_FILE = "key"
79 _X509_CERT_FILE = "cert"
80 _IES_STATUS_FILE = "status"
81 _IES_PID_FILE = "pid"
82 _IES_CA_FILE = "ca"
83
84 #: Valid LVS output line regex
85 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
86
87 # Actions for the master setup script
88 _MASTER_START = "start"
89 _MASTER_STOP = "stop"
90
91 #: Maximum file permissions for restricted command directory and executables
92 _RCMD_MAX_MODE = (stat.S_IRWXU |
93                   stat.S_IRGRP | stat.S_IXGRP |
94                   stat.S_IROTH | stat.S_IXOTH)
95
96 #: Delay before returning an error for restricted commands
97 _RCMD_INVALID_DELAY = 10
98
99 #: How long to wait to acquire lock for restricted commands (shorter than
100 #: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
101 #: command requests arrive
102 _RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
103
104
105 class RPCFail(Exception):
106   """Class denoting RPC failure.
107
108   Its argument is the error message.
109
110   """
111
112
113 def GetInstReasonFilename(instance_name):
114   """Path of the file containing the reason of the instance status change.
115
116   @type instance_name: string
117   @param instance_name: The name of the instance
118   @rtype: string
119   @return: The path of the file
120
121   """
122   return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)
123
124
125 class InstReason(object):
126   """Class representing the reason for a change of state of a VM.
127
128   It is used to allow an easy serialization of the reason, so that it can be
129   written on a file.
130
131   """
132   def __init__(self, source, text):
133     """Initialize the class with all the required values.
134
135     @type text: string
136     @param text: The textual description of the reason for changing state
137     @type source: string
138     @param source: The source of the state change (RAPI, CLI, ...)
139
140     """
141     self.source = source
142     self.text = text
143
144   def GetJson(self):
145     """Get the JSON representation of the InstReason.
146
147     @rtype: string
148     @return : The JSON representation of the object
149
150     """
151     return serializer.DumpJson(dict(source=self.source, text=self.text))
152
153   def Store(self, instance_name):
154     """Serialize on a file the reason for the last state change of an instance.
155
156     The exact location of the file depends on the name of the instance and on
157     the configuration of the Ganeti cluster defined at deploy time.
158
159     @type instance_name: string
160     @param instance_name: The name of the instance
161     @rtype: None
162
163     """
164     filename = GetInstReasonFilename(instance_name)
165     utils.WriteFile(filename, data=self.GetJson())
166
167
168 def _Fail(msg, *args, **kwargs):
169   """Log an error and the raise an RPCFail exception.
170
171   This exception is then handled specially in the ganeti daemon and
172   turned into a 'failed' return type. As such, this function is a
173   useful shortcut for logging the error and returning it to the master
174   daemon.
175
176   @type msg: string
177   @param msg: the text of the exception
178   @raise RPCFail
179
180   """
181   if args:
182     msg = msg % args
183   if "log" not in kwargs or kwargs["log"]: # if we should log this error
184     if "exc" in kwargs and kwargs["exc"]:
185       logging.exception(msg)
186     else:
187       logging.error(msg)
188   raise RPCFail(msg)
189
190
191 def _GetConfig():
192   """Simple wrapper to return a SimpleStore.
193
194   @rtype: L{ssconf.SimpleStore}
195   @return: a SimpleStore instance
196
197   """
198   return ssconf.SimpleStore()
199
200
201 def _GetSshRunner(cluster_name):
202   """Simple wrapper to return an SshRunner.
203
204   @type cluster_name: str
205   @param cluster_name: the cluster name, which is needed
206       by the SshRunner constructor
207   @rtype: L{ssh.SshRunner}
208   @return: an SshRunner instance
209
210   """
211   return ssh.SshRunner(cluster_name)
212
213
214 def _Decompress(data):
215   """Unpacks data compressed by the RPC client.
216
217   @type data: list or tuple
218   @param data: Data sent by RPC client
219   @rtype: str
220   @return: Decompressed data
221
222   """
223   assert isinstance(data, (list, tuple))
224   assert len(data) == 2
225   (encoding, content) = data
226   if encoding == constants.RPC_ENCODING_NONE:
227     return content
228   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
229     return zlib.decompress(base64.b64decode(content))
230   else:
231     raise AssertionError("Unknown data encoding")
232
233
234 def _CleanDirectory(path, exclude=None):
235   """Removes all regular files in a directory.
236
237   @type path: str
238   @param path: the directory to clean
239   @type exclude: list
240   @param exclude: list of files to be excluded, defaults
241       to the empty list
242
243   """
244   if path not in _ALLOWED_CLEAN_DIRS:
245     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
246           path)
247
248   if not os.path.isdir(path):
249     return
250   if exclude is None:
251     exclude = []
252   else:
253     # Normalize excluded paths
254     exclude = [os.path.normpath(i) for i in exclude]
255
256   for rel_name in utils.ListVisibleFiles(path):
257     full_name = utils.PathJoin(path, rel_name)
258     if full_name in exclude:
259       continue
260     if os.path.isfile(full_name) and not os.path.islink(full_name):
261       utils.RemoveFile(full_name)
262
263
264 def _BuildUploadFileList():
265   """Build the list of allowed upload files.
266
267   This is abstracted so that it's built only once at module import time.
268
269   """
270   allowed_files = set([
271     pathutils.CLUSTER_CONF_FILE,
272     pathutils.ETC_HOSTS,
273     pathutils.SSH_KNOWN_HOSTS_FILE,
274     pathutils.VNC_PASSWORD_FILE,
275     pathutils.RAPI_CERT_FILE,
276     pathutils.SPICE_CERT_FILE,
277     pathutils.SPICE_CACERT_FILE,
278     pathutils.RAPI_USERS_FILE,
279     pathutils.CONFD_HMAC_KEY,
280     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
281     ])
282
283   for hv_name in constants.HYPER_TYPES:
284     hv_class = hypervisor.GetHypervisorClass(hv_name)
285     allowed_files.update(hv_class.GetAncillaryFiles()[0])
286
287   assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
288     "Allowed file storage paths should never be uploaded via RPC"
289
290   return frozenset(allowed_files)
291
292
293 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
294
295
296 def JobQueuePurge():
297   """Removes job queue files and archived jobs.
298
299   @rtype: tuple
300   @return: True, None
301
302   """
303   _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
304   _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
305
306
307 def GetMasterInfo():
308   """Returns master information.
309
310   This is an utility function to compute master information, either
311   for consumption here or from the node daemon.
312
313   @rtype: tuple
314   @return: master_netdev, master_ip, master_name, primary_ip_family,
315     master_netmask
316   @raise RPCFail: in case of errors
317
318   """
319   try:
320     cfg = _GetConfig()
321     master_netdev = cfg.GetMasterNetdev()
322     master_ip = cfg.GetMasterIP()
323     master_netmask = cfg.GetMasterNetmask()
324     master_node = cfg.GetMasterNode()
325     primary_ip_family = cfg.GetPrimaryIPFamily()
326   except errors.ConfigurationError, err:
327     _Fail("Cluster configuration incomplete: %s", err, exc=True)
328   return (master_netdev, master_ip, master_node, primary_ip_family,
329           master_netmask)
330
331
332 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
333   """Decorator that runs hooks before and after the decorated function.
334
335   @type hook_opcode: string
336   @param hook_opcode: opcode of the hook
337   @type hooks_path: string
338   @param hooks_path: path of the hooks
339   @type env_builder_fn: function
340   @param env_builder_fn: function that returns a dictionary containing the
341     environment variables for the hooks. Will get all the parameters of the
342     decorated function.
343   @raise RPCFail: in case of pre-hook failure
344
345   """
346   def decorator(fn):
347     def wrapper(*args, **kwargs):
348       _, myself = ssconf.GetMasterAndMyself()
349       nodes = ([myself], [myself])  # these hooks run locally
350
351       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
352
353       cfg = _GetConfig()
354       hr = HooksRunner()
355       hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
356                             None, env_fn, logging.warning, cfg.GetClusterName(),
357                             cfg.GetMasterNode())
358
359       hm.RunPhase(constants.HOOKS_PHASE_PRE)
360       result = fn(*args, **kwargs)
361       hm.RunPhase(constants.HOOKS_PHASE_POST)
362
363       return result
364     return wrapper
365   return decorator
366
367
368 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
369   """Builds environment variables for master IP hooks.
370
371   @type master_params: L{objects.MasterNetworkParameters}
372   @param master_params: network parameters of the master
373   @type use_external_mip_script: boolean
374   @param use_external_mip_script: whether to use an external master IP
375     address setup script (unused, but necessary per the implementation of the
376     _RunLocalHooks decorator)
377
378   """
379   # pylint: disable=W0613
380   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
381   env = {
382     "MASTER_NETDEV": master_params.netdev,
383     "MASTER_IP": master_params.ip,
384     "MASTER_NETMASK": str(master_params.netmask),
385     "CLUSTER_IP_VERSION": str(ver),
386   }
387
388   return env
389
390
391 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
392   """Execute the master IP address setup script.
393
394   @type master_params: L{objects.MasterNetworkParameters}
395   @param master_params: network parameters of the master
396   @type action: string
397   @param action: action to pass to the script. Must be one of
398     L{backend._MASTER_START} or L{backend._MASTER_STOP}
399   @type use_external_mip_script: boolean
400   @param use_external_mip_script: whether to use an external master IP
401     address setup script
402   @raise backend.RPCFail: if there are errors during the execution of the
403     script
404
405   """
406   env = _BuildMasterIpEnv(master_params)
407
408   if use_external_mip_script:
409     setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
410   else:
411     setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
412
413   result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
414
415   if result.failed:
416     _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
417           (action, result.exit_code, result.output), log=True)
418
419
420 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
421                _BuildMasterIpEnv)
422 def ActivateMasterIp(master_params, use_external_mip_script):
423   """Activate the IP address of the master daemon.
424
425   @type master_params: L{objects.MasterNetworkParameters}
426   @param master_params: network parameters of the master
427   @type use_external_mip_script: boolean
428   @param use_external_mip_script: whether to use an external master IP
429     address setup script
430   @raise RPCFail: in case of errors during the IP startup
431
432   """
433   _RunMasterSetupScript(master_params, _MASTER_START,
434                         use_external_mip_script)
435
436
437 def StartMasterDaemons(no_voting):
438   """Activate local node as master node.
439
440   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
441
442   @type no_voting: boolean
443   @param no_voting: whether to start ganeti-masterd without a node vote
444       but still non-interactively
445   @rtype: None
446
447   """
448
449   if no_voting:
450     masterd_args = "--no-voting --yes-do-it"
451   else:
452     masterd_args = ""
453
454   env = {
455     "EXTRA_MASTERD_ARGS": masterd_args,
456     }
457
458   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
459   if result.failed:
460     msg = "Can't start Ganeti master: %s" % result.output
461     logging.error(msg)
462     _Fail(msg)
463
464
465 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
466                _BuildMasterIpEnv)
467 def DeactivateMasterIp(master_params, use_external_mip_script):
468   """Deactivate the master IP on this node.
469
470   @type master_params: L{objects.MasterNetworkParameters}
471   @param master_params: network parameters of the master
472   @type use_external_mip_script: boolean
473   @param use_external_mip_script: whether to use an external master IP
474     address setup script
475   @raise RPCFail: in case of errors during the IP turndown
476
477   """
478   _RunMasterSetupScript(master_params, _MASTER_STOP,
479                         use_external_mip_script)
480
481
482 def StopMasterDaemons():
483   """Stop the master daemons on this node.
484
485   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
486
487   @rtype: None
488
489   """
490   # TODO: log and report back to the caller the error failures; we
491   # need to decide in which case we fail the RPC for this
492
493   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
494   if result.failed:
495     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
496                   " and error %s",
497                   result.cmd, result.exit_code, result.output)
498
499
500 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
501   """Change the netmask of the master IP.
502
503   @param old_netmask: the old value of the netmask
504   @param netmask: the new value of the netmask
505   @param master_ip: the master IP
506   @param master_netdev: the master network device
507
508   """
509   if old_netmask == netmask:
510     return
511
512   if not netutils.IPAddress.Own(master_ip):
513     _Fail("The master IP address is not up, not attempting to change its"
514           " netmask")
515
516   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
517                          "%s/%s" % (master_ip, netmask),
518                          "dev", master_netdev, "label",
519                          "%s:0" % master_netdev])
520   if result.failed:
521     _Fail("Could not set the new netmask on the master IP address")
522
523   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
524                          "%s/%s" % (master_ip, old_netmask),
525                          "dev", master_netdev, "label",
526                          "%s:0" % master_netdev])
527   if result.failed:
528     _Fail("Could not bring down the master IP address with the old netmask")
529
530
531 def EtcHostsModify(mode, host, ip):
532   """Modify a host entry in /etc/hosts.
533
534   @param mode: The mode to operate. Either add or remove entry
535   @param host: The host to operate on
536   @param ip: The ip associated with the entry
537
538   """
539   if mode == constants.ETC_HOSTS_ADD:
540     if not ip:
541       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
542               " present")
543     utils.AddHostToEtcHosts(host, ip)
544   elif mode == constants.ETC_HOSTS_REMOVE:
545     if ip:
546       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
547               " parameter is present")
548     utils.RemoveHostFromEtcHosts(host)
549   else:
550     RPCFail("Mode not supported")
551
552
553 def LeaveCluster(modify_ssh_setup):
554   """Cleans up and remove the current node.
555
556   This function cleans up and prepares the current node to be removed
557   from the cluster.
558
559   If processing is successful, then it raises an
560   L{errors.QuitGanetiException} which is used as a special case to
561   shutdown the node daemon.
562
563   @param modify_ssh_setup: boolean
564
565   """
566   _CleanDirectory(pathutils.DATA_DIR)
567   _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
568   JobQueuePurge()
569
570   if modify_ssh_setup:
571     try:
572       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
573
574       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
575
576       utils.RemoveFile(priv_key)
577       utils.RemoveFile(pub_key)
578     except errors.OpExecError:
579       logging.exception("Error while processing ssh files")
580
581   try:
582     utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
583     utils.RemoveFile(pathutils.RAPI_CERT_FILE)
584     utils.RemoveFile(pathutils.SPICE_CERT_FILE)
585     utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
586     utils.RemoveFile(pathutils.NODED_CERT_FILE)
587   except: # pylint: disable=W0702
588     logging.exception("Error while removing cluster secrets")
589
590   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
591   if result.failed:
592     logging.error("Command %s failed with exitcode %s and error %s",
593                   result.cmd, result.exit_code, result.output)
594
595   # Raise a custom exception (handled in ganeti-noded)
596   raise errors.QuitGanetiException(True, "Shutdown scheduled")
597
598
599 def _GetVgInfo(name, excl_stor):
600   """Retrieves information about a LVM volume group.
601
602   """
603   # TODO: GetVGInfo supports returning information for multiple VGs at once
604   vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
605   if vginfo:
606     vg_free = int(round(vginfo[0][0], 0))
607     vg_size = int(round(vginfo[0][1], 0))
608   else:
609     vg_free = None
610     vg_size = None
611
612   return {
613     "name": name,
614     "vg_free": vg_free,
615     "vg_size": vg_size,
616     }
617
618
619 def _GetHvInfo(name):
620   """Retrieves node information from a hypervisor.
621
622   The information returned depends on the hypervisor. Common items:
623
624     - vg_size is the size of the configured volume group in MiB
625     - vg_free is the free size of the volume group in MiB
626     - memory_dom0 is the memory allocated for domain0 in MiB
627     - memory_free is the currently available (free) ram in MiB
628     - memory_total is the total number of ram in MiB
629     - hv_version: the hypervisor version, if available
630
631   """
632   return hypervisor.GetHypervisor(name).GetNodeInfo()
633
634
635 def _GetNamedNodeInfo(names, fn):
636   """Calls C{fn} for all names in C{names} and returns a dictionary.
637
638   @rtype: None or dict
639
640   """
641   if names is None:
642     return None
643   else:
644     return map(fn, names)
645
646
647 def GetNodeInfo(vg_names, hv_names, excl_stor):
648   """Gives back a hash with different information about the node.
649
650   @type vg_names: list of string
651   @param vg_names: Names of the volume groups to ask for disk space information
652   @type hv_names: list of string
653   @param hv_names: Names of the hypervisors to ask for node information
654   @type excl_stor: boolean
655   @param excl_stor: Whether exclusive_storage is active
656   @rtype: tuple; (string, None/dict, None/dict)
657   @return: Tuple containing boot ID, volume group information and hypervisor
658     information
659
660   """
661   bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
662   vg_info = _GetNamedNodeInfo(vg_names, (lambda vg: _GetVgInfo(vg, excl_stor)))
663   hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
664
665   return (bootid, vg_info, hv_info)
666
667
668 def _CheckExclusivePvs(pvi_list):
669   """Check that PVs are not shared among LVs
670
671   @type pvi_list: list of L{objects.LvmPvInfo} objects
672   @param pvi_list: information about the PVs
673
674   @rtype: list of tuples (string, list of strings)
675   @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])
676
677   """
678   res = []
679   for pvi in pvi_list:
680     if len(pvi.lv_list) > 1:
681       res.append((pvi.name, pvi.lv_list))
682   return res
683
684
685 def VerifyNode(what, cluster_name):
686   """Verify the status of the local node.
687
688   Based on the input L{what} parameter, various checks are done on the
689   local node.
690
691   If the I{filelist} key is present, this list of
692   files is checksummed and the file/checksum pairs are returned.
693
694   If the I{nodelist} key is present, we check that we have
695   connectivity via ssh with the target nodes (and check the hostname
696   report).
697
698   If the I{node-net-test} key is present, we check that we have
699   connectivity to the given nodes via both primary IP and, if
700   applicable, secondary IPs.
701
702   @type what: C{dict}
703   @param what: a dictionary of things to check:
704       - filelist: list of files for which to compute checksums
705       - nodelist: list of nodes we should check ssh communication with
706       - node-net-test: list of nodes we should check node daemon port
707         connectivity with
708       - hypervisor: list with hypervisors to run the verify for
709   @rtype: dict
710   @return: a dictionary with the same keys as the input dict, and
711       values representing the result of the checks
712
713   """
714   result = {}
715   my_name = netutils.Hostname.GetSysName()
716   port = netutils.GetDaemonPort(constants.NODED)
717   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
718
719   if constants.NV_HYPERVISOR in what and vm_capable:
720     result[constants.NV_HYPERVISOR] = tmp = {}
721     for hv_name in what[constants.NV_HYPERVISOR]:
722       try:
723         val = hypervisor.GetHypervisor(hv_name).Verify()
724       except errors.HypervisorError, err:
725         val = "Error while checking hypervisor: %s" % str(err)
726       tmp[hv_name] = val
727
728   if constants.NV_HVPARAMS in what and vm_capable:
729     result[constants.NV_HVPARAMS] = tmp = []
730     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
731       try:
732         logging.info("Validating hv %s, %s", hv_name, hvparms)
733         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
734       except errors.HypervisorError, err:
735         tmp.append((source, hv_name, str(err)))
736
737   if constants.NV_FILELIST in what:
738     fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
739                                               what[constants.NV_FILELIST]))
740     result[constants.NV_FILELIST] = \
741       dict((vcluster.MakeVirtualPath(key), value)
742            for (key, value) in fingerprints.items())
743
744   if constants.NV_NODELIST in what:
745     (nodes, bynode) = what[constants.NV_NODELIST]
746
747     # Add nodes from other groups (different for each node)
748     try:
749       nodes.extend(bynode[my_name])
750     except KeyError:
751       pass
752
753     # Use a random order
754     random.shuffle(nodes)
755
756     # Try to contact all nodes
757     val = {}
758     for node in nodes:
759       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
760       if not success:
761         val[node] = message
762
763     result[constants.NV_NODELIST] = val
764
765   if constants.NV_NODENETTEST in what:
766     result[constants.NV_NODENETTEST] = tmp = {}
767     my_pip = my_sip = None
768     for name, pip, sip in what[constants.NV_NODENETTEST]:
769       if name == my_name:
770         my_pip = pip
771         my_sip = sip
772         break
773     if not my_pip:
774       tmp[my_name] = ("Can't find my own primary/secondary IP"
775                       " in the node list")
776     else:
777       for name, pip, sip in what[constants.NV_NODENETTEST]:
778         fail = []
779         if not netutils.TcpPing(pip, port, source=my_pip):
780           fail.append("primary")
781         if sip != pip:
782           if not netutils.TcpPing(sip, port, source=my_sip):
783             fail.append("secondary")
784         if fail:
785           tmp[name] = ("failure using the %s interface(s)" %
786                        " and ".join(fail))
787
788   if constants.NV_MASTERIP in what:
789     # FIXME: add checks on incoming data structures (here and in the
790     # rest of the function)
791     master_name, master_ip = what[constants.NV_MASTERIP]
792     if master_name == my_name:
793       source = constants.IP4_ADDRESS_LOCALHOST
794     else:
795       source = None
796     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
797                                                      source=source)
798
799   if constants.NV_USERSCRIPTS in what:
800     result[constants.NV_USERSCRIPTS] = \
801       [script for script in what[constants.NV_USERSCRIPTS]
802        if not utils.IsExecutable(script)]
803
804   if constants.NV_OOB_PATHS in what:
805     result[constants.NV_OOB_PATHS] = tmp = []
806     for path in what[constants.NV_OOB_PATHS]:
807       try:
808         st = os.stat(path)
809       except OSError, err:
810         tmp.append("error stating out of band helper: %s" % err)
811       else:
812         if stat.S_ISREG(st.st_mode):
813           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
814             tmp.append(None)
815           else:
816             tmp.append("out of band helper %s is not executable" % path)
817         else:
818           tmp.append("out of band helper %s is not a file" % path)
819
820   if constants.NV_LVLIST in what and vm_capable:
821     try:
822       val = GetVolumeList(utils.ListVolumeGroups().keys())
823     except RPCFail, err:
824       val = str(err)
825     result[constants.NV_LVLIST] = val
826
827   if constants.NV_INSTANCELIST in what and vm_capable:
828     # GetInstanceList can fail
829     try:
830       val = GetInstanceList(what[constants.NV_INSTANCELIST])
831     except RPCFail, err:
832       val = str(err)
833     result[constants.NV_INSTANCELIST] = val
834
835   if constants.NV_VGLIST in what and vm_capable:
836     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
837
838   if constants.NV_PVLIST in what and vm_capable:
839     check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
840     val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
841                                        filter_allocatable=False,
842                                        include_lvs=check_exclusive_pvs)
843     if check_exclusive_pvs:
844       result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
845       for pvi in val:
846         # Avoid sending useless data on the wire
847         pvi.lv_list = []
848     result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)
849
850   if constants.NV_VERSION in what:
851     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
852                                     constants.RELEASE_VERSION)
853
854   if constants.NV_HVINFO in what and vm_capable:
855     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
856     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
857
858   if constants.NV_DRBDLIST in what and vm_capable:
859     try:
860       used_minors = bdev.DRBD8.GetUsedDevs().keys()
861     except errors.BlockDeviceError, err:
862       logging.warning("Can't get used minors list", exc_info=True)
863       used_minors = str(err)
864     result[constants.NV_DRBDLIST] = used_minors
865
866   if constants.NV_DRBDHELPER in what and vm_capable:
867     status = True
868     try:
869       payload = bdev.BaseDRBD.GetUsermodeHelper()
870     except errors.BlockDeviceError, err:
871       logging.error("Can't get DRBD usermode helper: %s", str(err))
872       status = False
873       payload = str(err)
874     result[constants.NV_DRBDHELPER] = (status, payload)
875
876   if constants.NV_NODESETUP in what:
877     result[constants.NV_NODESETUP] = tmpr = []
878     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
879       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
880                   " under /sys, missing required directories /sys/block"
881                   " and /sys/class/net")
882     if (not os.path.isdir("/proc/sys") or
883         not os.path.isfile("/proc/sysrq-trigger")):
884       tmpr.append("The procfs filesystem doesn't seem to be mounted"
885                   " under /proc, missing required directory /proc/sys and"
886                   " the file /proc/sysrq-trigger")
887
888   if constants.NV_TIME in what:
889     result[constants.NV_TIME] = utils.SplitTime(time.time())
890
891   if constants.NV_OSLIST in what and vm_capable:
892     result[constants.NV_OSLIST] = DiagnoseOS()
893
894   if constants.NV_BRIDGES in what and vm_capable:
895     result[constants.NV_BRIDGES] = [bridge
896                                     for bridge in what[constants.NV_BRIDGES]
897                                     if not utils.BridgeExists(bridge)]
898
899   if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
900     result[constants.NV_FILE_STORAGE_PATHS] = \
901       bdev.ComputeWrongFileStoragePaths()
902
903   return result
904
905
906 def GetBlockDevSizes(devices):
907   """Return the size of the given block devices
908
909   @type devices: list
910   @param devices: list of block device nodes to query
911   @rtype: dict
912   @return:
913     dictionary of all block devices under /dev (key). The value is their
914     size in MiB.
915
916     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
917
918   """
919   DEV_PREFIX = "/dev/"
920   blockdevs = {}
921
922   for devpath in devices:
923     if not utils.IsBelowDir(DEV_PREFIX, devpath):
924       continue
925
926     try:
927       st = os.stat(devpath)
928     except EnvironmentError, err:
929       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
930       continue
931
932     if stat.S_ISBLK(st.st_mode):
933       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
934       if result.failed:
935         # We don't want to fail, just do not list this device as available
936         logging.warning("Cannot get size for block device %s", devpath)
937         continue
938
939       size = int(result.stdout) / (1024 * 1024)
940       blockdevs[devpath] = size
941   return blockdevs
942
943
944 def GetVolumeList(vg_names):
945   """Compute list of logical volumes and their size.
946
947   @type vg_names: list
948   @param vg_names: the volume groups whose LVs we should list, or
949       empty for all volume groups
950   @rtype: dict
951   @return:
952       dictionary of all partions (key) with value being a tuple of
953       their size (in MiB), inactive and online status::
954
955         {'xenvg/test1': ('20.06', True, True)}
956
957       in case of errors, a string is returned with the error
958       details.
959
960   """
961   lvs = {}
962   sep = "|"
963   if not vg_names:
964     vg_names = []
965   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
966                          "--separator=%s" % sep,
967                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
968   if result.failed:
969     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
970
971   for line in result.stdout.splitlines():
972     line = line.strip()
973     match = _LVSLINE_REGEX.match(line)
974     if not match:
975       logging.error("Invalid line returned from lvs output: '%s'", line)
976       continue
977     vg_name, name, size, attr = match.groups()
978     inactive = attr[4] == "-"
979     online = attr[5] == "o"
980     virtual = attr[0] == "v"
981     if virtual:
982       # we don't want to report such volumes as existing, since they
983       # don't really hold data
984       continue
985     lvs[vg_name + "/" + name] = (size, inactive, online)
986
987   return lvs
988
989
990 def ListVolumeGroups():
991   """List the volume groups and their size.
992
993   @rtype: dict
994   @return: dictionary with keys volume name and values the
995       size of the volume
996
997   """
998   return utils.ListVolumeGroups()
999
1000
1001 def NodeVolumes():
1002   """List all volumes on this node.
1003
1004   @rtype: list
1005   @return:
1006     A list of dictionaries, each having four keys:
1007       - name: the logical volume name,
1008       - size: the size of the logical volume
1009       - dev: the physical device on which the LV lives
1010       - vg: the volume group to which it belongs
1011
1012     In case of errors, we return an empty list and log the
1013     error.
1014
1015     Note that since a logical volume can live on multiple physical
1016     volumes, the resulting list might include a logical volume
1017     multiple times.
1018
1019   """
1020   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
1021                          "--separator=|",
1022                          "--options=lv_name,lv_size,devices,vg_name"])
1023   if result.failed:
1024     _Fail("Failed to list logical volumes, lvs output: %s",
1025           result.output)
1026
1027   def parse_dev(dev):
1028     return dev.split("(")[0]
1029
1030   def handle_dev(dev):
1031     return [parse_dev(x) for x in dev.split(",")]
1032
1033   def map_line(line):
1034     line = [v.strip() for v in line]
1035     return [{"name": line[0], "size": line[1],
1036              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
1037
1038   all_devs = []
1039   for line in result.stdout.splitlines():
1040     if line.count("|") >= 3:
1041       all_devs.extend(map_line(line.split("|")))
1042     else:
1043       logging.warning("Strange line in the output from lvs: '%s'", line)
1044   return all_devs
1045
1046
1047 def BridgesExist(bridges_list):
1048   """Check if a list of bridges exist on the current node.
1049
1050   @rtype: boolean
1051   @return: C{True} if all of them exist, C{False} otherwise
1052
1053   """
1054   missing = []
1055   for bridge in bridges_list:
1056     if not utils.BridgeExists(bridge):
1057       missing.append(bridge)
1058
1059   if missing:
1060     _Fail("Missing bridges %s", utils.CommaJoin(missing))
1061
1062
1063 def GetInstanceList(hypervisor_list):
1064   """Provides a list of instances.
1065
1066   @type hypervisor_list: list
1067   @param hypervisor_list: the list of hypervisors to query information
1068
1069   @rtype: list
1070   @return: a list of all running instances on the current node
1071     - instance1.example.com
1072     - instance2.example.com
1073
1074   """
1075   results = []
1076   for hname in hypervisor_list:
1077     try:
1078       names = hypervisor.GetHypervisor(hname).ListInstances()
1079       results.extend(names)
1080     except errors.HypervisorError, err:
1081       _Fail("Error enumerating instances (hypervisor %s): %s",
1082             hname, err, exc=True)
1083
1084   return results
1085
1086
1087 def GetInstanceInfo(instance, hname):
1088   """Gives back the information about an instance as a dictionary.
1089
1090   @type instance: string
1091   @param instance: the instance name
1092   @type hname: string
1093   @param hname: the hypervisor type of the instance
1094
1095   @rtype: dict
1096   @return: dictionary with the following keys:
1097       - memory: memory size of instance (int)
1098       - state: xen state of instance (string)
1099       - time: cpu time of instance (float)
1100       - vcpus: the number of vcpus (int)
1101
1102   """
1103   output = {}
1104
1105   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1106   if iinfo is not None:
1107     output["memory"] = iinfo[2]
1108     output["vcpus"] = iinfo[3]
1109     output["state"] = iinfo[4]
1110     output["time"] = iinfo[5]
1111
1112   return output
1113
1114
1115 def GetInstanceMigratable(instance):
1116   """Gives whether an instance can be migrated.
1117
1118   @type instance: L{objects.Instance}
1119   @param instance: object representing the instance to be checked.
1120
1121   @rtype: tuple
1122   @return: tuple of (result, description) where:
1123       - result: whether the instance can be migrated or not
1124       - description: a description of the issue, if relevant
1125
1126   """
1127   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1128   iname = instance.name
1129   if iname not in hyper.ListInstances():
1130     _Fail("Instance %s is not running", iname)
1131
1132   for idx in range(len(instance.disks)):
1133     link_name = _GetBlockDevSymlinkPath(iname, idx)
1134     if not os.path.islink(link_name):
1135       logging.warning("Instance %s is missing symlink %s for disk %d",
1136                       iname, link_name, idx)
1137
1138
1139 def GetAllInstancesInfo(hypervisor_list):
1140   """Gather data about all instances.
1141
1142   This is the equivalent of L{GetInstanceInfo}, except that it
1143   computes data for all instances at once, thus being faster if one
1144   needs data about more than one instance.
1145
1146   @type hypervisor_list: list
1147   @param hypervisor_list: list of hypervisors to query for instance data
1148
1149   @rtype: dict
1150   @return: dictionary of instance: data, with data having the following keys:
1151       - memory: memory size of instance (int)
1152       - state: xen state of instance (string)
1153       - time: cpu time of instance (float)
1154       - vcpus: the number of vcpus
1155
1156   """
1157   output = {}
1158
1159   for hname in hypervisor_list:
1160     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1161     if iinfo:
1162       for name, _, memory, vcpus, state, times in iinfo:
1163         value = {
1164           "memory": memory,
1165           "vcpus": vcpus,
1166           "state": state,
1167           "time": times,
1168           }
1169         if name in output:
1170           # we only check static parameters, like memory and vcpus,
1171           # and not state and time which can change between the
1172           # invocations of the different hypervisors
1173           for key in "memory", "vcpus":
1174             if value[key] != output[name][key]:
1175               _Fail("Instance %s is running twice"
1176                     " with different parameters", name)
1177         output[name] = value
1178
1179   return output
1180
1181
1182 def _InstanceLogName(kind, os_name, instance, component):
1183   """Compute the OS log filename for a given instance and operation.
1184
1185   The instance name and os name are passed in as strings since not all
1186   operations have these as part of an instance object.
1187
1188   @type kind: string
1189   @param kind: the operation type (e.g. add, import, etc.)
1190   @type os_name: string
1191   @param os_name: the os name
1192   @type instance: string
1193   @param instance: the name of the instance being imported/added/etc.
1194   @type component: string or None
1195   @param component: the name of the component of the instance being
1196       transferred
1197
1198   """
1199   # TODO: Use tempfile.mkstemp to create unique filename
1200   if component:
1201     assert "/" not in component
1202     c_msg = "-%s" % component
1203   else:
1204     c_msg = ""
1205   base = ("%s-%s-%s%s-%s.log" %
1206           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1207   return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1208
1209
1210 def InstanceOsAdd(instance, reinstall, debug):
1211   """Add an OS to an instance.
1212
1213   @type instance: L{objects.Instance}
1214   @param instance: Instance whose OS is to be installed
1215   @type reinstall: boolean
1216   @param reinstall: whether this is an instance reinstall
1217   @type debug: integer
1218   @param debug: debug level, passed to the OS scripts
1219   @rtype: None
1220
1221   """
1222   inst_os = OSFromDisk(instance.os)
1223
1224   create_env = OSEnvironment(instance, inst_os, debug)
1225   if reinstall:
1226     create_env["INSTANCE_REINSTALL"] = "1"
1227
1228   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1229
1230   result = utils.RunCmd([inst_os.create_script], env=create_env,
1231                         cwd=inst_os.path, output=logfile, reset_env=True)
1232   if result.failed:
1233     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1234                   " output: %s", result.cmd, result.fail_reason, logfile,
1235                   result.output)
1236     lines = [utils.SafeEncode(val)
1237              for val in utils.TailFile(logfile, lines=20)]
1238     _Fail("OS create script failed (%s), last lines in the"
1239           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1240
1241
1242 def RunRenameInstance(instance, old_name, debug):
1243   """Run the OS rename script for an instance.
1244
1245   @type instance: L{objects.Instance}
1246   @param instance: Instance whose OS is to be installed
1247   @type old_name: string
1248   @param old_name: previous instance name
1249   @type debug: integer
1250   @param debug: debug level, passed to the OS scripts
1251   @rtype: boolean
1252   @return: the success of the operation
1253
1254   """
1255   inst_os = OSFromDisk(instance.os)
1256
1257   rename_env = OSEnvironment(instance, inst_os, debug)
1258   rename_env["OLD_INSTANCE_NAME"] = old_name
1259
1260   logfile = _InstanceLogName("rename", instance.os,
1261                              "%s-%s" % (old_name, instance.name), None)
1262
1263   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1264                         cwd=inst_os.path, output=logfile, reset_env=True)
1265
1266   if result.failed:
1267     logging.error("os create command '%s' returned error: %s output: %s",
1268                   result.cmd, result.fail_reason, result.output)
1269     lines = [utils.SafeEncode(val)
1270              for val in utils.TailFile(logfile, lines=20)]
1271     _Fail("OS rename script failed (%s), last lines in the"
1272           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1273
1274
1275 def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
1276   """Returns symlink path for block device.
1277
1278   """
1279   if _dir is None:
1280     _dir = pathutils.DISK_LINKS_DIR
1281
1282   return utils.PathJoin(_dir,
1283                         ("%s%s%s" %
1284                          (instance_name, constants.DISK_SEPARATOR, idx)))
1285
1286
1287 def _SymlinkBlockDev(instance_name, device_path, idx):
1288   """Set up symlinks to a instance's block device.
1289
1290   This is an auxiliary function run when an instance is start (on the primary
1291   node) or when an instance is migrated (on the target node).
1292
1293
1294   @param instance_name: the name of the target instance
1295   @param device_path: path of the physical block device, on the node
1296   @param idx: the disk index
1297   @return: absolute path to the disk's symlink
1298
1299   """
1300   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1301   try:
1302     os.symlink(device_path, link_name)
1303   except OSError, err:
1304     if err.errno == errno.EEXIST:
1305       if (not os.path.islink(link_name) or
1306           os.readlink(link_name) != device_path):
1307         os.remove(link_name)
1308         os.symlink(device_path, link_name)
1309     else:
1310       raise
1311
1312   return link_name
1313
1314
1315 def _RemoveBlockDevLinks(instance_name, disks):
1316   """Remove the block device symlinks belonging to the given instance.
1317
1318   """
1319   for idx, _ in enumerate(disks):
1320     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1321     if os.path.islink(link_name):
1322       try:
1323         os.remove(link_name)
1324       except OSError:
1325         logging.exception("Can't remove symlink '%s'", link_name)
1326
1327
1328 def _GatherAndLinkBlockDevs(instance):
1329   """Set up an instance's block device(s).
1330
1331   This is run on the primary node at instance startup. The block
1332   devices must be already assembled.
1333
1334   @type instance: L{objects.Instance}
1335   @param instance: the instance whose disks we shoul assemble
1336   @rtype: list
1337   @return: list of (disk_object, device_path)
1338
1339   """
1340   block_devices = []
1341   for idx, disk in enumerate(instance.disks):
1342     device = _RecursiveFindBD(disk)
1343     if device is None:
1344       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1345                                     str(disk))
1346     device.Open()
1347     try:
1348       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1349     except OSError, e:
1350       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1351                                     e.strerror)
1352
1353     block_devices.append((disk, link_name))
1354
1355   return block_devices
1356
1357
1358 def StartInstance(instance, startup_paused):
1359   """Start an instance.
1360
1361   @type instance: L{objects.Instance}
1362   @param instance: the instance object
1363   @type startup_paused: bool
1364   @param instance: pause instance at startup?
1365   @rtype: None
1366
1367   """
1368   running_instances = GetInstanceList([instance.hypervisor])
1369
1370   if instance.name in running_instances:
1371     logging.info("Instance %s already running, not starting", instance.name)
1372     return
1373
1374   try:
1375     block_devices = _GatherAndLinkBlockDevs(instance)
1376     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1377     hyper.StartInstance(instance, block_devices, startup_paused)
1378   except errors.BlockDeviceError, err:
1379     _Fail("Block device error: %s", err, exc=True)
1380   except errors.HypervisorError, err:
1381     _RemoveBlockDevLinks(instance.name, instance.disks)
1382     _Fail("Hypervisor error: %s", err, exc=True)
1383
1384
1385 def InstanceShutdown(instance, timeout):
1386   """Shut an instance down.
1387
1388   @note: this functions uses polling with a hardcoded timeout.
1389
1390   @type instance: L{objects.Instance}
1391   @param instance: the instance object
1392   @type timeout: integer
1393   @param timeout: maximum timeout for soft shutdown
1394   @rtype: None
1395
1396   """
1397   hv_name = instance.hypervisor
1398   hyper = hypervisor.GetHypervisor(hv_name)
1399   iname = instance.name
1400
1401   if instance.name not in hyper.ListInstances():
1402     logging.info("Instance %s not running, doing nothing", iname)
1403     return
1404
1405   class _TryShutdown:
1406     def __init__(self):
1407       self.tried_once = False
1408
1409     def __call__(self):
1410       if iname not in hyper.ListInstances():
1411         return
1412
1413       try:
1414         hyper.StopInstance(instance, retry=self.tried_once)
1415       except errors.HypervisorError, err:
1416         if iname not in hyper.ListInstances():
1417           # if the instance is no longer existing, consider this a
1418           # success and go to cleanup
1419           return
1420
1421         _Fail("Failed to stop instance %s: %s", iname, err)
1422
1423       self.tried_once = True
1424
1425       raise utils.RetryAgain()
1426
1427   try:
1428     utils.Retry(_TryShutdown(), 5, timeout)
1429   except utils.RetryTimeout:
1430     # the shutdown did not succeed
1431     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1432
1433     try:
1434       hyper.StopInstance(instance, force=True)
1435     except errors.HypervisorError, err:
1436       if iname in hyper.ListInstances():
1437         # only raise an error if the instance still exists, otherwise
1438         # the error could simply be "instance ... unknown"!
1439         _Fail("Failed to force stop instance %s: %s", iname, err)
1440
1441     time.sleep(1)
1442
1443     if iname in hyper.ListInstances():
1444       _Fail("Could not shutdown instance %s even by destroy", iname)
1445
1446   try:
1447     hyper.CleanupInstance(instance.name)
1448   except errors.HypervisorError, err:
1449     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1450
1451   _RemoveBlockDevLinks(iname, instance.disks)
1452
1453
1454 def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
1455   """Reboot an instance.
1456
1457   @type instance: L{objects.Instance}
1458   @param instance: the instance object to reboot
1459   @type reboot_type: str
1460   @param reboot_type: the type of reboot, one the following
1461     constants:
1462       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1463         instance OS, do not recreate the VM
1464       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1465         restart the VM (at the hypervisor level)
1466       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1467         not accepted here, since that mode is handled differently, in
1468         cmdlib, and translates into full stop and start of the
1469         instance (instead of a call_instance_reboot RPC)
1470   @type shutdown_timeout: integer
1471   @param shutdown_timeout: maximum timeout for soft shutdown
1472   @rtype: None
1473
1474   """
1475   running_instances = GetInstanceList([instance.hypervisor])
1476
1477   if instance.name not in running_instances:
1478     _Fail("Cannot reboot instance %s that is not running", instance.name)
1479
1480   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1481   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1482     try:
1483       hyper.RebootInstance(instance)
1484       reason.Store(instance.name)
1485     except errors.HypervisorError, err:
1486       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1487   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1488     try:
1489       InstanceShutdown(instance, shutdown_timeout)
1490       result = StartInstance(instance, False)
1491       reason.Store(instance.name)
1492       return result
1493     except errors.HypervisorError, err:
1494       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1495   else:
1496     _Fail("Invalid reboot_type received: %s", reboot_type)
1497
1498
1499 def InstanceBalloonMemory(instance, memory):
1500   """Resize an instance's memory.
1501
1502   @type instance: L{objects.Instance}
1503   @param instance: the instance object
1504   @type memory: int
1505   @param memory: new memory amount in MB
1506   @rtype: None
1507
1508   """
1509   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1510   running = hyper.ListInstances()
1511   if instance.name not in running:
1512     logging.info("Instance %s is not running, cannot balloon", instance.name)
1513     return
1514   try:
1515     hyper.BalloonInstanceMemory(instance, memory)
1516   except errors.HypervisorError, err:
1517     _Fail("Failed to balloon instance memory: %s", err, exc=True)
1518
1519
1520 def MigrationInfo(instance):
1521   """Gather information about an instance to be migrated.
1522
1523   @type instance: L{objects.Instance}
1524   @param instance: the instance definition
1525
1526   """
1527   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1528   try:
1529     info = hyper.MigrationInfo(instance)
1530   except errors.HypervisorError, err:
1531     _Fail("Failed to fetch migration information: %s", err, exc=True)
1532   return info
1533
1534
1535 def AcceptInstance(instance, info, target):
1536   """Prepare the node to accept an instance.
1537
1538   @type instance: L{objects.Instance}
1539   @param instance: the instance definition
1540   @type info: string/data (opaque)
1541   @param info: migration information, from the source node
1542   @type target: string
1543   @param target: target host (usually ip), on this node
1544
1545   """
1546   # TODO: why is this required only for DTS_EXT_MIRROR?
1547   if instance.disk_template in constants.DTS_EXT_MIRROR:
1548     # Create the symlinks, as the disks are not active
1549     # in any way
1550     try:
1551       _GatherAndLinkBlockDevs(instance)
1552     except errors.BlockDeviceError, err:
1553       _Fail("Block device error: %s", err, exc=True)
1554
1555   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1556   try:
1557     hyper.AcceptInstance(instance, info, target)
1558   except errors.HypervisorError, err:
1559     if instance.disk_template in constants.DTS_EXT_MIRROR:
1560       _RemoveBlockDevLinks(instance.name, instance.disks)
1561     _Fail("Failed to accept instance: %s", err, exc=True)
1562
1563
1564 def FinalizeMigrationDst(instance, info, success):
1565   """Finalize any preparation to accept an instance.
1566
1567   @type instance: L{objects.Instance}
1568   @param instance: the instance definition
1569   @type info: string/data (opaque)
1570   @param info: migration information, from the source node
1571   @type success: boolean
1572   @param success: whether the migration was a success or a failure
1573
1574   """
1575   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1576   try:
1577     hyper.FinalizeMigrationDst(instance, info, success)
1578   except errors.HypervisorError, err:
1579     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1580
1581
1582 def MigrateInstance(instance, target, live):
1583   """Migrates an instance to another node.
1584
1585   @type instance: L{objects.Instance}
1586   @param instance: the instance definition
1587   @type target: string
1588   @param target: the target node name
1589   @type live: boolean
1590   @param live: whether the migration should be done live or not (the
1591       interpretation of this parameter is left to the hypervisor)
1592   @raise RPCFail: if migration fails for some reason
1593
1594   """
1595   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1596
1597   try:
1598     hyper.MigrateInstance(instance, target, live)
1599   except errors.HypervisorError, err:
1600     _Fail("Failed to migrate instance: %s", err, exc=True)
1601
1602
1603 def FinalizeMigrationSource(instance, success, live):
1604   """Finalize the instance migration on the source node.
1605
1606   @type instance: L{objects.Instance}
1607   @param instance: the instance definition of the migrated instance
1608   @type success: bool
1609   @param success: whether the migration succeeded or not
1610   @type live: bool
1611   @param live: whether the user requested a live migration or not
1612   @raise RPCFail: If the execution fails for some reason
1613
1614   """
1615   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1616
1617   try:
1618     hyper.FinalizeMigrationSource(instance, success, live)
1619   except Exception, err:  # pylint: disable=W0703
1620     _Fail("Failed to finalize the migration on the source node: %s", err,
1621           exc=True)
1622
1623
1624 def GetMigrationStatus(instance):
1625   """Get the migration status
1626
1627   @type instance: L{objects.Instance}
1628   @param instance: the instance that is being migrated
1629   @rtype: L{objects.MigrationStatus}
1630   @return: the status of the current migration (one of
1631            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1632            progress info that can be retrieved from the hypervisor
1633   @raise RPCFail: If the migration status cannot be retrieved
1634
1635   """
1636   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1637   try:
1638     return hyper.GetMigrationStatus(instance)
1639   except Exception, err:  # pylint: disable=W0703
1640     _Fail("Failed to get migration status: %s", err, exc=True)
1641
1642
1643 def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
1644   """Creates a block device for an instance.
1645
1646   @type disk: L{objects.Disk}
1647   @param disk: the object describing the disk we should create
1648   @type size: int
1649   @param size: the size of the physical underlying device, in MiB
1650   @type owner: str
1651   @param owner: the name of the instance for which disk is created,
1652       used for device cache data
1653   @type on_primary: boolean
1654   @param on_primary:  indicates if it is the primary node or not
1655   @type info: string
1656   @param info: string that will be sent to the physical device
1657       creation, used for example to set (LVM) tags on LVs
1658   @type excl_stor: boolean
1659   @param excl_stor: Whether exclusive_storage is active
1660
1661   @return: the new unique_id of the device (this can sometime be
1662       computed only after creation), or None. On secondary nodes,
1663       it's not required to return anything.
1664
1665   """
1666   # TODO: remove the obsolete "size" argument
1667   # pylint: disable=W0613
1668   clist = []
1669   if disk.children:
1670     for child in disk.children:
1671       try:
1672         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1673       except errors.BlockDeviceError, err:
1674         _Fail("Can't assemble device %s: %s", child, err)
1675       if on_primary or disk.AssembleOnSecondary():
1676         # we need the children open in case the device itself has to
1677         # be assembled
1678         try:
1679           # pylint: disable=E1103
1680           crdev.Open()
1681         except errors.BlockDeviceError, err:
1682           _Fail("Can't make child '%s' read-write: %s", child, err)
1683       clist.append(crdev)
1684
1685   try:
1686     device = bdev.Create(disk, clist, excl_stor)
1687   except errors.BlockDeviceError, err:
1688     _Fail("Can't create block device: %s", err)
1689
1690   if on_primary or disk.AssembleOnSecondary():
1691     try:
1692       device.Assemble()
1693     except errors.BlockDeviceError, err:
1694       _Fail("Can't assemble device after creation, unusual event: %s", err)
1695     if on_primary or disk.OpenOnSecondary():
1696       try:
1697         device.Open(force=True)
1698       except errors.BlockDeviceError, err:
1699         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1700     DevCacheManager.UpdateCache(device.dev_path, owner,
1701                                 on_primary, disk.iv_name)
1702
1703   device.SetInfo(info)
1704
1705   return device.unique_id
1706
1707
1708 def _WipeDevice(path, offset, size):
1709   """This function actually wipes the device.
1710
1711   @param path: The path to the device to wipe
1712   @param offset: The offset in MiB in the file
1713   @param size: The size in MiB to write
1714
1715   """
1716   # Internal sizes are always in Mebibytes; if the following "dd" command
1717   # should use a different block size the offset and size given to this
1718   # function must be adjusted accordingly before being passed to "dd".
1719   block_size = 1024 * 1024
1720
1721   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1722          "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1723          "count=%d" % size]
1724   result = utils.RunCmd(cmd)
1725
1726   if result.failed:
1727     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1728           result.fail_reason, result.output)
1729
1730
1731 def BlockdevWipe(disk, offset, size):
1732   """Wipes a block device.
1733
1734   @type disk: L{objects.Disk}
1735   @param disk: the disk object we want to wipe
1736   @type offset: int
1737   @param offset: The offset in MiB in the file
1738   @type size: int
1739   @param size: The size in MiB to write
1740
1741   """
1742   try:
1743     rdev = _RecursiveFindBD(disk)
1744   except errors.BlockDeviceError:
1745     rdev = None
1746
1747   if not rdev:
1748     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1749
1750   # Do cross verify some of the parameters
1751   if offset < 0:
1752     _Fail("Negative offset")
1753   if size < 0:
1754     _Fail("Negative size")
1755   if offset > rdev.size:
1756     _Fail("Offset is bigger than device size")
1757   if (offset + size) > rdev.size:
1758     _Fail("The provided offset and size to wipe is bigger than device size")
1759
1760   _WipeDevice(rdev.dev_path, offset, size)
1761
1762
1763 def BlockdevPauseResumeSync(disks, pause):
1764   """Pause or resume the sync of the block device.
1765
1766   @type disks: list of L{objects.Disk}
1767   @param disks: the disks object we want to pause/resume
1768   @type pause: bool
1769   @param pause: Wheater to pause or resume
1770
1771   """
1772   success = []
1773   for disk in disks:
1774     try:
1775       rdev = _RecursiveFindBD(disk)
1776     except errors.BlockDeviceError:
1777       rdev = None
1778
1779     if not rdev:
1780       success.append((False, ("Cannot change sync for device %s:"
1781                               " device not found" % disk.iv_name)))
1782       continue
1783
1784     result = rdev.PauseResumeSync(pause)
1785
1786     if result:
1787       success.append((result, None))
1788     else:
1789       if pause:
1790         msg = "Pause"
1791       else:
1792         msg = "Resume"
1793       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1794
1795   return success
1796
1797
1798 def BlockdevRemove(disk):
1799   """Remove a block device.
1800
1801   @note: This is intended to be called recursively.
1802
1803   @type disk: L{objects.Disk}
1804   @param disk: the disk object we should remove
1805   @rtype: boolean
1806   @return: the success of the operation
1807
1808   """
1809   msgs = []
1810   try:
1811     rdev = _RecursiveFindBD(disk)
1812   except errors.BlockDeviceError, err:
1813     # probably can't attach
1814     logging.info("Can't attach to device %s in remove", disk)
1815     rdev = None
1816   if rdev is not None:
1817     r_path = rdev.dev_path
1818     try:
1819       rdev.Remove()
1820     except errors.BlockDeviceError, err:
1821       msgs.append(str(err))
1822     if not msgs:
1823       DevCacheManager.RemoveCache(r_path)
1824
1825   if disk.children:
1826     for child in disk.children:
1827       try:
1828         BlockdevRemove(child)
1829       except RPCFail, err:
1830         msgs.append(str(err))
1831
1832   if msgs:
1833     _Fail("; ".join(msgs))
1834
1835
1836 def _RecursiveAssembleBD(disk, owner, as_primary):
1837   """Activate a block device for an instance.
1838
1839   This is run on the primary and secondary nodes for an instance.
1840
1841   @note: this function is called recursively.
1842
1843   @type disk: L{objects.Disk}
1844   @param disk: the disk we try to assemble
1845   @type owner: str
1846   @param owner: the name of the instance which owns the disk
1847   @type as_primary: boolean
1848   @param as_primary: if we should make the block device
1849       read/write
1850
1851   @return: the assembled device or None (in case no device
1852       was assembled)
1853   @raise errors.BlockDeviceError: in case there is an error
1854       during the activation of the children or the device
1855       itself
1856
1857   """
1858   children = []
1859   if disk.children:
1860     mcn = disk.ChildrenNeeded()
1861     if mcn == -1:
1862       mcn = 0 # max number of Nones allowed
1863     else:
1864       mcn = len(disk.children) - mcn # max number of Nones
1865     for chld_disk in disk.children:
1866       try:
1867         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1868       except errors.BlockDeviceError, err:
1869         if children.count(None) >= mcn:
1870           raise
1871         cdev = None
1872         logging.error("Error in child activation (but continuing): %s",
1873                       str(err))
1874       children.append(cdev)
1875
1876   if as_primary or disk.AssembleOnSecondary():
1877     r_dev = bdev.Assemble(disk, children)
1878     result = r_dev
1879     if as_primary or disk.OpenOnSecondary():
1880       r_dev.Open()
1881     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1882                                 as_primary, disk.iv_name)
1883
1884   else:
1885     result = True
1886   return result
1887
1888
1889 def BlockdevAssemble(disk, owner, as_primary, idx):
1890   """Activate a block device for an instance.
1891
1892   This is a wrapper over _RecursiveAssembleBD.
1893
1894   @rtype: str or boolean
1895   @return: a C{/dev/...} path for primary nodes, and
1896       C{True} for secondary nodes
1897
1898   """
1899   try:
1900     result = _RecursiveAssembleBD(disk, owner, as_primary)
1901     if isinstance(result, bdev.BlockDev):
1902       # pylint: disable=E1103
1903       result = result.dev_path
1904       if as_primary:
1905         _SymlinkBlockDev(owner, result, idx)
1906   except errors.BlockDeviceError, err:
1907     _Fail("Error while assembling disk: %s", err, exc=True)
1908   except OSError, err:
1909     _Fail("Error while symlinking disk: %s", err, exc=True)
1910
1911   return result
1912
1913
1914 def BlockdevShutdown(disk):
1915   """Shut down a block device.
1916
1917   First, if the device is assembled (Attach() is successful), then
1918   the device is shutdown. Then the children of the device are
1919   shutdown.
1920
1921   This function is called recursively. Note that we don't cache the
1922   children or such, as oppossed to assemble, shutdown of different
1923   devices doesn't require that the upper device was active.
1924
1925   @type disk: L{objects.Disk}
1926   @param disk: the description of the disk we should
1927       shutdown
1928   @rtype: None
1929
1930   """
1931   msgs = []
1932   r_dev = _RecursiveFindBD(disk)
1933   if r_dev is not None:
1934     r_path = r_dev.dev_path
1935     try:
1936       r_dev.Shutdown()
1937       DevCacheManager.RemoveCache(r_path)
1938     except errors.BlockDeviceError, err:
1939       msgs.append(str(err))
1940
1941   if disk.children:
1942     for child in disk.children:
1943       try:
1944         BlockdevShutdown(child)
1945       except RPCFail, err:
1946         msgs.append(str(err))
1947
1948   if msgs:
1949     _Fail("; ".join(msgs))
1950
1951
1952 def BlockdevAddchildren(parent_cdev, new_cdevs):
1953   """Extend a mirrored block device.
1954
1955   @type parent_cdev: L{objects.Disk}
1956   @param parent_cdev: the disk to which we should add children
1957   @type new_cdevs: list of L{objects.Disk}
1958   @param new_cdevs: the list of children which we should add
1959   @rtype: None
1960
1961   """
1962   parent_bdev = _RecursiveFindBD(parent_cdev)
1963   if parent_bdev is None:
1964     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1965   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1966   if new_bdevs.count(None) > 0:
1967     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1968   parent_bdev.AddChildren(new_bdevs)
1969
1970
1971 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1972   """Shrink a mirrored block device.
1973
1974   @type parent_cdev: L{objects.Disk}
1975   @param parent_cdev: the disk from which we should remove children
1976   @type new_cdevs: list of L{objects.Disk}
1977   @param new_cdevs: the list of children which we should remove
1978   @rtype: None
1979
1980   """
1981   parent_bdev = _RecursiveFindBD(parent_cdev)
1982   if parent_bdev is None:
1983     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1984   devs = []
1985   for disk in new_cdevs:
1986     rpath = disk.StaticDevPath()
1987     if rpath is None:
1988       bd = _RecursiveFindBD(disk)
1989       if bd is None:
1990         _Fail("Can't find device %s while removing children", disk)
1991       else:
1992         devs.append(bd.dev_path)
1993     else:
1994       if not utils.IsNormAbsPath(rpath):
1995         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1996       devs.append(rpath)
1997   parent_bdev.RemoveChildren(devs)
1998
1999
2000 def BlockdevGetmirrorstatus(disks):
2001   """Get the mirroring status of a list of devices.
2002
2003   @type disks: list of L{objects.Disk}
2004   @param disks: the list of disks which we should query
2005   @rtype: disk
2006   @return: List of L{objects.BlockDevStatus}, one for each disk
2007   @raise errors.BlockDeviceError: if any of the disks cannot be
2008       found
2009
2010   """
2011   stats = []
2012   for dsk in disks:
2013     rbd = _RecursiveFindBD(dsk)
2014     if rbd is None:
2015       _Fail("Can't find device %s", dsk)
2016
2017     stats.append(rbd.CombinedSyncStatus())
2018
2019   return stats
2020
2021
2022 def BlockdevGetmirrorstatusMulti(disks):
2023   """Get the mirroring status of a list of devices.
2024
2025   @type disks: list of L{objects.Disk}
2026   @param disks: the list of disks which we should query
2027   @rtype: disk
2028   @return: List of tuples, (bool, status), one for each disk; bool denotes
2029     success/failure, status is L{objects.BlockDevStatus} on success, string
2030     otherwise
2031
2032   """
2033   result = []
2034   for disk in disks:
2035     try:
2036       rbd = _RecursiveFindBD(disk)
2037       if rbd is None:
2038         result.append((False, "Can't find device %s" % disk))
2039         continue
2040
2041       status = rbd.CombinedSyncStatus()
2042     except errors.BlockDeviceError, err:
2043       logging.exception("Error while getting disk status")
2044       result.append((False, str(err)))
2045     else:
2046       result.append((True, status))
2047
2048   assert len(disks) == len(result)
2049
2050   return result
2051
2052
2053 def _RecursiveFindBD(disk):
2054   """Check if a device is activated.
2055
2056   If so, return information about the real device.
2057
2058   @type disk: L{objects.Disk}
2059   @param disk: the disk object we need to find
2060
2061   @return: None if the device can't be found,
2062       otherwise the device instance
2063
2064   """
2065   children = []
2066   if disk.children:
2067     for chdisk in disk.children:
2068       children.append(_RecursiveFindBD(chdisk))
2069
2070   return bdev.FindDevice(disk, children)
2071
2072
2073 def _OpenRealBD(disk):
2074   """Opens the underlying block device of a disk.
2075
2076   @type disk: L{objects.Disk}
2077   @param disk: the disk object we want to open
2078
2079   """
2080   real_disk = _RecursiveFindBD(disk)
2081   if real_disk is None:
2082     _Fail("Block device '%s' is not set up", disk)
2083
2084   real_disk.Open()
2085
2086   return real_disk
2087
2088
2089 def BlockdevFind(disk):
2090   """Check if a device is activated.
2091
2092   If it is, return information about the real device.
2093
2094   @type disk: L{objects.Disk}
2095   @param disk: the disk to find
2096   @rtype: None or objects.BlockDevStatus
2097   @return: None if the disk cannot be found, otherwise a the current
2098            information
2099
2100   """
2101   try:
2102     rbd = _RecursiveFindBD(disk)
2103   except errors.BlockDeviceError, err:
2104     _Fail("Failed to find device: %s", err, exc=True)
2105
2106   if rbd is None:
2107     return None
2108
2109   return rbd.GetSyncStatus()
2110
2111
2112 def BlockdevGetsize(disks):
2113   """Computes the size of the given disks.
2114
2115   If a disk is not found, returns None instead.
2116
2117   @type disks: list of L{objects.Disk}
2118   @param disks: the list of disk to compute the size for
2119   @rtype: list
2120   @return: list with elements None if the disk cannot be found,
2121       otherwise the size
2122
2123   """
2124   result = []
2125   for cf in disks:
2126     try:
2127       rbd = _RecursiveFindBD(cf)
2128     except errors.BlockDeviceError:
2129       result.append(None)
2130       continue
2131     if rbd is None:
2132       result.append(None)
2133     else:
2134       result.append(rbd.GetActualSize())
2135   return result
2136
2137
2138 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2139   """Export a block device to a remote node.
2140
2141   @type disk: L{objects.Disk}
2142   @param disk: the description of the disk to export
2143   @type dest_node: str
2144   @param dest_node: the destination node to export to
2145   @type dest_path: str
2146   @param dest_path: the destination path on the target node
2147   @type cluster_name: str
2148   @param cluster_name: the cluster name, needed for SSH hostalias
2149   @rtype: None
2150
2151   """
2152   real_disk = _OpenRealBD(disk)
2153
2154   # the block size on the read dd is 1MiB to match our units
2155   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2156                                "dd if=%s bs=1048576 count=%s",
2157                                real_disk.dev_path, str(disk.size))
2158
2159   # we set here a smaller block size as, due to ssh buffering, more
2160   # than 64-128k will mostly ignored; we use nocreat to fail if the
2161   # device is not already there or we pass a wrong path; we use
2162   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2163   # to not buffer too much memory; this means that at best, we flush
2164   # every 64k, which will not be very fast
2165   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2166                                 " oflag=dsync", dest_path)
2167
2168   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2169                                                    constants.SSH_LOGIN_USER,
2170                                                    destcmd)
2171
2172   # all commands have been checked, so we're safe to combine them
2173   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2174
2175   result = utils.RunCmd(["bash", "-c", command])
2176
2177   if result.failed:
2178     _Fail("Disk copy command '%s' returned error: %s"
2179           " output: %s", command, result.fail_reason, result.output)
2180
2181
2182 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2183   """Write a file to the filesystem.
2184
2185   This allows the master to overwrite(!) a file. It will only perform
2186   the operation if the file belongs to a list of configuration files.
2187
2188   @type file_name: str
2189   @param file_name: the target file name
2190   @type data: str
2191   @param data: the new contents of the file
2192   @type mode: int
2193   @param mode: the mode to give the file (can be None)
2194   @type uid: string
2195   @param uid: the owner of the file
2196   @type gid: string
2197   @param gid: the group of the file
2198   @type atime: float
2199   @param atime: the atime to set on the file (can be None)
2200   @type mtime: float
2201   @param mtime: the mtime to set on the file (can be None)
2202   @rtype: None
2203
2204   """
2205   file_name = vcluster.LocalizeVirtualPath(file_name)
2206
2207   if not os.path.isabs(file_name):
2208     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2209
2210   if file_name not in _ALLOWED_UPLOAD_FILES:
2211     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2212           file_name)
2213
2214   raw_data = _Decompress(data)
2215
2216   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2217     _Fail("Invalid username/groupname type")
2218
2219   getents = runtime.GetEnts()
2220   uid = getents.LookupUser(uid)
2221   gid = getents.LookupGroup(gid)
2222
2223   utils.SafeWriteFile(file_name, None,
2224                       data=raw_data, mode=mode, uid=uid, gid=gid,
2225                       atime=atime, mtime=mtime)
2226
2227
2228 def RunOob(oob_program, command, node, timeout):
2229   """Executes oob_program with given command on given node.
2230
2231   @param oob_program: The path to the executable oob_program
2232   @param command: The command to invoke on oob_program
2233   @param node: The node given as an argument to the program
2234   @param timeout: Timeout after which we kill the oob program
2235
2236   @return: stdout
2237   @raise RPCFail: If execution fails for some reason
2238
2239   """
2240   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2241
2242   if result.failed:
2243     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2244           result.fail_reason, result.output)
2245
2246   return result.stdout
2247
2248
2249 def _OSOndiskAPIVersion(os_dir):
2250   """Compute and return the API version of a given OS.
2251
2252   This function will try to read the API version of the OS residing in
2253   the 'os_dir' directory.
2254
2255   @type os_dir: str
2256   @param os_dir: the directory in which we should look for the OS
2257   @rtype: tuple
2258   @return: tuple (status, data) with status denoting the validity and
2259       data holding either the vaid versions or an error message
2260
2261   """
2262   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2263
2264   try:
2265     st = os.stat(api_file)
2266   except EnvironmentError, err:
2267     return False, ("Required file '%s' not found under path %s: %s" %
2268                    (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2269
2270   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2271     return False, ("File '%s' in %s is not a regular file" %
2272                    (constants.OS_API_FILE, os_dir))
2273
2274   try:
2275     api_versions = utils.ReadFile(api_file).splitlines()
2276   except EnvironmentError, err:
2277     return False, ("Error while reading the API version file at %s: %s" %
2278                    (api_file, utils.ErrnoOrStr(err)))
2279
2280   try:
2281     api_versions = [int(version.strip()) for version in api_versions]
2282   except (TypeError, ValueError), err:
2283     return False, ("API version(s) can't be converted to integer: %s" %
2284                    str(err))
2285
2286   return True, api_versions
2287
2288
2289 def DiagnoseOS(top_dirs=None):
2290   """Compute the validity for all OSes.
2291
2292   @type top_dirs: list
2293   @param top_dirs: the list of directories in which to
2294       search (if not given defaults to
2295       L{pathutils.OS_SEARCH_PATH})
2296   @rtype: list of L{objects.OS}
2297   @return: a list of tuples (name, path, status, diagnose, variants,
2298       parameters, api_version) for all (potential) OSes under all
2299       search paths, where:
2300           - name is the (potential) OS name
2301           - path is the full path to the OS
2302           - status True/False is the validity of the OS
2303           - diagnose is the error message for an invalid OS, otherwise empty
2304           - variants is a list of supported OS variants, if any
2305           - parameters is a list of (name, help) parameters, if any
2306           - api_version is a list of support OS API versions
2307
2308   """
2309   if top_dirs is None:
2310     top_dirs = pathutils.OS_SEARCH_PATH
2311
2312   result = []
2313   for dir_name in top_dirs:
2314     if os.path.isdir(dir_name):
2315       try:
2316         f_names = utils.ListVisibleFiles(dir_name)
2317       except EnvironmentError, err:
2318         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2319         break
2320       for name in f_names:
2321         os_path = utils.PathJoin(dir_name, name)
2322         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2323         if status:
2324           diagnose = ""
2325           variants = os_inst.supported_variants
2326           parameters = os_inst.supported_parameters
2327           api_versions = os_inst.api_versions
2328         else:
2329           diagnose = os_inst
2330           variants = parameters = api_versions = []
2331         result.append((name, os_path, status, diagnose, variants,
2332                        parameters, api_versions))
2333
2334   return result
2335
2336
2337 def _TryOSFromDisk(name, base_dir=None):
2338   """Create an OS instance from disk.
2339
2340   This function will return an OS instance if the given name is a
2341   valid OS name.
2342
2343   @type base_dir: string
2344   @keyword base_dir: Base directory containing OS installations.
2345                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2346   @rtype: tuple
2347   @return: success and either the OS instance if we find a valid one,
2348       or error message
2349
2350   """
2351   if base_dir is None:
2352     os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2353   else:
2354     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2355
2356   if os_dir is None:
2357     return False, "Directory for OS %s not found in search path" % name
2358
2359   status, api_versions = _OSOndiskAPIVersion(os_dir)
2360   if not status:
2361     # push the error up
2362     return status, api_versions
2363
2364   if not constants.OS_API_VERSIONS.intersection(api_versions):
2365     return False, ("API version mismatch for path '%s': found %s, want %s." %
2366                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2367
2368   # OS Files dictionary, we will populate it with the absolute path
2369   # names; if the value is True, then it is a required file, otherwise
2370   # an optional one
2371   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2372
2373   if max(api_versions) >= constants.OS_API_V15:
2374     os_files[constants.OS_VARIANTS_FILE] = False
2375
2376   if max(api_versions) >= constants.OS_API_V20:
2377     os_files[constants.OS_PARAMETERS_FILE] = True
2378   else:
2379     del os_files[constants.OS_SCRIPT_VERIFY]
2380
2381   for (filename, required) in os_files.items():
2382     os_files[filename] = utils.PathJoin(os_dir, filename)
2383
2384     try:
2385       st = os.stat(os_files[filename])
2386     except EnvironmentError, err:
2387       if err.errno == errno.ENOENT and not required:
2388         del os_files[filename]
2389         continue
2390       return False, ("File '%s' under path '%s' is missing (%s)" %
2391                      (filename, os_dir, utils.ErrnoOrStr(err)))
2392
2393     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2394       return False, ("File '%s' under path '%s' is not a regular file" %
2395                      (filename, os_dir))
2396
2397     if filename in constants.OS_SCRIPTS:
2398       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2399         return False, ("File '%s' under path '%s' is not executable" %
2400                        (filename, os_dir))
2401
2402   variants = []
2403   if constants.OS_VARIANTS_FILE in os_files:
2404     variants_file = os_files[constants.OS_VARIANTS_FILE]
2405     try:
2406       variants = \
2407         utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2408     except EnvironmentError, err:
2409       # we accept missing files, but not other errors
2410       if err.errno != errno.ENOENT:
2411         return False, ("Error while reading the OS variants file at %s: %s" %
2412                        (variants_file, utils.ErrnoOrStr(err)))
2413
2414   parameters = []
2415   if constants.OS_PARAMETERS_FILE in os_files:
2416     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2417     try:
2418       parameters = utils.ReadFile(parameters_file).splitlines()
2419     except EnvironmentError, err:
2420       return False, ("Error while reading the OS parameters file at %s: %s" %
2421                      (parameters_file, utils.ErrnoOrStr(err)))
2422     parameters = [v.split(None, 1) for v in parameters]
2423
2424   os_obj = objects.OS(name=name, path=os_dir,
2425                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2426                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2427                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2428                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2429                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2430                                                  None),
2431                       supported_variants=variants,
2432                       supported_parameters=parameters,
2433                       api_versions=api_versions)
2434   return True, os_obj
2435
2436
2437 def OSFromDisk(name, base_dir=None):
2438   """Create an OS instance from disk.
2439
2440   This function will return an OS instance if the given name is a
2441   valid OS name. Otherwise, it will raise an appropriate
2442   L{RPCFail} exception, detailing why this is not a valid OS.
2443
2444   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2445   an exception but returns true/false status data.
2446
2447   @type base_dir: string
2448   @keyword base_dir: Base directory containing OS installations.
2449                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2450   @rtype: L{objects.OS}
2451   @return: the OS instance if we find a valid one
2452   @raise RPCFail: if we don't find a valid OS
2453
2454   """
2455   name_only = objects.OS.GetName(name)
2456   status, payload = _TryOSFromDisk(name_only, base_dir)
2457
2458   if not status:
2459     _Fail(payload)
2460
2461   return payload
2462
2463
2464 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2465   """Calculate the basic environment for an os script.
2466
2467   @type os_name: str
2468   @param os_name: full operating system name (including variant)
2469   @type inst_os: L{objects.OS}
2470   @param inst_os: operating system for which the environment is being built
2471   @type os_params: dict
2472   @param os_params: the OS parameters
2473   @type debug: integer
2474   @param debug: debug level (0 or 1, for OS Api 10)
2475   @rtype: dict
2476   @return: dict of environment variables
2477   @raise errors.BlockDeviceError: if the block device
2478       cannot be found
2479
2480   """
2481   result = {}
2482   api_version = \
2483     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2484   result["OS_API_VERSION"] = "%d" % api_version
2485   result["OS_NAME"] = inst_os.name
2486   result["DEBUG_LEVEL"] = "%d" % debug
2487
2488   # OS variants
2489   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2490     variant = objects.OS.GetVariant(os_name)
2491     if not variant:
2492       variant = inst_os.supported_variants[0]
2493   else:
2494     variant = ""
2495   result["OS_VARIANT"] = variant
2496
2497   # OS params
2498   for pname, pvalue in os_params.items():
2499     result["OSP_%s" % pname.upper()] = pvalue
2500
2501   # Set a default path otherwise programs called by OS scripts (or
2502   # even hooks called from OS scripts) might break, and we don't want
2503   # to have each script require setting a PATH variable
2504   result["PATH"] = constants.HOOKS_PATH
2505
2506   return result
2507
2508
2509 def OSEnvironment(instance, inst_os, debug=0):
2510   """Calculate the environment for an os script.
2511
2512   @type instance: L{objects.Instance}
2513   @param instance: target instance for the os script run
2514   @type inst_os: L{objects.OS}
2515   @param inst_os: operating system for which the environment is being built
2516   @type debug: integer
2517   @param debug: debug level (0 or 1, for OS Api 10)
2518   @rtype: dict
2519   @return: dict of environment variables
2520   @raise errors.BlockDeviceError: if the block device
2521       cannot be found
2522
2523   """
2524   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2525
2526   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2527     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2528
2529   result["HYPERVISOR"] = instance.hypervisor
2530   result["DISK_COUNT"] = "%d" % len(instance.disks)
2531   result["NIC_COUNT"] = "%d" % len(instance.nics)
2532   result["INSTANCE_SECONDARY_NODES"] = \
2533       ("%s" % " ".join(instance.secondary_nodes))
2534
2535   # Disks
2536   for idx, disk in enumerate(instance.disks):
2537     real_disk = _OpenRealBD(disk)
2538     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2539     result["DISK_%d_ACCESS" % idx] = disk.mode
2540     if constants.HV_DISK_TYPE in instance.hvparams:
2541       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2542         instance.hvparams[constants.HV_DISK_TYPE]
2543     if disk.dev_type in constants.LDS_BLOCK:
2544       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2545     elif disk.dev_type == constants.LD_FILE:
2546       result["DISK_%d_BACKEND_TYPE" % idx] = \
2547         "file:%s" % disk.physical_id[0]
2548
2549   # NICs
2550   for idx, nic in enumerate(instance.nics):
2551     result["NIC_%d_MAC" % idx] = nic.mac
2552     if nic.ip:
2553       result["NIC_%d_IP" % idx] = nic.ip
2554     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2555     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2556       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2557     if nic.nicparams[constants.NIC_LINK]:
2558       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2559     if nic.netinfo:
2560       nobj = objects.Network.FromDict(nic.netinfo)
2561       result.update(nobj.HooksDict("NIC_%d_" % idx))
2562     elif nic.network:
2563       # FIXME: broken network reference: the instance NIC specifies a network,
2564       # but the relevant network entry was not in the config. This should be
2565       # made impossible.
2566       result["INSTANCE_NIC%d_NETWORK" % idx] = nic.network
2567     if constants.HV_NIC_TYPE in instance.hvparams:
2568       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2569         instance.hvparams[constants.HV_NIC_TYPE]
2570
2571   # HV/BE params
2572   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2573     for key, value in source.items():
2574       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2575
2576   return result
2577
2578
2579 def DiagnoseExtStorage(top_dirs=None):
2580   """Compute the validity for all ExtStorage Providers.
2581
2582   @type top_dirs: list
2583   @param top_dirs: the list of directories in which to
2584       search (if not given defaults to
2585       L{pathutils.ES_SEARCH_PATH})
2586   @rtype: list of L{objects.ExtStorage}
2587   @return: a list of tuples (name, path, status, diagnose, parameters)
2588       for all (potential) ExtStorage Providers under all
2589       search paths, where:
2590           - name is the (potential) ExtStorage Provider
2591           - path is the full path to the ExtStorage Provider
2592           - status True/False is the validity of the ExtStorage Provider
2593           - diagnose is the error message for an invalid ExtStorage Provider,
2594             otherwise empty
2595           - parameters is a list of (name, help) parameters, if any
2596
2597   """
2598   if top_dirs is None:
2599     top_dirs = pathutils.ES_SEARCH_PATH
2600
2601   result = []
2602   for dir_name in top_dirs:
2603     if os.path.isdir(dir_name):
2604       try:
2605         f_names = utils.ListVisibleFiles(dir_name)
2606       except EnvironmentError, err:
2607         logging.exception("Can't list the ExtStorage directory %s: %s",
2608                           dir_name, err)
2609         break
2610       for name in f_names:
2611         es_path = utils.PathJoin(dir_name, name)
2612         status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
2613         if status:
2614           diagnose = ""
2615           parameters = es_inst.supported_parameters
2616         else:
2617           diagnose = es_inst
2618           parameters = []
2619         result.append((name, es_path, status, diagnose, parameters))
2620
2621   return result
2622
2623
2624 def BlockdevGrow(disk, amount, dryrun, backingstore):
2625   """Grow a stack of block devices.
2626
2627   This function is called recursively, with the childrens being the
2628   first ones to resize.
2629
2630   @type disk: L{objects.Disk}
2631   @param disk: the disk to be grown
2632   @type amount: integer
2633   @param amount: the amount (in mebibytes) to grow with
2634   @type dryrun: boolean
2635   @param dryrun: whether to execute the operation in simulation mode
2636       only, without actually increasing the size
2637   @param backingstore: whether to execute the operation on backing storage
2638       only, or on "logical" storage only; e.g. DRBD is logical storage,
2639       whereas LVM, file, RBD are backing storage
2640   @rtype: (status, result)
2641   @return: a tuple with the status of the operation (True/False), and
2642       the errors message if status is False
2643
2644   """
2645   r_dev = _RecursiveFindBD(disk)
2646   if r_dev is None:
2647     _Fail("Cannot find block device %s", disk)
2648
2649   try:
2650     r_dev.Grow(amount, dryrun, backingstore)
2651   except errors.BlockDeviceError, err:
2652     _Fail("Failed to grow block device: %s", err, exc=True)
2653
2654
2655 def BlockdevSnapshot(disk):
2656   """Create a snapshot copy of a block device.
2657
2658   This function is called recursively, and the snapshot is actually created
2659   just for the leaf lvm backend device.
2660
2661   @type disk: L{objects.Disk}
2662   @param disk: the disk to be snapshotted
2663   @rtype: string
2664   @return: snapshot disk ID as (vg, lv)
2665
2666   """
2667   if disk.dev_type == constants.LD_DRBD8:
2668     if not disk.children:
2669       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2670             disk.unique_id)
2671     return BlockdevSnapshot(disk.children[0])
2672   elif disk.dev_type == constants.LD_LV:
2673     r_dev = _RecursiveFindBD(disk)
2674     if r_dev is not None:
2675       # FIXME: choose a saner value for the snapshot size
2676       # let's stay on the safe side and ask for the full size, for now
2677       return r_dev.Snapshot(disk.size)
2678     else:
2679       _Fail("Cannot find block device %s", disk)
2680   else:
2681     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2682           disk.unique_id, disk.dev_type)
2683
2684
2685 def BlockdevSetInfo(disk, info):
2686   """Sets 'metadata' information on block devices.
2687
2688   This function sets 'info' metadata on block devices. Initial
2689   information is set at device creation; this function should be used
2690   for example after renames.
2691
2692   @type disk: L{objects.Disk}
2693   @param disk: the disk to be grown
2694   @type info: string
2695   @param info: new 'info' metadata
2696   @rtype: (status, result)
2697   @return: a tuple with the status of the operation (True/False), and
2698       the errors message if status is False
2699
2700   """
2701   r_dev = _RecursiveFindBD(disk)
2702   if r_dev is None:
2703     _Fail("Cannot find block device %s", disk)
2704
2705   try:
2706     r_dev.SetInfo(info)
2707   except errors.BlockDeviceError, err:
2708     _Fail("Failed to set information on block device: %s", err, exc=True)
2709
2710
2711 def FinalizeExport(instance, snap_disks):
2712   """Write out the export configuration information.
2713
2714   @type instance: L{objects.Instance}
2715   @param instance: the instance which we export, used for
2716       saving configuration
2717   @type snap_disks: list of L{objects.Disk}
2718   @param snap_disks: list of snapshot block devices, which
2719       will be used to get the actual name of the dump file
2720
2721   @rtype: None
2722
2723   """
2724   destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
2725   finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
2726
2727   config = objects.SerializableConfigParser()
2728
2729   config.add_section(constants.INISECT_EXP)
2730   config.set(constants.INISECT_EXP, "version", "0")
2731   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2732   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2733   config.set(constants.INISECT_EXP, "os", instance.os)
2734   config.set(constants.INISECT_EXP, "compression", "none")
2735
2736   config.add_section(constants.INISECT_INS)
2737   config.set(constants.INISECT_INS, "name", instance.name)
2738   config.set(constants.INISECT_INS, "maxmem", "%d" %
2739              instance.beparams[constants.BE_MAXMEM])
2740   config.set(constants.INISECT_INS, "minmem", "%d" %
2741              instance.beparams[constants.BE_MINMEM])
2742   # "memory" is deprecated, but useful for exporting to old ganeti versions
2743   config.set(constants.INISECT_INS, "memory", "%d" %
2744              instance.beparams[constants.BE_MAXMEM])
2745   config.set(constants.INISECT_INS, "vcpus", "%d" %
2746              instance.beparams[constants.BE_VCPUS])
2747   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2748   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2749   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2750
2751   nic_total = 0
2752   for nic_count, nic in enumerate(instance.nics):
2753     nic_total += 1
2754     config.set(constants.INISECT_INS, "nic%d_mac" %
2755                nic_count, "%s" % nic.mac)
2756     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2757     config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
2758                "%s" % nic.network)
2759     for param in constants.NICS_PARAMETER_TYPES:
2760       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2761                  "%s" % nic.nicparams.get(param, None))
2762   # TODO: redundant: on load can read nics until it doesn't exist
2763   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2764
2765   disk_total = 0
2766   for disk_count, disk in enumerate(snap_disks):
2767     if disk:
2768       disk_total += 1
2769       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2770                  ("%s" % disk.iv_name))
2771       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2772                  ("%s" % disk.physical_id[1]))
2773       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2774                  ("%d" % disk.size))
2775
2776   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2777
2778   # New-style hypervisor/backend parameters
2779
2780   config.add_section(constants.INISECT_HYP)
2781   for name, value in instance.hvparams.items():
2782     if name not in constants.HVC_GLOBALS:
2783       config.set(constants.INISECT_HYP, name, str(value))
2784
2785   config.add_section(constants.INISECT_BEP)
2786   for name, value in instance.beparams.items():
2787     config.set(constants.INISECT_BEP, name, str(value))
2788
2789   config.add_section(constants.INISECT_OSP)
2790   for name, value in instance.osparams.items():
2791     config.set(constants.INISECT_OSP, name, str(value))
2792
2793   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2794                   data=config.Dumps())
2795   shutil.rmtree(finaldestdir, ignore_errors=True)
2796   shutil.move(destdir, finaldestdir)
2797
2798
2799 def ExportInfo(dest):
2800   """Get export configuration information.
2801
2802   @type dest: str
2803   @param dest: directory containing the export
2804
2805   @rtype: L{objects.SerializableConfigParser}
2806   @return: a serializable config file containing the
2807       export info
2808
2809   """
2810   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2811
2812   config = objects.SerializableConfigParser()
2813   config.read(cff)
2814
2815   if (not config.has_section(constants.INISECT_EXP) or
2816       not config.has_section(constants.INISECT_INS)):
2817     _Fail("Export info file doesn't have the required fields")
2818
2819   return config.Dumps()
2820
2821
2822 def ListExports():
2823   """Return a list of exports currently available on this machine.
2824
2825   @rtype: list
2826   @return: list of the exports
2827
2828   """
2829   if os.path.isdir(pathutils.EXPORT_DIR):
2830     return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
2831   else:
2832     _Fail("No exports directory")
2833
2834
2835 def RemoveExport(export):
2836   """Remove an existing export from the node.
2837
2838   @type export: str
2839   @param export: the name of the export to remove
2840   @rtype: None
2841
2842   """
2843   target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2844
2845   try:
2846     shutil.rmtree(target)
2847   except EnvironmentError, err:
2848     _Fail("Error while removing the export: %s", err, exc=True)
2849
2850
2851 def BlockdevRename(devlist):
2852   """Rename a list of block devices.
2853
2854   @type devlist: list of tuples
2855   @param devlist: list of tuples of the form  (disk,
2856       new_logical_id, new_physical_id); disk is an
2857       L{objects.Disk} object describing the current disk,
2858       and new logical_id/physical_id is the name we
2859       rename it to
2860   @rtype: boolean
2861   @return: True if all renames succeeded, False otherwise
2862
2863   """
2864   msgs = []
2865   result = True
2866   for disk, unique_id in devlist:
2867     dev = _RecursiveFindBD(disk)
2868     if dev is None:
2869       msgs.append("Can't find device %s in rename" % str(disk))
2870       result = False
2871       continue
2872     try:
2873       old_rpath = dev.dev_path
2874       dev.Rename(unique_id)
2875       new_rpath = dev.dev_path
2876       if old_rpath != new_rpath:
2877         DevCacheManager.RemoveCache(old_rpath)
2878         # FIXME: we should add the new cache information here, like:
2879         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2880         # but we don't have the owner here - maybe parse from existing
2881         # cache? for now, we only lose lvm data when we rename, which
2882         # is less critical than DRBD or MD
2883     except errors.BlockDeviceError, err:
2884       msgs.append("Can't rename device '%s' to '%s': %s" %
2885                   (dev, unique_id, err))
2886       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2887       result = False
2888   if not result:
2889     _Fail("; ".join(msgs))
2890
2891
2892 def _TransformFileStorageDir(fs_dir):
2893   """Checks whether given file_storage_dir is valid.
2894
2895   Checks wheter the given fs_dir is within the cluster-wide default
2896   file_storage_dir or the shared_file_storage_dir, which are stored in
2897   SimpleStore. Only paths under those directories are allowed.
2898
2899   @type fs_dir: str
2900   @param fs_dir: the path to check
2901
2902   @return: the normalized path if valid, None otherwise
2903
2904   """
2905   if not (constants.ENABLE_FILE_STORAGE or
2906           constants.ENABLE_SHARED_FILE_STORAGE):
2907     _Fail("File storage disabled at configure time")
2908
2909   bdev.CheckFileStoragePath(fs_dir)
2910
2911   return os.path.normpath(fs_dir)
2912
2913
2914 def CreateFileStorageDir(file_storage_dir):
2915   """Create file storage directory.
2916
2917   @type file_storage_dir: str
2918   @param file_storage_dir: directory to create
2919
2920   @rtype: tuple
2921   @return: tuple with first element a boolean indicating wheter dir
2922       creation was successful or not
2923
2924   """
2925   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2926   if os.path.exists(file_storage_dir):
2927     if not os.path.isdir(file_storage_dir):
2928       _Fail("Specified storage dir '%s' is not a directory",
2929             file_storage_dir)
2930   else:
2931     try:
2932       os.makedirs(file_storage_dir, 0750)
2933     except OSError, err:
2934       _Fail("Cannot create file storage directory '%s': %s",
2935             file_storage_dir, err, exc=True)
2936
2937
2938 def RemoveFileStorageDir(file_storage_dir):
2939   """Remove file storage directory.
2940
2941   Remove it only if it's empty. If not log an error and return.
2942
2943   @type file_storage_dir: str
2944   @param file_storage_dir: the directory we should cleanup
2945   @rtype: tuple (success,)
2946   @return: tuple of one element, C{success}, denoting
2947       whether the operation was successful
2948
2949   """
2950   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2951   if os.path.exists(file_storage_dir):
2952     if not os.path.isdir(file_storage_dir):
2953       _Fail("Specified Storage directory '%s' is not a directory",
2954             file_storage_dir)
2955     # deletes dir only if empty, otherwise we want to fail the rpc call
2956     try:
2957       os.rmdir(file_storage_dir)
2958     except OSError, err:
2959       _Fail("Cannot remove file storage directory '%s': %s",
2960             file_storage_dir, err)
2961
2962
2963 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2964   """Rename the file storage directory.
2965
2966   @type old_file_storage_dir: str
2967   @param old_file_storage_dir: the current path
2968   @type new_file_storage_dir: str
2969   @param new_file_storage_dir: the name we should rename to
2970   @rtype: tuple (success,)
2971   @return: tuple of one element, C{success}, denoting
2972       whether the operation was successful
2973
2974   """
2975   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2976   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2977   if not os.path.exists(new_file_storage_dir):
2978     if os.path.isdir(old_file_storage_dir):
2979       try:
2980         os.rename(old_file_storage_dir, new_file_storage_dir)
2981       except OSError, err:
2982         _Fail("Cannot rename '%s' to '%s': %s",
2983               old_file_storage_dir, new_file_storage_dir, err)
2984     else:
2985       _Fail("Specified storage dir '%s' is not a directory",
2986             old_file_storage_dir)
2987   else:
2988     if os.path.exists(old_file_storage_dir):
2989       _Fail("Cannot rename '%s' to '%s': both locations exist",
2990             old_file_storage_dir, new_file_storage_dir)
2991
2992
2993 def _EnsureJobQueueFile(file_name):
2994   """Checks whether the given filename is in the queue directory.
2995
2996   @type file_name: str
2997   @param file_name: the file name we should check
2998   @rtype: None
2999   @raises RPCFail: if the file is not valid
3000
3001   """
3002   if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
3003     _Fail("Passed job queue file '%s' does not belong to"
3004           " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
3005
3006
3007 def JobQueueUpdate(file_name, content):
3008   """Updates a file in the queue directory.
3009
3010   This is just a wrapper over L{utils.io.WriteFile}, with proper
3011   checking.
3012
3013   @type file_name: str
3014   @param file_name: the job file name
3015   @type content: str
3016   @param content: the new job contents
3017   @rtype: boolean
3018   @return: the success of the operation
3019
3020   """
3021   file_name = vcluster.LocalizeVirtualPath(file_name)
3022
3023   _EnsureJobQueueFile(file_name)
3024   getents = runtime.GetEnts()
3025
3026   # Write and replace the file atomically
3027   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
3028                   gid=getents.masterd_gid)
3029
3030
3031 def JobQueueRename(old, new):
3032   """Renames a job queue file.
3033
3034   This is just a wrapper over os.rename with proper checking.
3035
3036   @type old: str
3037   @param old: the old (actual) file name
3038   @type new: str
3039   @param new: the desired file name
3040   @rtype: tuple
3041   @return: the success of the operation and payload
3042
3043   """
3044   old = vcluster.LocalizeVirtualPath(old)
3045   new = vcluster.LocalizeVirtualPath(new)
3046
3047   _EnsureJobQueueFile(old)
3048   _EnsureJobQueueFile(new)
3049
3050   getents = runtime.GetEnts()
3051
3052   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
3053                    dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
3054
3055
3056 def BlockdevClose(instance_name, disks):
3057   """Closes the given block devices.
3058
3059   This means they will be switched to secondary mode (in case of
3060   DRBD).
3061
3062   @param instance_name: if the argument is not empty, the symlinks
3063       of this instance will be removed
3064   @type disks: list of L{objects.Disk}
3065   @param disks: the list of disks to be closed
3066   @rtype: tuple (success, message)
3067   @return: a tuple of success and message, where success
3068       indicates the succes of the operation, and message
3069       which will contain the error details in case we
3070       failed
3071
3072   """
3073   bdevs = []
3074   for cf in disks:
3075     rd = _RecursiveFindBD(cf)
3076     if rd is None:
3077       _Fail("Can't find device %s", cf)
3078     bdevs.append(rd)
3079
3080   msg = []
3081   for rd in bdevs:
3082     try:
3083       rd.Close()
3084     except errors.BlockDeviceError, err:
3085       msg.append(str(err))
3086   if msg:
3087     _Fail("Can't make devices secondary: %s", ",".join(msg))
3088   else:
3089     if instance_name:
3090       _RemoveBlockDevLinks(instance_name, disks)
3091
3092
3093 def ValidateHVParams(hvname, hvparams):
3094   """Validates the given hypervisor parameters.
3095
3096   @type hvname: string
3097   @param hvname: the hypervisor name
3098   @type hvparams: dict
3099   @param hvparams: the hypervisor parameters to be validated
3100   @rtype: None
3101
3102   """
3103   try:
3104     hv_type = hypervisor.GetHypervisor(hvname)
3105     hv_type.ValidateParameters(hvparams)
3106   except errors.HypervisorError, err:
3107     _Fail(str(err), log=False)
3108
3109
3110 def _CheckOSPList(os_obj, parameters):
3111   """Check whether a list of parameters is supported by the OS.
3112
3113   @type os_obj: L{objects.OS}
3114   @param os_obj: OS object to check
3115   @type parameters: list
3116   @param parameters: the list of parameters to check
3117
3118   """
3119   supported = [v[0] for v in os_obj.supported_parameters]
3120   delta = frozenset(parameters).difference(supported)
3121   if delta:
3122     _Fail("The following parameters are not supported"
3123           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
3124
3125
3126 def ValidateOS(required, osname, checks, osparams):
3127   """Validate the given OS' parameters.
3128
3129   @type required: boolean
3130   @param required: whether absence of the OS should translate into
3131       failure or not
3132   @type osname: string
3133   @param osname: the OS to be validated
3134   @type checks: list
3135   @param checks: list of the checks to run (currently only 'parameters')
3136   @type osparams: dict
3137   @param osparams: dictionary with OS parameters
3138   @rtype: boolean
3139   @return: True if the validation passed, or False if the OS was not
3140       found and L{required} was false
3141
3142   """
3143   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
3144     _Fail("Unknown checks required for OS %s: %s", osname,
3145           set(checks).difference(constants.OS_VALIDATE_CALLS))
3146
3147   name_only = objects.OS.GetName(osname)
3148   status, tbv = _TryOSFromDisk(name_only, None)
3149
3150   if not status:
3151     if required:
3152       _Fail(tbv)
3153     else:
3154       return False
3155
3156   if max(tbv.api_versions) < constants.OS_API_V20:
3157     return True
3158
3159   if constants.OS_VALIDATE_PARAMETERS in checks:
3160     _CheckOSPList(tbv, osparams.keys())
3161
3162   validate_env = OSCoreEnv(osname, tbv, osparams)
3163   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
3164                         cwd=tbv.path, reset_env=True)
3165   if result.failed:
3166     logging.error("os validate command '%s' returned error: %s output: %s",
3167                   result.cmd, result.fail_reason, result.output)
3168     _Fail("OS validation script failed (%s), output: %s",
3169           result.fail_reason, result.output, log=False)
3170
3171   return True
3172
3173
3174 def DemoteFromMC():
3175   """Demotes the current node from master candidate role.
3176
3177   """
3178   # try to ensure we're not the master by mistake
3179   master, myself = ssconf.GetMasterAndMyself()
3180   if master == myself:
3181     _Fail("ssconf status shows I'm the master node, will not demote")
3182
3183   result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
3184   if not result.failed:
3185     _Fail("The master daemon is running, will not demote")
3186
3187   try:
3188     if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3189       utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3190   except EnvironmentError, err:
3191     if err.errno != errno.ENOENT:
3192       _Fail("Error while backing up cluster file: %s", err, exc=True)
3193
3194   utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3195
3196
3197 def _GetX509Filenames(cryptodir, name):
3198   """Returns the full paths for the private key and certificate.
3199
3200   """
3201   return (utils.PathJoin(cryptodir, name),
3202           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3203           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3204
3205
3206 def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3207   """Creates a new X509 certificate for SSL/TLS.
3208
3209   @type validity: int
3210   @param validity: Validity in seconds
3211   @rtype: tuple; (string, string)
3212   @return: Certificate name and public part
3213
3214   """
3215   (key_pem, cert_pem) = \
3216     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3217                                      min(validity, _MAX_SSL_CERT_VALIDITY))
3218
3219   cert_dir = tempfile.mkdtemp(dir=cryptodir,
3220                               prefix="x509-%s-" % utils.TimestampForFilename())
3221   try:
3222     name = os.path.basename(cert_dir)
3223     assert len(name) > 5
3224
3225     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3226
3227     utils.WriteFile(key_file, mode=0400, data=key_pem)
3228     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3229
3230     # Never return private key as it shouldn't leave the node
3231     return (name, cert_pem)
3232   except Exception:
3233     shutil.rmtree(cert_dir, ignore_errors=True)
3234     raise
3235
3236
3237 def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3238   """Removes a X509 certificate.
3239
3240   @type name: string
3241   @param name: Certificate name
3242
3243   """
3244   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3245
3246   utils.RemoveFile(key_file)
3247   utils.RemoveFile(cert_file)
3248
3249   try:
3250     os.rmdir(cert_dir)
3251   except EnvironmentError, err:
3252     _Fail("Cannot remove certificate directory '%s': %s",
3253           cert_dir, err)
3254
3255
3256 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3257   """Returns the command for the requested input/output.
3258
3259   @type instance: L{objects.Instance}
3260   @param instance: The instance object
3261   @param mode: Import/export mode
3262   @param ieio: Input/output type
3263   @param ieargs: Input/output arguments
3264
3265   """
3266   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3267
3268   env = None
3269   prefix = None
3270   suffix = None
3271   exp_size = None
3272
3273   if ieio == constants.IEIO_FILE:
3274     (filename, ) = ieargs
3275
3276     if not utils.IsNormAbsPath(filename):
3277       _Fail("Path '%s' is not normalized or absolute", filename)
3278
3279     real_filename = os.path.realpath(filename)
3280     directory = os.path.dirname(real_filename)
3281
3282     if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3283       _Fail("File '%s' is not under exports directory '%s': %s",
3284             filename, pathutils.EXPORT_DIR, real_filename)
3285
3286     # Create directory
3287     utils.Makedirs(directory, mode=0750)
3288
3289     quoted_filename = utils.ShellQuote(filename)
3290
3291     if mode == constants.IEM_IMPORT:
3292       suffix = "> %s" % quoted_filename
3293     elif mode == constants.IEM_EXPORT:
3294       suffix = "< %s" % quoted_filename
3295
3296       # Retrieve file size
3297       try:
3298         st = os.stat(filename)
3299       except EnvironmentError, err:
3300         logging.error("Can't stat(2) %s: %s", filename, err)
3301       else:
3302         exp_size = utils.BytesToMebibyte(st.st_size)
3303
3304   elif ieio == constants.IEIO_RAW_DISK:
3305     (disk, ) = ieargs
3306
3307     real_disk = _OpenRealBD(disk)
3308
3309     if mode == constants.IEM_IMPORT:
3310       # we set here a smaller block size as, due to transport buffering, more
3311       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3312       # is not already there or we pass a wrong path; we use notrunc to no
3313       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3314       # much memory; this means that at best, we flush every 64k, which will
3315       # not be very fast
3316       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3317                                     " bs=%s oflag=dsync"),
3318                                     real_disk.dev_path,
3319                                     str(64 * 1024))
3320
3321     elif mode == constants.IEM_EXPORT:
3322       # the block size on the read dd is 1MiB to match our units
3323       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3324                                    real_disk.dev_path,
3325                                    str(1024 * 1024), # 1 MB
3326                                    str(disk.size))
3327       exp_size = disk.size
3328
3329   elif ieio == constants.IEIO_SCRIPT:
3330     (disk, disk_index, ) = ieargs
3331
3332     assert isinstance(disk_index, (int, long))
3333
3334     real_disk = _OpenRealBD(disk)
3335
3336     inst_os = OSFromDisk(instance.os)
3337     env = OSEnvironment(instance, inst_os)
3338
3339     if mode == constants.IEM_IMPORT:
3340       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3341       env["IMPORT_INDEX"] = str(disk_index)
3342       script = inst_os.import_script
3343
3344     elif mode == constants.IEM_EXPORT:
3345       env["EXPORT_DEVICE"] = real_disk.dev_path
3346       env["EXPORT_INDEX"] = str(disk_index)
3347       script = inst_os.export_script
3348
3349     # TODO: Pass special environment only to script
3350     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3351
3352     if mode == constants.IEM_IMPORT:
3353       suffix = "| %s" % script_cmd
3354
3355     elif mode == constants.IEM_EXPORT:
3356       prefix = "%s |" % script_cmd
3357
3358     # Let script predict size
3359     exp_size = constants.IE_CUSTOM_SIZE
3360
3361   else:
3362     _Fail("Invalid %s I/O mode %r", mode, ieio)
3363
3364   return (env, prefix, suffix, exp_size)
3365
3366
3367 def _CreateImportExportStatusDir(prefix):
3368   """Creates status directory for import/export.
3369
3370   """
3371   return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3372                           prefix=("%s-%s-" %
3373                                   (prefix, utils.TimestampForFilename())))
3374
3375
3376 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3377                             ieio, ieioargs):
3378   """Starts an import or export daemon.
3379
3380   @param mode: Import/output mode
3381   @type opts: L{objects.ImportExportOptions}
3382   @param opts: Daemon options
3383   @type host: string
3384   @param host: Remote host for export (None for import)
3385   @type port: int
3386   @param port: Remote port for export (None for import)
3387   @type instance: L{objects.Instance}
3388   @param instance: Instance object
3389   @type component: string
3390   @param component: which part of the instance is transferred now,
3391       e.g. 'disk/0'
3392   @param ieio: Input/output type
3393   @param ieioargs: Input/output arguments
3394
3395   """
3396   if mode == constants.IEM_IMPORT:
3397     prefix = "import"
3398
3399     if not (host is None and port is None):
3400       _Fail("Can not specify host or port on import")
3401
3402   elif mode == constants.IEM_EXPORT:
3403     prefix = "export"
3404
3405     if host is None or port is None:
3406       _Fail("Host and port must be specified for an export")
3407
3408   else:
3409     _Fail("Invalid mode %r", mode)
3410
3411   if (opts.key_name is None) ^ (opts.ca_pem is None):
3412     _Fail("Cluster certificate can only be used for both key and CA")
3413
3414   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3415     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3416
3417   if opts.key_name is None:
3418     # Use server.pem
3419     key_path = pathutils.NODED_CERT_FILE
3420     cert_path = pathutils.NODED_CERT_FILE
3421     assert opts.ca_pem is None
3422   else:
3423     (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3424                                                  opts.key_name)
3425     assert opts.ca_pem is not None
3426
3427   for i in [key_path, cert_path]:
3428     if not os.path.exists(i):
3429       _Fail("File '%s' does not exist" % i)
3430
3431   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3432   try:
3433     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3434     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3435     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3436
3437     if opts.ca_pem is None:
3438       # Use server.pem
3439       ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3440     else:
3441       ca = opts.ca_pem
3442
3443     # Write CA file
3444     utils.WriteFile(ca_file, data=ca, mode=0400)
3445
3446     cmd = [
3447       pathutils.IMPORT_EXPORT_DAEMON,
3448       status_file, mode,
3449       "--key=%s" % key_path,
3450       "--cert=%s" % cert_path,
3451       "--ca=%s" % ca_file,
3452       ]
3453
3454     if host:
3455       cmd.append("--host=%s" % host)
3456
3457     if port:
3458       cmd.append("--port=%s" % port)
3459
3460     if opts.ipv6:
3461       cmd.append("--ipv6")
3462     else:
3463       cmd.append("--ipv4")
3464
3465     if opts.compress:
3466       cmd.append("--compress=%s" % opts.compress)
3467
3468     if opts.magic:
3469       cmd.append("--magic=%s" % opts.magic)
3470
3471     if exp_size is not None:
3472       cmd.append("--expected-size=%s" % exp_size)
3473
3474     if cmd_prefix:
3475       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3476
3477     if cmd_suffix:
3478       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3479
3480     if mode == constants.IEM_EXPORT:
3481       # Retry connection a few times when connecting to remote peer
3482       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3483       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3484     elif opts.connect_timeout is not None:
3485       assert mode == constants.IEM_IMPORT
3486       # Overall timeout for establishing connection while listening
3487       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3488
3489     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3490
3491     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3492     # support for receiving a file descriptor for output
3493     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3494                       output=logfile)
3495
3496     # The import/export name is simply the status directory name
3497     return os.path.basename(status_dir)
3498
3499   except Exception:
3500     shutil.rmtree(status_dir, ignore_errors=True)
3501     raise
3502
3503
3504 def GetImportExportStatus(names):
3505   """Returns import/export daemon status.
3506
3507   @type names: sequence
3508   @param names: List of names
3509   @rtype: List of dicts
3510   @return: Returns a list of the state of each named import/export or None if a
3511            status couldn't be read
3512
3513   """
3514   result = []
3515
3516   for name in names:
3517     status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3518                                  _IES_STATUS_FILE)
3519
3520     try:
3521       data = utils.ReadFile(status_file)
3522     except EnvironmentError, err:
3523       if err.errno != errno.ENOENT:
3524         raise
3525       data = None
3526
3527     if not data:
3528       result.append(None)
3529       continue
3530
3531     result.append(serializer.LoadJson(data))
3532
3533   return result
3534
3535
3536 def AbortImportExport(name):
3537   """Sends SIGTERM to a running import/export daemon.
3538
3539   """
3540   logging.info("Abort import/export %s", name)
3541
3542   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3543   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3544
3545   if pid:
3546     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3547                  name, pid)
3548     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3549
3550
3551 def CleanupImportExport(name):
3552   """Cleanup after an import or export.
3553
3554   If the import/export daemon is still running it's killed. Afterwards the
3555   whole status directory is removed.
3556
3557   """
3558   logging.info("Finalizing import/export %s", name)
3559
3560   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3561
3562   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3563
3564   if pid:
3565     logging.info("Import/export %s is still running with PID %s",
3566                  name, pid)
3567     utils.KillProcess(pid, waitpid=False)
3568
3569   shutil.rmtree(status_dir, ignore_errors=True)
3570
3571
3572 def _FindDisks(nodes_ip, disks):
3573   """Sets the physical ID on disks and returns the block devices.
3574
3575   """
3576   # set the correct physical ID
3577   my_name = netutils.Hostname.GetSysName()
3578   for cf in disks:
3579     cf.SetPhysicalID(my_name, nodes_ip)
3580
3581   bdevs = []
3582
3583   for cf in disks:
3584     rd = _RecursiveFindBD(cf)
3585     if rd is None:
3586       _Fail("Can't find device %s", cf)
3587     bdevs.append(rd)
3588   return bdevs
3589
3590
3591 def DrbdDisconnectNet(nodes_ip, disks):
3592   """Disconnects the network on a list of drbd devices.
3593
3594   """
3595   bdevs = _FindDisks(nodes_ip, disks)
3596
3597   # disconnect disks
3598   for rd in bdevs:
3599     try:
3600       rd.DisconnectNet()
3601     except errors.BlockDeviceError, err:
3602       _Fail("Can't change network configuration to standalone mode: %s",
3603             err, exc=True)
3604
3605
3606 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3607   """Attaches the network on a list of drbd devices.
3608
3609   """
3610   bdevs = _FindDisks(nodes_ip, disks)
3611
3612   if multimaster:
3613     for idx, rd in enumerate(bdevs):
3614       try:
3615         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3616       except EnvironmentError, err:
3617         _Fail("Can't create symlink: %s", err)
3618   # reconnect disks, switch to new master configuration and if
3619   # needed primary mode
3620   for rd in bdevs:
3621     try:
3622       rd.AttachNet(multimaster)
3623     except errors.BlockDeviceError, err:
3624       _Fail("Can't change network configuration: %s", err)
3625
3626   # wait until the disks are connected; we need to retry the re-attach
3627   # if the device becomes standalone, as this might happen if the one
3628   # node disconnects and reconnects in a different mode before the
3629   # other node reconnects; in this case, one or both of the nodes will
3630   # decide it has wrong configuration and switch to standalone
3631
3632   def _Attach():
3633     all_connected = True
3634
3635     for rd in bdevs:
3636       stats = rd.GetProcStatus()
3637
3638       all_connected = (all_connected and
3639                        (stats.is_connected or stats.is_in_resync))
3640
3641       if stats.is_standalone:
3642         # peer had different config info and this node became
3643         # standalone, even though this should not happen with the
3644         # new staged way of changing disk configs
3645         try:
3646           rd.AttachNet(multimaster)
3647         except errors.BlockDeviceError, err:
3648           _Fail("Can't change network configuration: %s", err)
3649
3650     if not all_connected:
3651       raise utils.RetryAgain()
3652
3653   try:
3654     # Start with a delay of 100 miliseconds and go up to 5 seconds
3655     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3656   except utils.RetryTimeout:
3657     _Fail("Timeout in disk reconnecting")
3658
3659   if multimaster:
3660     # change to primary mode
3661     for rd in bdevs:
3662       try:
3663         rd.Open()
3664       except errors.BlockDeviceError, err:
3665         _Fail("Can't change to primary mode: %s", err)
3666
3667
3668 def DrbdWaitSync(nodes_ip, disks):
3669   """Wait until DRBDs have synchronized.
3670
3671   """
3672   def _helper(rd):
3673     stats = rd.GetProcStatus()
3674     if not (stats.is_connected or stats.is_in_resync):
3675       raise utils.RetryAgain()
3676     return stats
3677
3678   bdevs = _FindDisks(nodes_ip, disks)
3679
3680   min_resync = 100
3681   alldone = True
3682   for rd in bdevs:
3683     try:
3684       # poll each second for 15 seconds
3685       stats = utils.Retry(_helper, 1, 15, args=[rd])
3686     except utils.RetryTimeout:
3687       stats = rd.GetProcStatus()
3688       # last check
3689       if not (stats.is_connected or stats.is_in_resync):
3690         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3691     alldone = alldone and (not stats.is_in_resync)
3692     if stats.sync_percent is not None:
3693       min_resync = min(min_resync, stats.sync_percent)
3694
3695   return (alldone, min_resync)
3696
3697
3698 def GetDrbdUsermodeHelper():
3699   """Returns DRBD usermode helper currently configured.
3700
3701   """
3702   try:
3703     return bdev.BaseDRBD.GetUsermodeHelper()
3704   except errors.BlockDeviceError, err:
3705     _Fail(str(err))
3706
3707
3708 def PowercycleNode(hypervisor_type):
3709   """Hard-powercycle the node.
3710
3711   Because we need to return first, and schedule the powercycle in the
3712   background, we won't be able to report failures nicely.
3713
3714   """
3715   hyper = hypervisor.GetHypervisor(hypervisor_type)
3716   try:
3717     pid = os.fork()
3718   except OSError:
3719     # if we can't fork, we'll pretend that we're in the child process
3720     pid = 0
3721   if pid > 0:
3722     return "Reboot scheduled in 5 seconds"
3723   # ensure the child is running on ram
3724   try:
3725     utils.Mlockall()
3726   except Exception: # pylint: disable=W0703
3727     pass
3728   time.sleep(5)
3729   hyper.PowercycleNode()
3730
3731
3732 def _VerifyRestrictedCmdName(cmd):
3733   """Verifies a restricted command name.
3734
3735   @type cmd: string
3736   @param cmd: Command name
3737   @rtype: tuple; (boolean, string or None)
3738   @return: The tuple's first element is the status; if C{False}, the second
3739     element is an error message string, otherwise it's C{None}
3740
3741   """
3742   if not cmd.strip():
3743     return (False, "Missing command name")
3744
3745   if os.path.basename(cmd) != cmd:
3746     return (False, "Invalid command name")
3747
3748   if not constants.EXT_PLUGIN_MASK.match(cmd):
3749     return (False, "Command name contains forbidden characters")
3750
3751   return (True, None)
3752
3753
3754 def _CommonRestrictedCmdCheck(path, owner):
3755   """Common checks for restricted command file system directories and files.
3756
3757   @type path: string
3758   @param path: Path to check
3759   @param owner: C{None} or tuple containing UID and GID
3760   @rtype: tuple; (boolean, string or C{os.stat} result)
3761   @return: The tuple's first element is the status; if C{False}, the second
3762     element is an error message string, otherwise it's the result of C{os.stat}
3763
3764   """
3765   if owner is None:
3766     # Default to root as owner
3767     owner = (0, 0)
3768
3769   try:
3770     st = os.stat(path)
3771   except EnvironmentError, err:
3772     return (False, "Can't stat(2) '%s': %s" % (path, err))
3773
3774   if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
3775     return (False, "Permissions on '%s' are too permissive" % path)
3776
3777   if (st.st_uid, st.st_gid) != owner:
3778     (owner_uid, owner_gid) = owner
3779     return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
3780
3781   return (True, st)
3782
3783
3784 def _VerifyRestrictedCmdDirectory(path, _owner=None):
3785   """Verifies restricted command directory.
3786
3787   @type path: string
3788   @param path: Path to check
3789   @rtype: tuple; (boolean, string or None)
3790   @return: The tuple's first element is the status; if C{False}, the second
3791     element is an error message string, otherwise it's C{None}
3792
3793   """
3794   (status, value) = _CommonRestrictedCmdCheck(path, _owner)
3795
3796   if not status:
3797     return (False, value)
3798
3799   if not stat.S_ISDIR(value.st_mode):
3800     return (False, "Path '%s' is not a directory" % path)
3801
3802   return (True, None)
3803
3804
3805 def _VerifyRestrictedCmd(path, cmd, _owner=None):
3806   """Verifies a whole restricted command and returns its executable filename.
3807
3808   @type path: string
3809   @param path: Directory containing restricted commands
3810   @type cmd: string
3811   @param cmd: Command name
3812   @rtype: tuple; (boolean, string)
3813   @return: The tuple's first element is the status; if C{False}, the second
3814     element is an error message string, otherwise the second element is the
3815     absolute path to the executable
3816
3817   """
3818   executable = utils.PathJoin(path, cmd)
3819
3820   (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)
3821
3822   if not status:
3823     return (False, msg)
3824
3825   if not utils.IsExecutable(executable):
3826     return (False, "access(2) thinks '%s' can't be executed" % executable)
3827
3828   return (True, executable)
3829
3830
3831 def _PrepareRestrictedCmd(path, cmd,
3832                           _verify_dir=_VerifyRestrictedCmdDirectory,
3833                           _verify_name=_VerifyRestrictedCmdName,
3834                           _verify_cmd=_VerifyRestrictedCmd):
3835   """Performs a number of tests on a restricted command.
3836
3837   @type path: string
3838   @param path: Directory containing restricted commands
3839   @type cmd: string
3840   @param cmd: Command name
3841   @return: Same as L{_VerifyRestrictedCmd}
3842
3843   """
3844   # Verify the directory first
3845   (status, msg) = _verify_dir(path)
3846   if status:
3847     # Check command if everything was alright
3848     (status, msg) = _verify_name(cmd)
3849
3850   if not status:
3851     return (False, msg)
3852
3853   # Check actual executable
3854   return _verify_cmd(path, cmd)
3855
3856
3857 def RunRestrictedCmd(cmd,
3858                      _lock_timeout=_RCMD_LOCK_TIMEOUT,
3859                      _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
3860                      _path=pathutils.RESTRICTED_COMMANDS_DIR,
3861                      _sleep_fn=time.sleep,
3862                      _prepare_fn=_PrepareRestrictedCmd,
3863                      _runcmd_fn=utils.RunCmd,
3864                      _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
3865   """Executes a restricted command after performing strict tests.
3866
3867   @type cmd: string
3868   @param cmd: Command name
3869   @rtype: string
3870   @return: Command output
3871   @raise RPCFail: In case of an error
3872
3873   """
3874   logging.info("Preparing to run restricted command '%s'", cmd)
3875
3876   if not _enabled:
3877     _Fail("Restricted commands disabled at configure time")
3878
3879   lock = None
3880   try:
3881     cmdresult = None
3882     try:
3883       lock = utils.FileLock.Open(_lock_file)
3884       lock.Exclusive(blocking=True, timeout=_lock_timeout)
3885
3886       (status, value) = _prepare_fn(_path, cmd)
3887
3888       if status:
3889         cmdresult = _runcmd_fn([value], env={}, reset_env=True,
3890                                postfork_fn=lambda _: lock.Unlock())
3891       else:
3892         logging.error(value)
3893     except Exception: # pylint: disable=W0703
3894       # Keep original error in log
3895       logging.exception("Caught exception")
3896
3897     if cmdresult is None:
3898       logging.info("Sleeping for %0.1f seconds before returning",
3899                    _RCMD_INVALID_DELAY)
3900       _sleep_fn(_RCMD_INVALID_DELAY)
3901
3902       # Do not include original error message in returned error
3903       _Fail("Executing command '%s' failed" % cmd)
3904     elif cmdresult.failed or cmdresult.fail_reason:
3905       _Fail("Restricted command '%s' failed: %s; output: %s",
3906             cmd, cmdresult.fail_reason, cmdresult.output)
3907     else:
3908       return cmdresult.output
3909   finally:
3910     if lock is not None:
3911       # Release lock at last
3912       lock.Close()
3913       lock = None
3914
3915
3916 def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
3917   """Creates or removes the watcher pause file.
3918
3919   @type until: None or number
3920   @param until: Unix timestamp saying until when the watcher shouldn't run
3921
3922   """
3923   if until is None:
3924     logging.info("Received request to no longer pause watcher")
3925     utils.RemoveFile(_filename)
3926   else:
3927     logging.info("Received request to pause watcher until %s", until)
3928
3929     if not ht.TNumber(until):
3930       _Fail("Duration must be numeric")
3931
3932     utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)
3933
3934
3935 class HooksRunner(object):
3936   """Hook runner.
3937
3938   This class is instantiated on the node side (ganeti-noded) and not
3939   on the master side.
3940
3941   """
3942   def __init__(self, hooks_base_dir=None):
3943     """Constructor for hooks runner.
3944
3945     @type hooks_base_dir: str or None
3946     @param hooks_base_dir: if not None, this overrides the
3947         L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3948
3949     """
3950     if hooks_base_dir is None:
3951       hooks_base_dir = pathutils.HOOKS_BASE_DIR
3952     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3953     # constant
3954     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3955
3956   def RunLocalHooks(self, node_list, hpath, phase, env):
3957     """Check that the hooks will be run only locally and then run them.
3958
3959     """
3960     assert len(node_list) == 1
3961     node = node_list[0]
3962     _, myself = ssconf.GetMasterAndMyself()
3963     assert node == myself
3964
3965     results = self.RunHooks(hpath, phase, env)
3966
3967     # Return values in the form expected by HooksMaster
3968     return {node: (None, False, results)}
3969
3970   def RunHooks(self, hpath, phase, env):
3971     """Run the scripts in the hooks directory.
3972
3973     @type hpath: str
3974     @param hpath: the path to the hooks directory which
3975         holds the scripts
3976     @type phase: str
3977     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3978         L{constants.HOOKS_PHASE_POST}
3979     @type env: dict
3980     @param env: dictionary with the environment for the hook
3981     @rtype: list
3982     @return: list of 3-element tuples:
3983       - script path
3984       - script result, either L{constants.HKR_SUCCESS} or
3985         L{constants.HKR_FAIL}
3986       - output of the script
3987
3988     @raise errors.ProgrammerError: for invalid input
3989         parameters
3990
3991     """
3992     if phase == constants.HOOKS_PHASE_PRE:
3993       suffix = "pre"
3994     elif phase == constants.HOOKS_PHASE_POST:
3995       suffix = "post"
3996     else:
3997       _Fail("Unknown hooks phase '%s'", phase)
3998
3999     subdir = "%s-%s.d" % (hpath, suffix)
4000     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
4001
4002     results = []
4003
4004     if not os.path.isdir(dir_name):
4005       # for non-existing/non-dirs, we simply exit instead of logging a
4006       # warning at every operation
4007       return results
4008
4009     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
4010
4011     for (relname, relstatus, runresult) in runparts_results:
4012       if relstatus == constants.RUNPARTS_SKIP:
4013         rrval = constants.HKR_SKIP
4014         output = ""
4015       elif relstatus == constants.RUNPARTS_ERR:
4016         rrval = constants.HKR_FAIL
4017         output = "Hook script execution error: %s" % runresult
4018       elif relstatus == constants.RUNPARTS_RUN:
4019         if runresult.failed:
4020           rrval = constants.HKR_FAIL
4021         else:
4022           rrval = constants.HKR_SUCCESS
4023         output = utils.SafeEncode(runresult.output.strip())
4024       results.append(("%s/%s" % (subdir, relname), rrval, output))
4025
4026     return results
4027
4028
4029 class IAllocatorRunner(object):
4030   """IAllocator runner.
4031
4032   This class is instantiated on the node side (ganeti-noded) and not on
4033   the master side.
4034
4035   """
4036   @staticmethod
4037   def Run(name, idata):
4038     """Run an iallocator script.
4039
4040     @type name: str
4041     @param name: the iallocator script name
4042     @type idata: str
4043     @param idata: the allocator input data
4044
4045     @rtype: tuple
4046     @return: two element tuple of:
4047        - status
4048        - either error message or stdout of allocator (for success)
4049
4050     """
4051     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
4052                                   os.path.isfile)
4053     if alloc_script is None:
4054       _Fail("iallocator module '%s' not found in the search path", name)
4055
4056     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
4057     try:
4058       os.write(fd, idata)
4059       os.close(fd)
4060       result = utils.RunCmd([alloc_script, fin_name])
4061       if result.failed:
4062         _Fail("iallocator module '%s' failed: %s, output '%s'",
4063               name, result.fail_reason, result.output)
4064     finally:
4065       os.unlink(fin_name)
4066
4067     return result.stdout
4068
4069
4070 class DevCacheManager(object):
4071   """Simple class for managing a cache of block device information.
4072
4073   """
4074   _DEV_PREFIX = "/dev/"
4075   _ROOT_DIR = pathutils.BDEV_CACHE_DIR
4076
4077   @classmethod
4078   def _ConvertPath(cls, dev_path):
4079     """Converts a /dev/name path to the cache file name.
4080
4081     This replaces slashes with underscores and strips the /dev
4082     prefix. It then returns the full path to the cache file.
4083
4084     @type dev_path: str
4085     @param dev_path: the C{/dev/} path name
4086     @rtype: str
4087     @return: the converted path name
4088
4089     """
4090     if dev_path.startswith(cls._DEV_PREFIX):
4091       dev_path = dev_path[len(cls._DEV_PREFIX):]
4092     dev_path = dev_path.replace("/", "_")
4093     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
4094     return fpath
4095
4096   @classmethod
4097   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
4098     """Updates the cache information for a given device.
4099
4100     @type dev_path: str
4101     @param dev_path: the pathname of the device
4102     @type owner: str
4103     @param owner: the owner (instance name) of the device
4104     @type on_primary: bool
4105     @param on_primary: whether this is the primary
4106         node nor not
4107     @type iv_name: str
4108     @param iv_name: the instance-visible name of the
4109         device, as in objects.Disk.iv_name
4110
4111     @rtype: None
4112
4113     """
4114     if dev_path is None:
4115       logging.error("DevCacheManager.UpdateCache got a None dev_path")
4116       return
4117     fpath = cls._ConvertPath(dev_path)
4118     if on_primary:
4119       state = "primary"
4120     else:
4121       state = "secondary"
4122     if iv_name is None:
4123       iv_name = "not_visible"
4124     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
4125     try:
4126       utils.WriteFile(fpath, data=fdata)
4127     except EnvironmentError, err:
4128       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
4129
4130   @classmethod
4131   def RemoveCache(cls, dev_path):
4132     """Remove data for a dev_path.
4133
4134     This is just a wrapper over L{utils.io.RemoveFile} with a converted
4135     path name and logging.
4136
4137     @type dev_path: str
4138     @param dev_path: the pathname of the device
4139
4140     @rtype: None
4141
4142     """
4143     if dev_path is None:
4144       logging.error("DevCacheManager.RemoveCache got a None dev_path")
4145       return
4146     fpath = cls._ConvertPath(dev_path)
4147     try:
4148       utils.RemoveFile(fpath)
4149     except EnvironmentError, err:
4150       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)