Include hvparams in ssconf files
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti.storage import bdev
58 from ganeti.storage import drbd
59 from ganeti import objects
60 from ganeti import ssconf
61 from ganeti import serializer
62 from ganeti import netutils
63 from ganeti import runtime
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
67 from ganeti import ht
68 from ganeti.storage.base import BlockDev
69 from ganeti.storage.drbd import DRBD8
70 from ganeti import hooksmaster
71
72
73 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
74 _ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
75   pathutils.DATA_DIR,
76   pathutils.JOB_QUEUE_ARCHIVE_DIR,
77   pathutils.QUEUE_DIR,
78   pathutils.CRYPTO_KEYS_DIR,
79   ])
80 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
81 _X509_KEY_FILE = "key"
82 _X509_CERT_FILE = "cert"
83 _IES_STATUS_FILE = "status"
84 _IES_PID_FILE = "pid"
85 _IES_CA_FILE = "ca"
86
87 #: Valid LVS output line regex
88 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
89
90 # Actions for the master setup script
91 _MASTER_START = "start"
92 _MASTER_STOP = "stop"
93
94 #: Maximum file permissions for restricted command directory and executables
95 _RCMD_MAX_MODE = (stat.S_IRWXU |
96                   stat.S_IRGRP | stat.S_IXGRP |
97                   stat.S_IROTH | stat.S_IXOTH)
98
99 #: Delay before returning an error for restricted commands
100 _RCMD_INVALID_DELAY = 10
101
102 #: How long to wait to acquire lock for restricted commands (shorter than
103 #: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
104 #: command requests arrive
105 _RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
106
107
108 class RPCFail(Exception):
109   """Class denoting RPC failure.
110
111   Its argument is the error message.
112
113   """
114
115
116 def _GetInstReasonFilename(instance_name):
117   """Path of the file containing the reason of the instance status change.
118
119   @type instance_name: string
120   @param instance_name: The name of the instance
121   @rtype: string
122   @return: The path of the file
123
124   """
125   return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)
126
127
128 def _StoreInstReasonTrail(instance_name, trail):
129   """Serialize a reason trail related to an instance change of state to file.
130
131   The exact location of the file depends on the name of the instance and on
132   the configuration of the Ganeti cluster defined at deploy time.
133
134   @type instance_name: string
135   @param instance_name: The name of the instance
136   @rtype: None
137
138   """
139   json = serializer.DumpJson(trail)
140   filename = _GetInstReasonFilename(instance_name)
141   utils.WriteFile(filename, data=json)
142
143
144 def _Fail(msg, *args, **kwargs):
145   """Log an error and the raise an RPCFail exception.
146
147   This exception is then handled specially in the ganeti daemon and
148   turned into a 'failed' return type. As such, this function is a
149   useful shortcut for logging the error and returning it to the master
150   daemon.
151
152   @type msg: string
153   @param msg: the text of the exception
154   @raise RPCFail
155
156   """
157   if args:
158     msg = msg % args
159   if "log" not in kwargs or kwargs["log"]: # if we should log this error
160     if "exc" in kwargs and kwargs["exc"]:
161       logging.exception(msg)
162     else:
163       logging.error(msg)
164   raise RPCFail(msg)
165
166
167 def _GetConfig():
168   """Simple wrapper to return a SimpleStore.
169
170   @rtype: L{ssconf.SimpleStore}
171   @return: a SimpleStore instance
172
173   """
174   return ssconf.SimpleStore()
175
176
177 def _GetSshRunner(cluster_name):
178   """Simple wrapper to return an SshRunner.
179
180   @type cluster_name: str
181   @param cluster_name: the cluster name, which is needed
182       by the SshRunner constructor
183   @rtype: L{ssh.SshRunner}
184   @return: an SshRunner instance
185
186   """
187   return ssh.SshRunner(cluster_name)
188
189
190 def _Decompress(data):
191   """Unpacks data compressed by the RPC client.
192
193   @type data: list or tuple
194   @param data: Data sent by RPC client
195   @rtype: str
196   @return: Decompressed data
197
198   """
199   assert isinstance(data, (list, tuple))
200   assert len(data) == 2
201   (encoding, content) = data
202   if encoding == constants.RPC_ENCODING_NONE:
203     return content
204   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
205     return zlib.decompress(base64.b64decode(content))
206   else:
207     raise AssertionError("Unknown data encoding")
208
209
210 def _CleanDirectory(path, exclude=None):
211   """Removes all regular files in a directory.
212
213   @type path: str
214   @param path: the directory to clean
215   @type exclude: list
216   @param exclude: list of files to be excluded, defaults
217       to the empty list
218
219   """
220   if path not in _ALLOWED_CLEAN_DIRS:
221     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
222           path)
223
224   if not os.path.isdir(path):
225     return
226   if exclude is None:
227     exclude = []
228   else:
229     # Normalize excluded paths
230     exclude = [os.path.normpath(i) for i in exclude]
231
232   for rel_name in utils.ListVisibleFiles(path):
233     full_name = utils.PathJoin(path, rel_name)
234     if full_name in exclude:
235       continue
236     if os.path.isfile(full_name) and not os.path.islink(full_name):
237       utils.RemoveFile(full_name)
238
239
240 def _BuildUploadFileList():
241   """Build the list of allowed upload files.
242
243   This is abstracted so that it's built only once at module import time.
244
245   """
246   allowed_files = set([
247     pathutils.CLUSTER_CONF_FILE,
248     pathutils.ETC_HOSTS,
249     pathutils.SSH_KNOWN_HOSTS_FILE,
250     pathutils.VNC_PASSWORD_FILE,
251     pathutils.RAPI_CERT_FILE,
252     pathutils.SPICE_CERT_FILE,
253     pathutils.SPICE_CACERT_FILE,
254     pathutils.RAPI_USERS_FILE,
255     pathutils.CONFD_HMAC_KEY,
256     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
257     ])
258
259   for hv_name in constants.HYPER_TYPES:
260     hv_class = hypervisor.GetHypervisorClass(hv_name)
261     allowed_files.update(hv_class.GetAncillaryFiles()[0])
262
263   assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
264     "Allowed file storage paths should never be uploaded via RPC"
265
266   return frozenset(allowed_files)
267
268
269 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
270
271
272 def JobQueuePurge():
273   """Removes job queue files and archived jobs.
274
275   @rtype: tuple
276   @return: True, None
277
278   """
279   _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
280   _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
281
282
283 def GetMasterInfo():
284   """Returns master information.
285
286   This is an utility function to compute master information, either
287   for consumption here or from the node daemon.
288
289   @rtype: tuple
290   @return: master_netdev, master_ip, master_name, primary_ip_family,
291     master_netmask
292   @raise RPCFail: in case of errors
293
294   """
295   try:
296     cfg = _GetConfig()
297     master_netdev = cfg.GetMasterNetdev()
298     master_ip = cfg.GetMasterIP()
299     master_netmask = cfg.GetMasterNetmask()
300     master_node = cfg.GetMasterNode()
301     primary_ip_family = cfg.GetPrimaryIPFamily()
302   except errors.ConfigurationError, err:
303     _Fail("Cluster configuration incomplete: %s", err, exc=True)
304   return (master_netdev, master_ip, master_node, primary_ip_family,
305           master_netmask)
306
307
308 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
309   """Decorator that runs hooks before and after the decorated function.
310
311   @type hook_opcode: string
312   @param hook_opcode: opcode of the hook
313   @type hooks_path: string
314   @param hooks_path: path of the hooks
315   @type env_builder_fn: function
316   @param env_builder_fn: function that returns a dictionary containing the
317     environment variables for the hooks. Will get all the parameters of the
318     decorated function.
319   @raise RPCFail: in case of pre-hook failure
320
321   """
322   def decorator(fn):
323     def wrapper(*args, **kwargs):
324       _, myself = ssconf.GetMasterAndMyself()
325       nodes = ([myself], [myself])  # these hooks run locally
326
327       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
328
329       cfg = _GetConfig()
330       hr = HooksRunner()
331       hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
332                                    hr.RunLocalHooks, None, env_fn,
333                                    logging.warning, cfg.GetClusterName(),
334                                    cfg.GetMasterNode())
335       hm.RunPhase(constants.HOOKS_PHASE_PRE)
336       result = fn(*args, **kwargs)
337       hm.RunPhase(constants.HOOKS_PHASE_POST)
338
339       return result
340     return wrapper
341   return decorator
342
343
344 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
345   """Builds environment variables for master IP hooks.
346
347   @type master_params: L{objects.MasterNetworkParameters}
348   @param master_params: network parameters of the master
349   @type use_external_mip_script: boolean
350   @param use_external_mip_script: whether to use an external master IP
351     address setup script (unused, but necessary per the implementation of the
352     _RunLocalHooks decorator)
353
354   """
355   # pylint: disable=W0613
356   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
357   env = {
358     "MASTER_NETDEV": master_params.netdev,
359     "MASTER_IP": master_params.ip,
360     "MASTER_NETMASK": str(master_params.netmask),
361     "CLUSTER_IP_VERSION": str(ver),
362   }
363
364   return env
365
366
367 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
368   """Execute the master IP address setup script.
369
370   @type master_params: L{objects.MasterNetworkParameters}
371   @param master_params: network parameters of the master
372   @type action: string
373   @param action: action to pass to the script. Must be one of
374     L{backend._MASTER_START} or L{backend._MASTER_STOP}
375   @type use_external_mip_script: boolean
376   @param use_external_mip_script: whether to use an external master IP
377     address setup script
378   @raise backend.RPCFail: if there are errors during the execution of the
379     script
380
381   """
382   env = _BuildMasterIpEnv(master_params)
383
384   if use_external_mip_script:
385     setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
386   else:
387     setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
388
389   result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
390
391   if result.failed:
392     _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
393           (action, result.exit_code, result.output), log=True)
394
395
396 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
397                _BuildMasterIpEnv)
398 def ActivateMasterIp(master_params, use_external_mip_script):
399   """Activate the IP address of the master daemon.
400
401   @type master_params: L{objects.MasterNetworkParameters}
402   @param master_params: network parameters of the master
403   @type use_external_mip_script: boolean
404   @param use_external_mip_script: whether to use an external master IP
405     address setup script
406   @raise RPCFail: in case of errors during the IP startup
407
408   """
409   _RunMasterSetupScript(master_params, _MASTER_START,
410                         use_external_mip_script)
411
412
413 def StartMasterDaemons(no_voting):
414   """Activate local node as master node.
415
416   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
417
418   @type no_voting: boolean
419   @param no_voting: whether to start ganeti-masterd without a node vote
420       but still non-interactively
421   @rtype: None
422
423   """
424
425   if no_voting:
426     masterd_args = "--no-voting --yes-do-it"
427   else:
428     masterd_args = ""
429
430   env = {
431     "EXTRA_MASTERD_ARGS": masterd_args,
432     }
433
434   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
435   if result.failed:
436     msg = "Can't start Ganeti master: %s" % result.output
437     logging.error(msg)
438     _Fail(msg)
439
440
441 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
442                _BuildMasterIpEnv)
443 def DeactivateMasterIp(master_params, use_external_mip_script):
444   """Deactivate the master IP on this node.
445
446   @type master_params: L{objects.MasterNetworkParameters}
447   @param master_params: network parameters of the master
448   @type use_external_mip_script: boolean
449   @param use_external_mip_script: whether to use an external master IP
450     address setup script
451   @raise RPCFail: in case of errors during the IP turndown
452
453   """
454   _RunMasterSetupScript(master_params, _MASTER_STOP,
455                         use_external_mip_script)
456
457
458 def StopMasterDaemons():
459   """Stop the master daemons on this node.
460
461   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
462
463   @rtype: None
464
465   """
466   # TODO: log and report back to the caller the error failures; we
467   # need to decide in which case we fail the RPC for this
468
469   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
470   if result.failed:
471     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
472                   " and error %s",
473                   result.cmd, result.exit_code, result.output)
474
475
476 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
477   """Change the netmask of the master IP.
478
479   @param old_netmask: the old value of the netmask
480   @param netmask: the new value of the netmask
481   @param master_ip: the master IP
482   @param master_netdev: the master network device
483
484   """
485   if old_netmask == netmask:
486     return
487
488   if not netutils.IPAddress.Own(master_ip):
489     _Fail("The master IP address is not up, not attempting to change its"
490           " netmask")
491
492   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
493                          "%s/%s" % (master_ip, netmask),
494                          "dev", master_netdev, "label",
495                          "%s:0" % master_netdev])
496   if result.failed:
497     _Fail("Could not set the new netmask on the master IP address")
498
499   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
500                          "%s/%s" % (master_ip, old_netmask),
501                          "dev", master_netdev, "label",
502                          "%s:0" % master_netdev])
503   if result.failed:
504     _Fail("Could not bring down the master IP address with the old netmask")
505
506
507 def EtcHostsModify(mode, host, ip):
508   """Modify a host entry in /etc/hosts.
509
510   @param mode: The mode to operate. Either add or remove entry
511   @param host: The host to operate on
512   @param ip: The ip associated with the entry
513
514   """
515   if mode == constants.ETC_HOSTS_ADD:
516     if not ip:
517       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
518               " present")
519     utils.AddHostToEtcHosts(host, ip)
520   elif mode == constants.ETC_HOSTS_REMOVE:
521     if ip:
522       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
523               " parameter is present")
524     utils.RemoveHostFromEtcHosts(host)
525   else:
526     RPCFail("Mode not supported")
527
528
529 def LeaveCluster(modify_ssh_setup):
530   """Cleans up and remove the current node.
531
532   This function cleans up and prepares the current node to be removed
533   from the cluster.
534
535   If processing is successful, then it raises an
536   L{errors.QuitGanetiException} which is used as a special case to
537   shutdown the node daemon.
538
539   @param modify_ssh_setup: boolean
540
541   """
542   _CleanDirectory(pathutils.DATA_DIR)
543   _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
544   JobQueuePurge()
545
546   if modify_ssh_setup:
547     try:
548       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
549
550       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
551
552       utils.RemoveFile(priv_key)
553       utils.RemoveFile(pub_key)
554     except errors.OpExecError:
555       logging.exception("Error while processing ssh files")
556
557   try:
558     utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
559     utils.RemoveFile(pathutils.RAPI_CERT_FILE)
560     utils.RemoveFile(pathutils.SPICE_CERT_FILE)
561     utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
562     utils.RemoveFile(pathutils.NODED_CERT_FILE)
563   except: # pylint: disable=W0702
564     logging.exception("Error while removing cluster secrets")
565
566   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
567   if result.failed:
568     logging.error("Command %s failed with exitcode %s and error %s",
569                   result.cmd, result.exit_code, result.output)
570
571   # Raise a custom exception (handled in ganeti-noded)
572   raise errors.QuitGanetiException(True, "Shutdown scheduled")
573
574
575 def _GetVgInfo(name, excl_stor):
576   """Retrieves information about a LVM volume group.
577
578   """
579   # TODO: GetVGInfo supports returning information for multiple VGs at once
580   vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
581   if vginfo:
582     vg_free = int(round(vginfo[0][0], 0))
583     vg_size = int(round(vginfo[0][1], 0))
584   else:
585     vg_free = None
586     vg_size = None
587
588   return {
589     "name": name,
590     "vg_free": vg_free,
591     "vg_size": vg_size,
592     }
593
594
595 def _GetVgSpindlesInfo(name, excl_stor):
596   """Retrieves information about spindles in an LVM volume group.
597
598   @type name: string
599   @param name: VG name
600   @type excl_stor: bool
601   @param excl_stor: exclusive storage
602   @rtype: dict
603   @return: dictionary whose keys are "name", "vg_free", "vg_size" for VG name,
604       free spindles, total spindles respectively
605
606   """
607   if excl_stor:
608     (vg_free, vg_size) = bdev.LogicalVolume.GetVgSpindlesInfo(name)
609   else:
610     vg_free = 0
611     vg_size = 0
612   return {
613     "name": name,
614     "vg_free": vg_free,
615     "vg_size": vg_size,
616     }
617
618
619 def _GetHvInfo(name):
620   """Retrieves node information from a hypervisor.
621
622   The information returned depends on the hypervisor. Common items:
623
624     - vg_size is the size of the configured volume group in MiB
625     - vg_free is the free size of the volume group in MiB
626     - memory_dom0 is the memory allocated for domain0 in MiB
627     - memory_free is the currently available (free) ram in MiB
628     - memory_total is the total number of ram in MiB
629     - hv_version: the hypervisor version, if available
630
631   """
632   return hypervisor.GetHypervisor(name).GetNodeInfo()
633
634
635 def _GetNamedNodeInfo(names, fn):
636   """Calls C{fn} for all names in C{names} and returns a dictionary.
637
638   @rtype: None or dict
639
640   """
641   if names is None:
642     return None
643   else:
644     return map(fn, names)
645
646
647 def GetNodeInfo(storage_units, hv_names, excl_stor):
648   """Gives back a hash with different information about the node.
649
650   @type storage_units: list of pairs (string, string)
651   @param storage_units: List of pairs (storage unit, identifier) to ask for disk
652                         space information. In case of lvm-vg, the identifier is
653                         the VG name.
654   @type hv_names: list of string
655   @param hv_names: Names of the hypervisors to ask for node information
656   @type excl_stor: boolean
657   @param excl_stor: Whether exclusive_storage is active
658   @rtype: tuple; (string, None/dict, None/dict)
659   @return: Tuple containing boot ID, volume group information and hypervisor
660     information
661
662   """
663   bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
664   storage_info = _GetNamedNodeInfo(
665     storage_units,
666     (lambda storage_unit: _ApplyStorageInfoFunction(storage_unit[0],
667                                                     storage_unit[1],
668                                                     excl_stor)))
669   hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
670
671   return (bootid, storage_info, hv_info)
672
673
674 # FIXME: implement storage reporting for all missing storage types.
675 _STORAGE_TYPE_INFO_FN = {
676   constants.ST_BLOCK: None,
677   constants.ST_DISKLESS: None,
678   constants.ST_EXT: None,
679   constants.ST_FILE: None,
680   constants.ST_LVM_PV: _GetVgSpindlesInfo,
681   constants.ST_LVM_VG: _GetVgInfo,
682   constants.ST_RADOS: None,
683 }
684
685
686 def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
687   """Looks up and applies the correct function to calculate free and total
688   storage for the given storage type.
689
690   @type storage_type: string
691   @param storage_type: the storage type for which the storage shall be reported.
692   @type storage_key: string
693   @param storage_key: identifier of a storage unit, e.g. the volume group name
694     of an LVM storage unit
695   @type args: any
696   @param args: various parameters that can be used for storage reporting. These
697     parameters and their semantics vary from storage type to storage type and
698     are just propagated in this function.
699   @return: the results of the application of the storage space function (see
700     _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
701     storage type
702   @raises NotImplementedError: for storage types who don't support space
703     reporting yet
704   """
705   fn = _STORAGE_TYPE_INFO_FN[storage_type]
706   if fn is not None:
707     return fn(storage_key, *args)
708   else:
709     raise NotImplementedError
710
711
712 def _CheckExclusivePvs(pvi_list):
713   """Check that PVs are not shared among LVs
714
715   @type pvi_list: list of L{objects.LvmPvInfo} objects
716   @param pvi_list: information about the PVs
717
718   @rtype: list of tuples (string, list of strings)
719   @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])
720
721   """
722   res = []
723   for pvi in pvi_list:
724     if len(pvi.lv_list) > 1:
725       res.append((pvi.name, pvi.lv_list))
726   return res
727
728
729 def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
730                        get_hv_fn=hypervisor.GetHypervisor):
731   """Verifies the hypervisor. Appends the results to the 'results' list.
732
733   @type what: C{dict}
734   @param what: a dictionary of things to check
735   @type vm_capable: boolean
736   @param vm_capable: whether or not this node is vm capable
737   @type result: dict
738   @param result: dictionary of verification results; results of the
739     verifications in this function will be added here
740   @type all_hvparams: dict of dict of string
741   @param all_hvparams: dictionary mapping hypervisor names to hvparams
742   @type get_hv_fn: function
743   @param get_hv_fn: function to retrieve the hypervisor, to improve testability
744
745   """
746   if not vm_capable:
747     return
748
749   if constants.NV_HYPERVISOR in what:
750     result[constants.NV_HYPERVISOR] = {}
751     for hv_name in what[constants.NV_HYPERVISOR]:
752       hvparams = all_hvparams[hv_name]
753       try:
754         val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
755       except errors.HypervisorError, err:
756         val = "Error while checking hypervisor: %s" % str(err)
757       result[constants.NV_HYPERVISOR][hv_name] = val
758
759
760 def _VerifyHvparams(what, vm_capable, result,
761                     get_hv_fn=hypervisor.GetHypervisor):
762   """Verifies the hvparams. Appends the results to the 'results' list.
763
764   @type what: C{dict}
765   @param what: a dictionary of things to check
766   @type vm_capable: boolean
767   @param vm_capable: whether or not this node is vm capable
768   @type result: dict
769   @param result: dictionary of verification results; results of the
770     verifications in this function will be added here
771   @type get_hv_fn: function
772   @param get_hv_fn: function to retrieve the hypervisor, to improve testability
773
774   """
775   if not vm_capable:
776     return
777
778   if constants.NV_HVPARAMS in what:
779     result[constants.NV_HVPARAMS] = []
780     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
781       try:
782         logging.info("Validating hv %s, %s", hv_name, hvparms)
783         get_hv_fn(hv_name).ValidateParameters(hvparms)
784       except errors.HypervisorError, err:
785         result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))
786
787
788 def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
789   """Verifies the instance list.
790
791   @type what: C{dict}
792   @param what: a dictionary of things to check
793   @type vm_capable: boolean
794   @param vm_capable: whether or not this node is vm capable
795   @type result: dict
796   @param result: dictionary of verification results; results of the
797     verifications in this function will be added here
798   @type all_hvparams: dict of dict of string
799   @param all_hvparams: dictionary mapping hypervisor names to hvparams
800
801   """
802   if constants.NV_INSTANCELIST in what and vm_capable:
803     # GetInstanceList can fail
804     try:
805       val = GetInstanceList(what[constants.NV_INSTANCELIST],
806                             all_hvparams=all_hvparams)
807     except RPCFail, err:
808       val = str(err)
809     result[constants.NV_INSTANCELIST] = val
810
811
812 def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
813   """Verifies the node info.
814
815   @type what: C{dict}
816   @param what: a dictionary of things to check
817   @type vm_capable: boolean
818   @param vm_capable: whether or not this node is vm capable
819   @type result: dict
820   @param result: dictionary of verification results; results of the
821     verifications in this function will be added here
822   @type all_hvparams: dict of dict of string
823   @param all_hvparams: dictionary mapping hypervisor names to hvparams
824
825   """
826   if constants.NV_HVINFO in what and vm_capable:
827     hvname = what[constants.NV_HVINFO]
828     hyper = hypervisor.GetHypervisor(hvname)
829     hvparams = all_hvparams[hvname]
830     result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)
831
832
833 def VerifyNode(what, cluster_name, all_hvparams):
834   """Verify the status of the local node.
835
836   Based on the input L{what} parameter, various checks are done on the
837   local node.
838
839   If the I{filelist} key is present, this list of
840   files is checksummed and the file/checksum pairs are returned.
841
842   If the I{nodelist} key is present, we check that we have
843   connectivity via ssh with the target nodes (and check the hostname
844   report).
845
846   If the I{node-net-test} key is present, we check that we have
847   connectivity to the given nodes via both primary IP and, if
848   applicable, secondary IPs.
849
850   @type what: C{dict}
851   @param what: a dictionary of things to check:
852       - filelist: list of files for which to compute checksums
853       - nodelist: list of nodes we should check ssh communication with
854       - node-net-test: list of nodes we should check node daemon port
855         connectivity with
856       - hypervisor: list with hypervisors to run the verify for
857   @type cluster_name: string
858   @param cluster_name: the cluster's name
859   @type all_hvparams: dict of dict of strings
860   @param all_hvparams: a dictionary mapping hypervisor names to hvparams
861   @rtype: dict
862   @return: a dictionary with the same keys as the input dict, and
863       values representing the result of the checks
864
865   """
866   result = {}
867   my_name = netutils.Hostname.GetSysName()
868   port = netutils.GetDaemonPort(constants.NODED)
869   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
870
871   _VerifyHypervisors(what, vm_capable, result, all_hvparams)
872   _VerifyHvparams(what, vm_capable, result)
873
874   if constants.NV_FILELIST in what:
875     fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
876                                               what[constants.NV_FILELIST]))
877     result[constants.NV_FILELIST] = \
878       dict((vcluster.MakeVirtualPath(key), value)
879            for (key, value) in fingerprints.items())
880
881   if constants.NV_NODELIST in what:
882     (nodes, bynode) = what[constants.NV_NODELIST]
883
884     # Add nodes from other groups (different for each node)
885     try:
886       nodes.extend(bynode[my_name])
887     except KeyError:
888       pass
889
890     # Use a random order
891     random.shuffle(nodes)
892
893     # Try to contact all nodes
894     val = {}
895     for node in nodes:
896       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
897       if not success:
898         val[node] = message
899
900     result[constants.NV_NODELIST] = val
901
902   if constants.NV_NODENETTEST in what:
903     result[constants.NV_NODENETTEST] = tmp = {}
904     my_pip = my_sip = None
905     for name, pip, sip in what[constants.NV_NODENETTEST]:
906       if name == my_name:
907         my_pip = pip
908         my_sip = sip
909         break
910     if not my_pip:
911       tmp[my_name] = ("Can't find my own primary/secondary IP"
912                       " in the node list")
913     else:
914       for name, pip, sip in what[constants.NV_NODENETTEST]:
915         fail = []
916         if not netutils.TcpPing(pip, port, source=my_pip):
917           fail.append("primary")
918         if sip != pip:
919           if not netutils.TcpPing(sip, port, source=my_sip):
920             fail.append("secondary")
921         if fail:
922           tmp[name] = ("failure using the %s interface(s)" %
923                        " and ".join(fail))
924
925   if constants.NV_MASTERIP in what:
926     # FIXME: add checks on incoming data structures (here and in the
927     # rest of the function)
928     master_name, master_ip = what[constants.NV_MASTERIP]
929     if master_name == my_name:
930       source = constants.IP4_ADDRESS_LOCALHOST
931     else:
932       source = None
933     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
934                                                      source=source)
935
936   if constants.NV_USERSCRIPTS in what:
937     result[constants.NV_USERSCRIPTS] = \
938       [script for script in what[constants.NV_USERSCRIPTS]
939        if not utils.IsExecutable(script)]
940
941   if constants.NV_OOB_PATHS in what:
942     result[constants.NV_OOB_PATHS] = tmp = []
943     for path in what[constants.NV_OOB_PATHS]:
944       try:
945         st = os.stat(path)
946       except OSError, err:
947         tmp.append("error stating out of band helper: %s" % err)
948       else:
949         if stat.S_ISREG(st.st_mode):
950           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
951             tmp.append(None)
952           else:
953             tmp.append("out of band helper %s is not executable" % path)
954         else:
955           tmp.append("out of band helper %s is not a file" % path)
956
957   if constants.NV_LVLIST in what and vm_capable:
958     try:
959       val = GetVolumeList(utils.ListVolumeGroups().keys())
960     except RPCFail, err:
961       val = str(err)
962     result[constants.NV_LVLIST] = val
963
964   _VerifyInstanceList(what, vm_capable, result, all_hvparams)
965
966   if constants.NV_VGLIST in what and vm_capable:
967     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
968
969   if constants.NV_PVLIST in what and vm_capable:
970     check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
971     val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
972                                        filter_allocatable=False,
973                                        include_lvs=check_exclusive_pvs)
974     if check_exclusive_pvs:
975       result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
976       for pvi in val:
977         # Avoid sending useless data on the wire
978         pvi.lv_list = []
979     result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)
980
981   if constants.NV_VERSION in what:
982     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
983                                     constants.RELEASE_VERSION)
984
985   _VerifyNodeInfo(what, vm_capable, result, all_hvparams)
986
987   if constants.NV_DRBDVERSION in what and vm_capable:
988     try:
989       drbd_version = DRBD8.GetProcInfo().GetVersionString()
990     except errors.BlockDeviceError, err:
991       logging.warning("Can't get DRBD version", exc_info=True)
992       drbd_version = str(err)
993     result[constants.NV_DRBDVERSION] = drbd_version
994
995   if constants.NV_DRBDLIST in what and vm_capable:
996     try:
997       used_minors = drbd.DRBD8.GetUsedDevs()
998     except errors.BlockDeviceError, err:
999       logging.warning("Can't get used minors list", exc_info=True)
1000       used_minors = str(err)
1001     result[constants.NV_DRBDLIST] = used_minors
1002
1003   if constants.NV_DRBDHELPER in what and vm_capable:
1004     status = True
1005     try:
1006       payload = drbd.DRBD8.GetUsermodeHelper()
1007     except errors.BlockDeviceError, err:
1008       logging.error("Can't get DRBD usermode helper: %s", str(err))
1009       status = False
1010       payload = str(err)
1011     result[constants.NV_DRBDHELPER] = (status, payload)
1012
1013   if constants.NV_NODESETUP in what:
1014     result[constants.NV_NODESETUP] = tmpr = []
1015     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
1016       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
1017                   " under /sys, missing required directories /sys/block"
1018                   " and /sys/class/net")
1019     if (not os.path.isdir("/proc/sys") or
1020         not os.path.isfile("/proc/sysrq-trigger")):
1021       tmpr.append("The procfs filesystem doesn't seem to be mounted"
1022                   " under /proc, missing required directory /proc/sys and"
1023                   " the file /proc/sysrq-trigger")
1024
1025   if constants.NV_TIME in what:
1026     result[constants.NV_TIME] = utils.SplitTime(time.time())
1027
1028   if constants.NV_OSLIST in what and vm_capable:
1029     result[constants.NV_OSLIST] = DiagnoseOS()
1030
1031   if constants.NV_BRIDGES in what and vm_capable:
1032     result[constants.NV_BRIDGES] = [bridge
1033                                     for bridge in what[constants.NV_BRIDGES]
1034                                     if not utils.BridgeExists(bridge)]
1035
1036   if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
1037     result[constants.NV_FILE_STORAGE_PATHS] = \
1038       bdev.ComputeWrongFileStoragePaths()
1039
1040   return result
1041
1042
1043 def GetBlockDevSizes(devices):
1044   """Return the size of the given block devices
1045
1046   @type devices: list
1047   @param devices: list of block device nodes to query
1048   @rtype: dict
1049   @return:
1050     dictionary of all block devices under /dev (key). The value is their
1051     size in MiB.
1052
1053     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
1054
1055   """
1056   DEV_PREFIX = "/dev/"
1057   blockdevs = {}
1058
1059   for devpath in devices:
1060     if not utils.IsBelowDir(DEV_PREFIX, devpath):
1061       continue
1062
1063     try:
1064       st = os.stat(devpath)
1065     except EnvironmentError, err:
1066       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
1067       continue
1068
1069     if stat.S_ISBLK(st.st_mode):
1070       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
1071       if result.failed:
1072         # We don't want to fail, just do not list this device as available
1073         logging.warning("Cannot get size for block device %s", devpath)
1074         continue
1075
1076       size = int(result.stdout) / (1024 * 1024)
1077       blockdevs[devpath] = size
1078   return blockdevs
1079
1080
1081 def GetVolumeList(vg_names):
1082   """Compute list of logical volumes and their size.
1083
1084   @type vg_names: list
1085   @param vg_names: the volume groups whose LVs we should list, or
1086       empty for all volume groups
1087   @rtype: dict
1088   @return:
1089       dictionary of all partions (key) with value being a tuple of
1090       their size (in MiB), inactive and online status::
1091
1092         {'xenvg/test1': ('20.06', True, True)}
1093
1094       in case of errors, a string is returned with the error
1095       details.
1096
1097   """
1098   lvs = {}
1099   sep = "|"
1100   if not vg_names:
1101     vg_names = []
1102   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
1103                          "--separator=%s" % sep,
1104                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
1105   if result.failed:
1106     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
1107
1108   for line in result.stdout.splitlines():
1109     line = line.strip()
1110     match = _LVSLINE_REGEX.match(line)
1111     if not match:
1112       logging.error("Invalid line returned from lvs output: '%s'", line)
1113       continue
1114     vg_name, name, size, attr = match.groups()
1115     inactive = attr[4] == "-"
1116     online = attr[5] == "o"
1117     virtual = attr[0] == "v"
1118     if virtual:
1119       # we don't want to report such volumes as existing, since they
1120       # don't really hold data
1121       continue
1122     lvs[vg_name + "/" + name] = (size, inactive, online)
1123
1124   return lvs
1125
1126
1127 def ListVolumeGroups():
1128   """List the volume groups and their size.
1129
1130   @rtype: dict
1131   @return: dictionary with keys volume name and values the
1132       size of the volume
1133
1134   """
1135   return utils.ListVolumeGroups()
1136
1137
1138 def NodeVolumes():
1139   """List all volumes on this node.
1140
1141   @rtype: list
1142   @return:
1143     A list of dictionaries, each having four keys:
1144       - name: the logical volume name,
1145       - size: the size of the logical volume
1146       - dev: the physical device on which the LV lives
1147       - vg: the volume group to which it belongs
1148
1149     In case of errors, we return an empty list and log the
1150     error.
1151
1152     Note that since a logical volume can live on multiple physical
1153     volumes, the resulting list might include a logical volume
1154     multiple times.
1155
1156   """
1157   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
1158                          "--separator=|",
1159                          "--options=lv_name,lv_size,devices,vg_name"])
1160   if result.failed:
1161     _Fail("Failed to list logical volumes, lvs output: %s",
1162           result.output)
1163
1164   def parse_dev(dev):
1165     return dev.split("(")[0]
1166
1167   def handle_dev(dev):
1168     return [parse_dev(x) for x in dev.split(",")]
1169
1170   def map_line(line):
1171     line = [v.strip() for v in line]
1172     return [{"name": line[0], "size": line[1],
1173              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
1174
1175   all_devs = []
1176   for line in result.stdout.splitlines():
1177     if line.count("|") >= 3:
1178       all_devs.extend(map_line(line.split("|")))
1179     else:
1180       logging.warning("Strange line in the output from lvs: '%s'", line)
1181   return all_devs
1182
1183
1184 def BridgesExist(bridges_list):
1185   """Check if a list of bridges exist on the current node.
1186
1187   @rtype: boolean
1188   @return: C{True} if all of them exist, C{False} otherwise
1189
1190   """
1191   missing = []
1192   for bridge in bridges_list:
1193     if not utils.BridgeExists(bridge):
1194       missing.append(bridge)
1195
1196   if missing:
1197     _Fail("Missing bridges %s", utils.CommaJoin(missing))
1198
1199
1200 def GetInstanceListForHypervisor(hname, hvparams=None,
1201                                  get_hv_fn=hypervisor.GetHypervisor):
1202   """Provides a list of instances of the given hypervisor.
1203
1204   @type hname: string
1205   @param hname: name of the hypervisor
1206   @type hvparams: dict of strings
1207   @param hvparams: hypervisor parameters for the given hypervisor
1208   @type get_hv_fn: function
1209   @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1210     name; optional parameter to increase testability
1211
1212   @rtype: list
1213   @return: a list of all running instances on the current node
1214     - instance1.example.com
1215     - instance2.example.com
1216
1217   """
1218   results = []
1219   try:
1220     hv = get_hv_fn(hname)
1221     names = hv.ListInstances(hvparams=hvparams)
1222     results.extend(names)
1223   except errors.HypervisorError, err:
1224     _Fail("Error enumerating instances (hypervisor %s): %s",
1225           hname, err, exc=True)
1226   return results
1227
1228
1229 def GetInstanceList(hypervisor_list, all_hvparams=None,
1230                     get_hv_fn=hypervisor.GetHypervisor):
1231   """Provides a list of instances.
1232
1233   @type hypervisor_list: list
1234   @param hypervisor_list: the list of hypervisors to query information
1235   @type all_hvparams: dict of dict of strings
1236   @param all_hvparams: a dictionary mapping hypervisor types to respective
1237     cluster-wide hypervisor parameters
1238   @type get_hv_fn: function
1239   @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1240     name; optional parameter to increase testability
1241
1242   @rtype: list
1243   @return: a list of all running instances on the current node
1244     - instance1.example.com
1245     - instance2.example.com
1246
1247   """
1248   results = []
1249   for hname in hypervisor_list:
1250     hvparams = all_hvparams[hname]
1251     results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
1252                                                 get_hv_fn=get_hv_fn))
1253   return results
1254
1255
1256 def GetInstanceInfo(instance, hname):
1257   """Gives back the information about an instance as a dictionary.
1258
1259   @type instance: string
1260   @param instance: the instance name
1261   @type hname: string
1262   @param hname: the hypervisor type of the instance
1263
1264   @rtype: dict
1265   @return: dictionary with the following keys:
1266       - memory: memory size of instance (int)
1267       - state: xen state of instance (string)
1268       - time: cpu time of instance (float)
1269       - vcpus: the number of vcpus (int)
1270
1271   """
1272   output = {}
1273
1274   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1275   if iinfo is not None:
1276     output["memory"] = iinfo[2]
1277     output["vcpus"] = iinfo[3]
1278     output["state"] = iinfo[4]
1279     output["time"] = iinfo[5]
1280
1281   return output
1282
1283
1284 def GetInstanceMigratable(instance):
1285   """Computes whether an instance can be migrated.
1286
1287   @type instance: L{objects.Instance}
1288   @param instance: object representing the instance to be checked.
1289
1290   @rtype: tuple
1291   @return: tuple of (result, description) where:
1292       - result: whether the instance can be migrated or not
1293       - description: a description of the issue, if relevant
1294
1295   """
1296   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1297   iname = instance.name
1298   if iname not in hyper.ListInstances(instance.hvparams):
1299     _Fail("Instance %s is not running", iname)
1300
1301   for idx in range(len(instance.disks)):
1302     link_name = _GetBlockDevSymlinkPath(iname, idx)
1303     if not os.path.islink(link_name):
1304       logging.warning("Instance %s is missing symlink %s for disk %d",
1305                       iname, link_name, idx)
1306
1307
1308 def GetAllInstancesInfo(hypervisor_list):
1309   """Gather data about all instances.
1310
1311   This is the equivalent of L{GetInstanceInfo}, except that it
1312   computes data for all instances at once, thus being faster if one
1313   needs data about more than one instance.
1314
1315   @type hypervisor_list: list
1316   @param hypervisor_list: list of hypervisors to query for instance data
1317
1318   @rtype: dict
1319   @return: dictionary of instance: data, with data having the following keys:
1320       - memory: memory size of instance (int)
1321       - state: xen state of instance (string)
1322       - time: cpu time of instance (float)
1323       - vcpus: the number of vcpus
1324
1325   """
1326   output = {}
1327
1328   for hname in hypervisor_list:
1329     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1330     if iinfo:
1331       for name, _, memory, vcpus, state, times in iinfo:
1332         value = {
1333           "memory": memory,
1334           "vcpus": vcpus,
1335           "state": state,
1336           "time": times,
1337           }
1338         if name in output:
1339           # we only check static parameters, like memory and vcpus,
1340           # and not state and time which can change between the
1341           # invocations of the different hypervisors
1342           for key in "memory", "vcpus":
1343             if value[key] != output[name][key]:
1344               _Fail("Instance %s is running twice"
1345                     " with different parameters", name)
1346         output[name] = value
1347
1348   return output
1349
1350
1351 def _InstanceLogName(kind, os_name, instance, component):
1352   """Compute the OS log filename for a given instance and operation.
1353
1354   The instance name and os name are passed in as strings since not all
1355   operations have these as part of an instance object.
1356
1357   @type kind: string
1358   @param kind: the operation type (e.g. add, import, etc.)
1359   @type os_name: string
1360   @param os_name: the os name
1361   @type instance: string
1362   @param instance: the name of the instance being imported/added/etc.
1363   @type component: string or None
1364   @param component: the name of the component of the instance being
1365       transferred
1366
1367   """
1368   # TODO: Use tempfile.mkstemp to create unique filename
1369   if component:
1370     assert "/" not in component
1371     c_msg = "-%s" % component
1372   else:
1373     c_msg = ""
1374   base = ("%s-%s-%s%s-%s.log" %
1375           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1376   return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1377
1378
1379 def InstanceOsAdd(instance, reinstall, debug):
1380   """Add an OS to an instance.
1381
1382   @type instance: L{objects.Instance}
1383   @param instance: Instance whose OS is to be installed
1384   @type reinstall: boolean
1385   @param reinstall: whether this is an instance reinstall
1386   @type debug: integer
1387   @param debug: debug level, passed to the OS scripts
1388   @rtype: None
1389
1390   """
1391   inst_os = OSFromDisk(instance.os)
1392
1393   create_env = OSEnvironment(instance, inst_os, debug)
1394   if reinstall:
1395     create_env["INSTANCE_REINSTALL"] = "1"
1396
1397   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1398
1399   result = utils.RunCmd([inst_os.create_script], env=create_env,
1400                         cwd=inst_os.path, output=logfile, reset_env=True)
1401   if result.failed:
1402     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1403                   " output: %s", result.cmd, result.fail_reason, logfile,
1404                   result.output)
1405     lines = [utils.SafeEncode(val)
1406              for val in utils.TailFile(logfile, lines=20)]
1407     _Fail("OS create script failed (%s), last lines in the"
1408           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1409
1410
1411 def RunRenameInstance(instance, old_name, debug):
1412   """Run the OS rename script for an instance.
1413
1414   @type instance: L{objects.Instance}
1415   @param instance: Instance whose OS is to be installed
1416   @type old_name: string
1417   @param old_name: previous instance name
1418   @type debug: integer
1419   @param debug: debug level, passed to the OS scripts
1420   @rtype: boolean
1421   @return: the success of the operation
1422
1423   """
1424   inst_os = OSFromDisk(instance.os)
1425
1426   rename_env = OSEnvironment(instance, inst_os, debug)
1427   rename_env["OLD_INSTANCE_NAME"] = old_name
1428
1429   logfile = _InstanceLogName("rename", instance.os,
1430                              "%s-%s" % (old_name, instance.name), None)
1431
1432   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1433                         cwd=inst_os.path, output=logfile, reset_env=True)
1434
1435   if result.failed:
1436     logging.error("os create command '%s' returned error: %s output: %s",
1437                   result.cmd, result.fail_reason, result.output)
1438     lines = [utils.SafeEncode(val)
1439              for val in utils.TailFile(logfile, lines=20)]
1440     _Fail("OS rename script failed (%s), last lines in the"
1441           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1442
1443
1444 def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
1445   """Returns symlink path for block device.
1446
1447   """
1448   if _dir is None:
1449     _dir = pathutils.DISK_LINKS_DIR
1450
1451   return utils.PathJoin(_dir,
1452                         ("%s%s%s" %
1453                          (instance_name, constants.DISK_SEPARATOR, idx)))
1454
1455
1456 def _SymlinkBlockDev(instance_name, device_path, idx):
1457   """Set up symlinks to a instance's block device.
1458
1459   This is an auxiliary function run when an instance is start (on the primary
1460   node) or when an instance is migrated (on the target node).
1461
1462
1463   @param instance_name: the name of the target instance
1464   @param device_path: path of the physical block device, on the node
1465   @param idx: the disk index
1466   @return: absolute path to the disk's symlink
1467
1468   """
1469   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1470   try:
1471     os.symlink(device_path, link_name)
1472   except OSError, err:
1473     if err.errno == errno.EEXIST:
1474       if (not os.path.islink(link_name) or
1475           os.readlink(link_name) != device_path):
1476         os.remove(link_name)
1477         os.symlink(device_path, link_name)
1478     else:
1479       raise
1480
1481   return link_name
1482
1483
1484 def _RemoveBlockDevLinks(instance_name, disks):
1485   """Remove the block device symlinks belonging to the given instance.
1486
1487   """
1488   for idx, _ in enumerate(disks):
1489     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1490     if os.path.islink(link_name):
1491       try:
1492         os.remove(link_name)
1493       except OSError:
1494         logging.exception("Can't remove symlink '%s'", link_name)
1495
1496
1497 def _GatherAndLinkBlockDevs(instance):
1498   """Set up an instance's block device(s).
1499
1500   This is run on the primary node at instance startup. The block
1501   devices must be already assembled.
1502
1503   @type instance: L{objects.Instance}
1504   @param instance: the instance whose disks we shoul assemble
1505   @rtype: list
1506   @return: list of (disk_object, device_path)
1507
1508   """
1509   block_devices = []
1510   for idx, disk in enumerate(instance.disks):
1511     device = _RecursiveFindBD(disk)
1512     if device is None:
1513       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1514                                     str(disk))
1515     device.Open()
1516     try:
1517       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1518     except OSError, e:
1519       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1520                                     e.strerror)
1521
1522     block_devices.append((disk, link_name))
1523
1524   return block_devices
1525
1526
1527 def StartInstance(instance, startup_paused, reason, store_reason=True):
1528   """Start an instance.
1529
1530   @type instance: L{objects.Instance}
1531   @param instance: the instance object
1532   @type startup_paused: bool
1533   @param instance: pause instance at startup?
1534   @type reason: list of reasons
1535   @param reason: the reason trail for this startup
1536   @type store_reason: boolean
1537   @param store_reason: whether to store the shutdown reason trail on file
1538   @rtype: None
1539
1540   """
1541   running_instances = GetInstanceListForHypervisor(instance.hypervisor,
1542                                                    instance.hvparams)
1543
1544   if instance.name in running_instances:
1545     logging.info("Instance %s already running, not starting", instance.name)
1546     return
1547
1548   try:
1549     block_devices = _GatherAndLinkBlockDevs(instance)
1550     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1551     hyper.StartInstance(instance, block_devices, startup_paused)
1552     if store_reason:
1553       _StoreInstReasonTrail(instance.name, reason)
1554   except errors.BlockDeviceError, err:
1555     _Fail("Block device error: %s", err, exc=True)
1556   except errors.HypervisorError, err:
1557     _RemoveBlockDevLinks(instance.name, instance.disks)
1558     _Fail("Hypervisor error: %s", err, exc=True)
1559
1560
1561 def InstanceShutdown(instance, timeout, reason, store_reason=True):
1562   """Shut an instance down.
1563
1564   @note: this functions uses polling with a hardcoded timeout.
1565
1566   @type instance: L{objects.Instance}
1567   @param instance: the instance object
1568   @type timeout: integer
1569   @param timeout: maximum timeout for soft shutdown
1570   @type reason: list of reasons
1571   @param reason: the reason trail for this shutdown
1572   @type store_reason: boolean
1573   @param store_reason: whether to store the shutdown reason trail on file
1574   @rtype: None
1575
1576   """
1577   hv_name = instance.hypervisor
1578   hyper = hypervisor.GetHypervisor(hv_name)
1579   iname = instance.name
1580
1581   if instance.name not in hyper.ListInstances(instance.hvparams):
1582     logging.info("Instance %s not running, doing nothing", iname)
1583     return
1584
1585   class _TryShutdown:
1586     def __init__(self):
1587       self.tried_once = False
1588
1589     def __call__(self):
1590       if iname not in hyper.ListInstances(instance.hvparams):
1591         return
1592
1593       try:
1594         hyper.StopInstance(instance, retry=self.tried_once)
1595         if store_reason:
1596           _StoreInstReasonTrail(instance.name, reason)
1597       except errors.HypervisorError, err:
1598         if iname not in hyper.ListInstances(instance.hvparams):
1599           # if the instance is no longer existing, consider this a
1600           # success and go to cleanup
1601           return
1602
1603         _Fail("Failed to stop instance %s: %s", iname, err)
1604
1605       self.tried_once = True
1606
1607       raise utils.RetryAgain()
1608
1609   try:
1610     utils.Retry(_TryShutdown(), 5, timeout)
1611   except utils.RetryTimeout:
1612     # the shutdown did not succeed
1613     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1614
1615     try:
1616       hyper.StopInstance(instance, force=True)
1617     except errors.HypervisorError, err:
1618       if iname in hyper.ListInstances(instance.hvparams):
1619         # only raise an error if the instance still exists, otherwise
1620         # the error could simply be "instance ... unknown"!
1621         _Fail("Failed to force stop instance %s: %s", iname, err)
1622
1623     time.sleep(1)
1624
1625     if iname in hyper.ListInstances(instance.hvparams):
1626       _Fail("Could not shutdown instance %s even by destroy", iname)
1627
1628   try:
1629     hyper.CleanupInstance(instance.name)
1630   except errors.HypervisorError, err:
1631     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1632
1633   _RemoveBlockDevLinks(iname, instance.disks)
1634
1635
1636 def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
1637   """Reboot an instance.
1638
1639   @type instance: L{objects.Instance}
1640   @param instance: the instance object to reboot
1641   @type reboot_type: str
1642   @param reboot_type: the type of reboot, one the following
1643     constants:
1644       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1645         instance OS, do not recreate the VM
1646       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1647         restart the VM (at the hypervisor level)
1648       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1649         not accepted here, since that mode is handled differently, in
1650         cmdlib, and translates into full stop and start of the
1651         instance (instead of a call_instance_reboot RPC)
1652   @type shutdown_timeout: integer
1653   @param shutdown_timeout: maximum timeout for soft shutdown
1654   @type reason: list of reasons
1655   @param reason: the reason trail for this reboot
1656   @rtype: None
1657
1658   """
1659   running_instances = GetInstanceListForHypervisor(instance.hypervisor,
1660                                                    instance.hvparams)
1661
1662   if instance.name not in running_instances:
1663     _Fail("Cannot reboot instance %s that is not running", instance.name)
1664
1665   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1666   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1667     try:
1668       hyper.RebootInstance(instance)
1669     except errors.HypervisorError, err:
1670       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1671   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1672     try:
1673       InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
1674       result = StartInstance(instance, False, reason, store_reason=False)
1675       _StoreInstReasonTrail(instance.name, reason)
1676       return result
1677     except errors.HypervisorError, err:
1678       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1679   else:
1680     _Fail("Invalid reboot_type received: %s", reboot_type)
1681
1682
1683 def InstanceBalloonMemory(instance, memory):
1684   """Resize an instance's memory.
1685
1686   @type instance: L{objects.Instance}
1687   @param instance: the instance object
1688   @type memory: int
1689   @param memory: new memory amount in MB
1690   @rtype: None
1691
1692   """
1693   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1694   running = hyper.ListInstances(instance.hvparams)
1695   if instance.name not in running:
1696     logging.info("Instance %s is not running, cannot balloon", instance.name)
1697     return
1698   try:
1699     hyper.BalloonInstanceMemory(instance, memory)
1700   except errors.HypervisorError, err:
1701     _Fail("Failed to balloon instance memory: %s", err, exc=True)
1702
1703
1704 def MigrationInfo(instance):
1705   """Gather information about an instance to be migrated.
1706
1707   @type instance: L{objects.Instance}
1708   @param instance: the instance definition
1709
1710   """
1711   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1712   try:
1713     info = hyper.MigrationInfo(instance)
1714   except errors.HypervisorError, err:
1715     _Fail("Failed to fetch migration information: %s", err, exc=True)
1716   return info
1717
1718
1719 def AcceptInstance(instance, info, target):
1720   """Prepare the node to accept an instance.
1721
1722   @type instance: L{objects.Instance}
1723   @param instance: the instance definition
1724   @type info: string/data (opaque)
1725   @param info: migration information, from the source node
1726   @type target: string
1727   @param target: target host (usually ip), on this node
1728
1729   """
1730   # TODO: why is this required only for DTS_EXT_MIRROR?
1731   if instance.disk_template in constants.DTS_EXT_MIRROR:
1732     # Create the symlinks, as the disks are not active
1733     # in any way
1734     try:
1735       _GatherAndLinkBlockDevs(instance)
1736     except errors.BlockDeviceError, err:
1737       _Fail("Block device error: %s", err, exc=True)
1738
1739   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1740   try:
1741     hyper.AcceptInstance(instance, info, target)
1742   except errors.HypervisorError, err:
1743     if instance.disk_template in constants.DTS_EXT_MIRROR:
1744       _RemoveBlockDevLinks(instance.name, instance.disks)
1745     _Fail("Failed to accept instance: %s", err, exc=True)
1746
1747
1748 def FinalizeMigrationDst(instance, info, success):
1749   """Finalize any preparation to accept an instance.
1750
1751   @type instance: L{objects.Instance}
1752   @param instance: the instance definition
1753   @type info: string/data (opaque)
1754   @param info: migration information, from the source node
1755   @type success: boolean
1756   @param success: whether the migration was a success or a failure
1757
1758   """
1759   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1760   try:
1761     hyper.FinalizeMigrationDst(instance, info, success)
1762   except errors.HypervisorError, err:
1763     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1764
1765
1766 def MigrateInstance(instance, target, live):
1767   """Migrates an instance to another node.
1768
1769   @type instance: L{objects.Instance}
1770   @param instance: the instance definition
1771   @type target: string
1772   @param target: the target node name
1773   @type live: boolean
1774   @param live: whether the migration should be done live or not (the
1775       interpretation of this parameter is left to the hypervisor)
1776   @raise RPCFail: if migration fails for some reason
1777
1778   """
1779   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1780
1781   try:
1782     hyper.MigrateInstance(instance, target, live)
1783   except errors.HypervisorError, err:
1784     _Fail("Failed to migrate instance: %s", err, exc=True)
1785
1786
1787 def FinalizeMigrationSource(instance, success, live):
1788   """Finalize the instance migration on the source node.
1789
1790   @type instance: L{objects.Instance}
1791   @param instance: the instance definition of the migrated instance
1792   @type success: bool
1793   @param success: whether the migration succeeded or not
1794   @type live: bool
1795   @param live: whether the user requested a live migration or not
1796   @raise RPCFail: If the execution fails for some reason
1797
1798   """
1799   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1800
1801   try:
1802     hyper.FinalizeMigrationSource(instance, success, live)
1803   except Exception, err:  # pylint: disable=W0703
1804     _Fail("Failed to finalize the migration on the source node: %s", err,
1805           exc=True)
1806
1807
1808 def GetMigrationStatus(instance):
1809   """Get the migration status
1810
1811   @type instance: L{objects.Instance}
1812   @param instance: the instance that is being migrated
1813   @rtype: L{objects.MigrationStatus}
1814   @return: the status of the current migration (one of
1815            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1816            progress info that can be retrieved from the hypervisor
1817   @raise RPCFail: If the migration status cannot be retrieved
1818
1819   """
1820   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1821   try:
1822     return hyper.GetMigrationStatus(instance)
1823   except Exception, err:  # pylint: disable=W0703
1824     _Fail("Failed to get migration status: %s", err, exc=True)
1825
1826
1827 def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
1828   """Creates a block device for an instance.
1829
1830   @type disk: L{objects.Disk}
1831   @param disk: the object describing the disk we should create
1832   @type size: int
1833   @param size: the size of the physical underlying device, in MiB
1834   @type owner: str
1835   @param owner: the name of the instance for which disk is created,
1836       used for device cache data
1837   @type on_primary: boolean
1838   @param on_primary:  indicates if it is the primary node or not
1839   @type info: string
1840   @param info: string that will be sent to the physical device
1841       creation, used for example to set (LVM) tags on LVs
1842   @type excl_stor: boolean
1843   @param excl_stor: Whether exclusive_storage is active
1844
1845   @return: the new unique_id of the device (this can sometime be
1846       computed only after creation), or None. On secondary nodes,
1847       it's not required to return anything.
1848
1849   """
1850   # TODO: remove the obsolete "size" argument
1851   # pylint: disable=W0613
1852   clist = []
1853   if disk.children:
1854     for child in disk.children:
1855       try:
1856         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1857       except errors.BlockDeviceError, err:
1858         _Fail("Can't assemble device %s: %s", child, err)
1859       if on_primary or disk.AssembleOnSecondary():
1860         # we need the children open in case the device itself has to
1861         # be assembled
1862         try:
1863           # pylint: disable=E1103
1864           crdev.Open()
1865         except errors.BlockDeviceError, err:
1866           _Fail("Can't make child '%s' read-write: %s", child, err)
1867       clist.append(crdev)
1868
1869   try:
1870     device = bdev.Create(disk, clist, excl_stor)
1871   except errors.BlockDeviceError, err:
1872     _Fail("Can't create block device: %s", err)
1873
1874   if on_primary or disk.AssembleOnSecondary():
1875     try:
1876       device.Assemble()
1877     except errors.BlockDeviceError, err:
1878       _Fail("Can't assemble device after creation, unusual event: %s", err)
1879     if on_primary or disk.OpenOnSecondary():
1880       try:
1881         device.Open(force=True)
1882       except errors.BlockDeviceError, err:
1883         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1884     DevCacheManager.UpdateCache(device.dev_path, owner,
1885                                 on_primary, disk.iv_name)
1886
1887   device.SetInfo(info)
1888
1889   return device.unique_id
1890
1891
1892 def _WipeDevice(path, offset, size):
1893   """This function actually wipes the device.
1894
1895   @param path: The path to the device to wipe
1896   @param offset: The offset in MiB in the file
1897   @param size: The size in MiB to write
1898
1899   """
1900   # Internal sizes are always in Mebibytes; if the following "dd" command
1901   # should use a different block size the offset and size given to this
1902   # function must be adjusted accordingly before being passed to "dd".
1903   block_size = 1024 * 1024
1904
1905   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1906          "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1907          "count=%d" % size]
1908   result = utils.RunCmd(cmd)
1909
1910   if result.failed:
1911     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1912           result.fail_reason, result.output)
1913
1914
1915 def BlockdevWipe(disk, offset, size):
1916   """Wipes a block device.
1917
1918   @type disk: L{objects.Disk}
1919   @param disk: the disk object we want to wipe
1920   @type offset: int
1921   @param offset: The offset in MiB in the file
1922   @type size: int
1923   @param size: The size in MiB to write
1924
1925   """
1926   try:
1927     rdev = _RecursiveFindBD(disk)
1928   except errors.BlockDeviceError:
1929     rdev = None
1930
1931   if not rdev:
1932     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1933
1934   # Do cross verify some of the parameters
1935   if offset < 0:
1936     _Fail("Negative offset")
1937   if size < 0:
1938     _Fail("Negative size")
1939   if offset > rdev.size:
1940     _Fail("Offset is bigger than device size")
1941   if (offset + size) > rdev.size:
1942     _Fail("The provided offset and size to wipe is bigger than device size")
1943
1944   _WipeDevice(rdev.dev_path, offset, size)
1945
1946
1947 def BlockdevPauseResumeSync(disks, pause):
1948   """Pause or resume the sync of the block device.
1949
1950   @type disks: list of L{objects.Disk}
1951   @param disks: the disks object we want to pause/resume
1952   @type pause: bool
1953   @param pause: Wheater to pause or resume
1954
1955   """
1956   success = []
1957   for disk in disks:
1958     try:
1959       rdev = _RecursiveFindBD(disk)
1960     except errors.BlockDeviceError:
1961       rdev = None
1962
1963     if not rdev:
1964       success.append((False, ("Cannot change sync for device %s:"
1965                               " device not found" % disk.iv_name)))
1966       continue
1967
1968     result = rdev.PauseResumeSync(pause)
1969
1970     if result:
1971       success.append((result, None))
1972     else:
1973       if pause:
1974         msg = "Pause"
1975       else:
1976         msg = "Resume"
1977       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1978
1979   return success
1980
1981
1982 def BlockdevRemove(disk):
1983   """Remove a block device.
1984
1985   @note: This is intended to be called recursively.
1986
1987   @type disk: L{objects.Disk}
1988   @param disk: the disk object we should remove
1989   @rtype: boolean
1990   @return: the success of the operation
1991
1992   """
1993   msgs = []
1994   try:
1995     rdev = _RecursiveFindBD(disk)
1996   except errors.BlockDeviceError, err:
1997     # probably can't attach
1998     logging.info("Can't attach to device %s in remove", disk)
1999     rdev = None
2000   if rdev is not None:
2001     r_path = rdev.dev_path
2002     try:
2003       rdev.Remove()
2004     except errors.BlockDeviceError, err:
2005       msgs.append(str(err))
2006     if not msgs:
2007       DevCacheManager.RemoveCache(r_path)
2008
2009   if disk.children:
2010     for child in disk.children:
2011       try:
2012         BlockdevRemove(child)
2013       except RPCFail, err:
2014         msgs.append(str(err))
2015
2016   if msgs:
2017     _Fail("; ".join(msgs))
2018
2019
2020 def _RecursiveAssembleBD(disk, owner, as_primary):
2021   """Activate a block device for an instance.
2022
2023   This is run on the primary and secondary nodes for an instance.
2024
2025   @note: this function is called recursively.
2026
2027   @type disk: L{objects.Disk}
2028   @param disk: the disk we try to assemble
2029   @type owner: str
2030   @param owner: the name of the instance which owns the disk
2031   @type as_primary: boolean
2032   @param as_primary: if we should make the block device
2033       read/write
2034
2035   @return: the assembled device or None (in case no device
2036       was assembled)
2037   @raise errors.BlockDeviceError: in case there is an error
2038       during the activation of the children or the device
2039       itself
2040
2041   """
2042   children = []
2043   if disk.children:
2044     mcn = disk.ChildrenNeeded()
2045     if mcn == -1:
2046       mcn = 0 # max number of Nones allowed
2047     else:
2048       mcn = len(disk.children) - mcn # max number of Nones
2049     for chld_disk in disk.children:
2050       try:
2051         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
2052       except errors.BlockDeviceError, err:
2053         if children.count(None) >= mcn:
2054           raise
2055         cdev = None
2056         logging.error("Error in child activation (but continuing): %s",
2057                       str(err))
2058       children.append(cdev)
2059
2060   if as_primary or disk.AssembleOnSecondary():
2061     r_dev = bdev.Assemble(disk, children)
2062     result = r_dev
2063     if as_primary or disk.OpenOnSecondary():
2064       r_dev.Open()
2065     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
2066                                 as_primary, disk.iv_name)
2067
2068   else:
2069     result = True
2070   return result
2071
2072
2073 def BlockdevAssemble(disk, owner, as_primary, idx):
2074   """Activate a block device for an instance.
2075
2076   This is a wrapper over _RecursiveAssembleBD.
2077
2078   @rtype: str or boolean
2079   @return: a C{/dev/...} path for primary nodes, and
2080       C{True} for secondary nodes
2081
2082   """
2083   try:
2084     result = _RecursiveAssembleBD(disk, owner, as_primary)
2085     if isinstance(result, BlockDev):
2086       # pylint: disable=E1103
2087       result = result.dev_path
2088       if as_primary:
2089         _SymlinkBlockDev(owner, result, idx)
2090   except errors.BlockDeviceError, err:
2091     _Fail("Error while assembling disk: %s", err, exc=True)
2092   except OSError, err:
2093     _Fail("Error while symlinking disk: %s", err, exc=True)
2094
2095   return result
2096
2097
2098 def BlockdevShutdown(disk):
2099   """Shut down a block device.
2100
2101   First, if the device is assembled (Attach() is successful), then
2102   the device is shutdown. Then the children of the device are
2103   shutdown.
2104
2105   This function is called recursively. Note that we don't cache the
2106   children or such, as oppossed to assemble, shutdown of different
2107   devices doesn't require that the upper device was active.
2108
2109   @type disk: L{objects.Disk}
2110   @param disk: the description of the disk we should
2111       shutdown
2112   @rtype: None
2113
2114   """
2115   msgs = []
2116   r_dev = _RecursiveFindBD(disk)
2117   if r_dev is not None:
2118     r_path = r_dev.dev_path
2119     try:
2120       r_dev.Shutdown()
2121       DevCacheManager.RemoveCache(r_path)
2122     except errors.BlockDeviceError, err:
2123       msgs.append(str(err))
2124
2125   if disk.children:
2126     for child in disk.children:
2127       try:
2128         BlockdevShutdown(child)
2129       except RPCFail, err:
2130         msgs.append(str(err))
2131
2132   if msgs:
2133     _Fail("; ".join(msgs))
2134
2135
2136 def BlockdevAddchildren(parent_cdev, new_cdevs):
2137   """Extend a mirrored block device.
2138
2139   @type parent_cdev: L{objects.Disk}
2140   @param parent_cdev: the disk to which we should add children
2141   @type new_cdevs: list of L{objects.Disk}
2142   @param new_cdevs: the list of children which we should add
2143   @rtype: None
2144
2145   """
2146   parent_bdev = _RecursiveFindBD(parent_cdev)
2147   if parent_bdev is None:
2148     _Fail("Can't find parent device '%s' in add children", parent_cdev)
2149   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
2150   if new_bdevs.count(None) > 0:
2151     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
2152   parent_bdev.AddChildren(new_bdevs)
2153
2154
2155 def BlockdevRemovechildren(parent_cdev, new_cdevs):
2156   """Shrink a mirrored block device.
2157
2158   @type parent_cdev: L{objects.Disk}
2159   @param parent_cdev: the disk from which we should remove children
2160   @type new_cdevs: list of L{objects.Disk}
2161   @param new_cdevs: the list of children which we should remove
2162   @rtype: None
2163
2164   """
2165   parent_bdev = _RecursiveFindBD(parent_cdev)
2166   if parent_bdev is None:
2167     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
2168   devs = []
2169   for disk in new_cdevs:
2170     rpath = disk.StaticDevPath()
2171     if rpath is None:
2172       bd = _RecursiveFindBD(disk)
2173       if bd is None:
2174         _Fail("Can't find device %s while removing children", disk)
2175       else:
2176         devs.append(bd.dev_path)
2177     else:
2178       if not utils.IsNormAbsPath(rpath):
2179         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
2180       devs.append(rpath)
2181   parent_bdev.RemoveChildren(devs)
2182
2183
2184 def BlockdevGetmirrorstatus(disks):
2185   """Get the mirroring status of a list of devices.
2186
2187   @type disks: list of L{objects.Disk}
2188   @param disks: the list of disks which we should query
2189   @rtype: disk
2190   @return: List of L{objects.BlockDevStatus}, one for each disk
2191   @raise errors.BlockDeviceError: if any of the disks cannot be
2192       found
2193
2194   """
2195   stats = []
2196   for dsk in disks:
2197     rbd = _RecursiveFindBD(dsk)
2198     if rbd is None:
2199       _Fail("Can't find device %s", dsk)
2200
2201     stats.append(rbd.CombinedSyncStatus())
2202
2203   return stats
2204
2205
2206 def BlockdevGetmirrorstatusMulti(disks):
2207   """Get the mirroring status of a list of devices.
2208
2209   @type disks: list of L{objects.Disk}
2210   @param disks: the list of disks which we should query
2211   @rtype: disk
2212   @return: List of tuples, (bool, status), one for each disk; bool denotes
2213     success/failure, status is L{objects.BlockDevStatus} on success, string
2214     otherwise
2215
2216   """
2217   result = []
2218   for disk in disks:
2219     try:
2220       rbd = _RecursiveFindBD(disk)
2221       if rbd is None:
2222         result.append((False, "Can't find device %s" % disk))
2223         continue
2224
2225       status = rbd.CombinedSyncStatus()
2226     except errors.BlockDeviceError, err:
2227       logging.exception("Error while getting disk status")
2228       result.append((False, str(err)))
2229     else:
2230       result.append((True, status))
2231
2232   assert len(disks) == len(result)
2233
2234   return result
2235
2236
2237 def _RecursiveFindBD(disk):
2238   """Check if a device is activated.
2239
2240   If so, return information about the real device.
2241
2242   @type disk: L{objects.Disk}
2243   @param disk: the disk object we need to find
2244
2245   @return: None if the device can't be found,
2246       otherwise the device instance
2247
2248   """
2249   children = []
2250   if disk.children:
2251     for chdisk in disk.children:
2252       children.append(_RecursiveFindBD(chdisk))
2253
2254   return bdev.FindDevice(disk, children)
2255
2256
2257 def _OpenRealBD(disk):
2258   """Opens the underlying block device of a disk.
2259
2260   @type disk: L{objects.Disk}
2261   @param disk: the disk object we want to open
2262
2263   """
2264   real_disk = _RecursiveFindBD(disk)
2265   if real_disk is None:
2266     _Fail("Block device '%s' is not set up", disk)
2267
2268   real_disk.Open()
2269
2270   return real_disk
2271
2272
2273 def BlockdevFind(disk):
2274   """Check if a device is activated.
2275
2276   If it is, return information about the real device.
2277
2278   @type disk: L{objects.Disk}
2279   @param disk: the disk to find
2280   @rtype: None or objects.BlockDevStatus
2281   @return: None if the disk cannot be found, otherwise a the current
2282            information
2283
2284   """
2285   try:
2286     rbd = _RecursiveFindBD(disk)
2287   except errors.BlockDeviceError, err:
2288     _Fail("Failed to find device: %s", err, exc=True)
2289
2290   if rbd is None:
2291     return None
2292
2293   return rbd.GetSyncStatus()
2294
2295
2296 def BlockdevGetdimensions(disks):
2297   """Computes the size of the given disks.
2298
2299   If a disk is not found, returns None instead.
2300
2301   @type disks: list of L{objects.Disk}
2302   @param disks: the list of disk to compute the size for
2303   @rtype: list
2304   @return: list with elements None if the disk cannot be found,
2305       otherwise the pair (size, spindles), where spindles is None if the
2306       device doesn't support that
2307
2308   """
2309   result = []
2310   for cf in disks:
2311     try:
2312       rbd = _RecursiveFindBD(cf)
2313     except errors.BlockDeviceError:
2314       result.append(None)
2315       continue
2316     if rbd is None:
2317       result.append(None)
2318     else:
2319       result.append(rbd.GetActualDimensions())
2320   return result
2321
2322
2323 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2324   """Export a block device to a remote node.
2325
2326   @type disk: L{objects.Disk}
2327   @param disk: the description of the disk to export
2328   @type dest_node: str
2329   @param dest_node: the destination node to export to
2330   @type dest_path: str
2331   @param dest_path: the destination path on the target node
2332   @type cluster_name: str
2333   @param cluster_name: the cluster name, needed for SSH hostalias
2334   @rtype: None
2335
2336   """
2337   real_disk = _OpenRealBD(disk)
2338
2339   # the block size on the read dd is 1MiB to match our units
2340   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2341                                "dd if=%s bs=1048576 count=%s",
2342                                real_disk.dev_path, str(disk.size))
2343
2344   # we set here a smaller block size as, due to ssh buffering, more
2345   # than 64-128k will mostly ignored; we use nocreat to fail if the
2346   # device is not already there or we pass a wrong path; we use
2347   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2348   # to not buffer too much memory; this means that at best, we flush
2349   # every 64k, which will not be very fast
2350   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2351                                 " oflag=dsync", dest_path)
2352
2353   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2354                                                    constants.SSH_LOGIN_USER,
2355                                                    destcmd)
2356
2357   # all commands have been checked, so we're safe to combine them
2358   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2359
2360   result = utils.RunCmd(["bash", "-c", command])
2361
2362   if result.failed:
2363     _Fail("Disk copy command '%s' returned error: %s"
2364           " output: %s", command, result.fail_reason, result.output)
2365
2366
2367 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2368   """Write a file to the filesystem.
2369
2370   This allows the master to overwrite(!) a file. It will only perform
2371   the operation if the file belongs to a list of configuration files.
2372
2373   @type file_name: str
2374   @param file_name: the target file name
2375   @type data: str
2376   @param data: the new contents of the file
2377   @type mode: int
2378   @param mode: the mode to give the file (can be None)
2379   @type uid: string
2380   @param uid: the owner of the file
2381   @type gid: string
2382   @param gid: the group of the file
2383   @type atime: float
2384   @param atime: the atime to set on the file (can be None)
2385   @type mtime: float
2386   @param mtime: the mtime to set on the file (can be None)
2387   @rtype: None
2388
2389   """
2390   file_name = vcluster.LocalizeVirtualPath(file_name)
2391
2392   if not os.path.isabs(file_name):
2393     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2394
2395   if file_name not in _ALLOWED_UPLOAD_FILES:
2396     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2397           file_name)
2398
2399   raw_data = _Decompress(data)
2400
2401   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2402     _Fail("Invalid username/groupname type")
2403
2404   getents = runtime.GetEnts()
2405   uid = getents.LookupUser(uid)
2406   gid = getents.LookupGroup(gid)
2407
2408   utils.SafeWriteFile(file_name, None,
2409                       data=raw_data, mode=mode, uid=uid, gid=gid,
2410                       atime=atime, mtime=mtime)
2411
2412
2413 def RunOob(oob_program, command, node, timeout):
2414   """Executes oob_program with given command on given node.
2415
2416   @param oob_program: The path to the executable oob_program
2417   @param command: The command to invoke on oob_program
2418   @param node: The node given as an argument to the program
2419   @param timeout: Timeout after which we kill the oob program
2420
2421   @return: stdout
2422   @raise RPCFail: If execution fails for some reason
2423
2424   """
2425   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2426
2427   if result.failed:
2428     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2429           result.fail_reason, result.output)
2430
2431   return result.stdout
2432
2433
2434 def _OSOndiskAPIVersion(os_dir):
2435   """Compute and return the API version of a given OS.
2436
2437   This function will try to read the API version of the OS residing in
2438   the 'os_dir' directory.
2439
2440   @type os_dir: str
2441   @param os_dir: the directory in which we should look for the OS
2442   @rtype: tuple
2443   @return: tuple (status, data) with status denoting the validity and
2444       data holding either the vaid versions or an error message
2445
2446   """
2447   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2448
2449   try:
2450     st = os.stat(api_file)
2451   except EnvironmentError, err:
2452     return False, ("Required file '%s' not found under path %s: %s" %
2453                    (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2454
2455   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2456     return False, ("File '%s' in %s is not a regular file" %
2457                    (constants.OS_API_FILE, os_dir))
2458
2459   try:
2460     api_versions = utils.ReadFile(api_file).splitlines()
2461   except EnvironmentError, err:
2462     return False, ("Error while reading the API version file at %s: %s" %
2463                    (api_file, utils.ErrnoOrStr(err)))
2464
2465   try:
2466     api_versions = [int(version.strip()) for version in api_versions]
2467   except (TypeError, ValueError), err:
2468     return False, ("API version(s) can't be converted to integer: %s" %
2469                    str(err))
2470
2471   return True, api_versions
2472
2473
2474 def DiagnoseOS(top_dirs=None):
2475   """Compute the validity for all OSes.
2476
2477   @type top_dirs: list
2478   @param top_dirs: the list of directories in which to
2479       search (if not given defaults to
2480       L{pathutils.OS_SEARCH_PATH})
2481   @rtype: list of L{objects.OS}
2482   @return: a list of tuples (name, path, status, diagnose, variants,
2483       parameters, api_version) for all (potential) OSes under all
2484       search paths, where:
2485           - name is the (potential) OS name
2486           - path is the full path to the OS
2487           - status True/False is the validity of the OS
2488           - diagnose is the error message for an invalid OS, otherwise empty
2489           - variants is a list of supported OS variants, if any
2490           - parameters is a list of (name, help) parameters, if any
2491           - api_version is a list of support OS API versions
2492
2493   """
2494   if top_dirs is None:
2495     top_dirs = pathutils.OS_SEARCH_PATH
2496
2497   result = []
2498   for dir_name in top_dirs:
2499     if os.path.isdir(dir_name):
2500       try:
2501         f_names = utils.ListVisibleFiles(dir_name)
2502       except EnvironmentError, err:
2503         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2504         break
2505       for name in f_names:
2506         os_path = utils.PathJoin(dir_name, name)
2507         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2508         if status:
2509           diagnose = ""
2510           variants = os_inst.supported_variants
2511           parameters = os_inst.supported_parameters
2512           api_versions = os_inst.api_versions
2513         else:
2514           diagnose = os_inst
2515           variants = parameters = api_versions = []
2516         result.append((name, os_path, status, diagnose, variants,
2517                        parameters, api_versions))
2518
2519   return result
2520
2521
2522 def _TryOSFromDisk(name, base_dir=None):
2523   """Create an OS instance from disk.
2524
2525   This function will return an OS instance if the given name is a
2526   valid OS name.
2527
2528   @type base_dir: string
2529   @keyword base_dir: Base directory containing OS installations.
2530                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2531   @rtype: tuple
2532   @return: success and either the OS instance if we find a valid one,
2533       or error message
2534
2535   """
2536   if base_dir is None:
2537     os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2538   else:
2539     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2540
2541   if os_dir is None:
2542     return False, "Directory for OS %s not found in search path" % name
2543
2544   status, api_versions = _OSOndiskAPIVersion(os_dir)
2545   if not status:
2546     # push the error up
2547     return status, api_versions
2548
2549   if not constants.OS_API_VERSIONS.intersection(api_versions):
2550     return False, ("API version mismatch for path '%s': found %s, want %s." %
2551                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2552
2553   # OS Files dictionary, we will populate it with the absolute path
2554   # names; if the value is True, then it is a required file, otherwise
2555   # an optional one
2556   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2557
2558   if max(api_versions) >= constants.OS_API_V15:
2559     os_files[constants.OS_VARIANTS_FILE] = False
2560
2561   if max(api_versions) >= constants.OS_API_V20:
2562     os_files[constants.OS_PARAMETERS_FILE] = True
2563   else:
2564     del os_files[constants.OS_SCRIPT_VERIFY]
2565
2566   for (filename, required) in os_files.items():
2567     os_files[filename] = utils.PathJoin(os_dir, filename)
2568
2569     try:
2570       st = os.stat(os_files[filename])
2571     except EnvironmentError, err:
2572       if err.errno == errno.ENOENT and not required:
2573         del os_files[filename]
2574         continue
2575       return False, ("File '%s' under path '%s' is missing (%s)" %
2576                      (filename, os_dir, utils.ErrnoOrStr(err)))
2577
2578     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2579       return False, ("File '%s' under path '%s' is not a regular file" %
2580                      (filename, os_dir))
2581
2582     if filename in constants.OS_SCRIPTS:
2583       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2584         return False, ("File '%s' under path '%s' is not executable" %
2585                        (filename, os_dir))
2586
2587   variants = []
2588   if constants.OS_VARIANTS_FILE in os_files:
2589     variants_file = os_files[constants.OS_VARIANTS_FILE]
2590     try:
2591       variants = \
2592         utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2593     except EnvironmentError, err:
2594       # we accept missing files, but not other errors
2595       if err.errno != errno.ENOENT:
2596         return False, ("Error while reading the OS variants file at %s: %s" %
2597                        (variants_file, utils.ErrnoOrStr(err)))
2598
2599   parameters = []
2600   if constants.OS_PARAMETERS_FILE in os_files:
2601     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2602     try:
2603       parameters = utils.ReadFile(parameters_file).splitlines()
2604     except EnvironmentError, err:
2605       return False, ("Error while reading the OS parameters file at %s: %s" %
2606                      (parameters_file, utils.ErrnoOrStr(err)))
2607     parameters = [v.split(None, 1) for v in parameters]
2608
2609   os_obj = objects.OS(name=name, path=os_dir,
2610                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2611                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2612                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2613                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2614                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2615                                                  None),
2616                       supported_variants=variants,
2617                       supported_parameters=parameters,
2618                       api_versions=api_versions)
2619   return True, os_obj
2620
2621
2622 def OSFromDisk(name, base_dir=None):
2623   """Create an OS instance from disk.
2624
2625   This function will return an OS instance if the given name is a
2626   valid OS name. Otherwise, it will raise an appropriate
2627   L{RPCFail} exception, detailing why this is not a valid OS.
2628
2629   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2630   an exception but returns true/false status data.
2631
2632   @type base_dir: string
2633   @keyword base_dir: Base directory containing OS installations.
2634                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2635   @rtype: L{objects.OS}
2636   @return: the OS instance if we find a valid one
2637   @raise RPCFail: if we don't find a valid OS
2638
2639   """
2640   name_only = objects.OS.GetName(name)
2641   status, payload = _TryOSFromDisk(name_only, base_dir)
2642
2643   if not status:
2644     _Fail(payload)
2645
2646   return payload
2647
2648
2649 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2650   """Calculate the basic environment for an os script.
2651
2652   @type os_name: str
2653   @param os_name: full operating system name (including variant)
2654   @type inst_os: L{objects.OS}
2655   @param inst_os: operating system for which the environment is being built
2656   @type os_params: dict
2657   @param os_params: the OS parameters
2658   @type debug: integer
2659   @param debug: debug level (0 or 1, for OS Api 10)
2660   @rtype: dict
2661   @return: dict of environment variables
2662   @raise errors.BlockDeviceError: if the block device
2663       cannot be found
2664
2665   """
2666   result = {}
2667   api_version = \
2668     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2669   result["OS_API_VERSION"] = "%d" % api_version
2670   result["OS_NAME"] = inst_os.name
2671   result["DEBUG_LEVEL"] = "%d" % debug
2672
2673   # OS variants
2674   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2675     variant = objects.OS.GetVariant(os_name)
2676     if not variant:
2677       variant = inst_os.supported_variants[0]
2678   else:
2679     variant = ""
2680   result["OS_VARIANT"] = variant
2681
2682   # OS params
2683   for pname, pvalue in os_params.items():
2684     result["OSP_%s" % pname.upper()] = pvalue
2685
2686   # Set a default path otherwise programs called by OS scripts (or
2687   # even hooks called from OS scripts) might break, and we don't want
2688   # to have each script require setting a PATH variable
2689   result["PATH"] = constants.HOOKS_PATH
2690
2691   return result
2692
2693
2694 def OSEnvironment(instance, inst_os, debug=0):
2695   """Calculate the environment for an os script.
2696
2697   @type instance: L{objects.Instance}
2698   @param instance: target instance for the os script run
2699   @type inst_os: L{objects.OS}
2700   @param inst_os: operating system for which the environment is being built
2701   @type debug: integer
2702   @param debug: debug level (0 or 1, for OS Api 10)
2703   @rtype: dict
2704   @return: dict of environment variables
2705   @raise errors.BlockDeviceError: if the block device
2706       cannot be found
2707
2708   """
2709   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2710
2711   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2712     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2713
2714   result["HYPERVISOR"] = instance.hypervisor
2715   result["DISK_COUNT"] = "%d" % len(instance.disks)
2716   result["NIC_COUNT"] = "%d" % len(instance.nics)
2717   result["INSTANCE_SECONDARY_NODES"] = \
2718       ("%s" % " ".join(instance.secondary_nodes))
2719
2720   # Disks
2721   for idx, disk in enumerate(instance.disks):
2722     real_disk = _OpenRealBD(disk)
2723     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2724     result["DISK_%d_ACCESS" % idx] = disk.mode
2725     if constants.HV_DISK_TYPE in instance.hvparams:
2726       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2727         instance.hvparams[constants.HV_DISK_TYPE]
2728     if disk.dev_type in constants.LDS_BLOCK:
2729       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2730     elif disk.dev_type == constants.LD_FILE:
2731       result["DISK_%d_BACKEND_TYPE" % idx] = \
2732         "file:%s" % disk.physical_id[0]
2733
2734   # NICs
2735   for idx, nic in enumerate(instance.nics):
2736     result["NIC_%d_MAC" % idx] = nic.mac
2737     if nic.ip:
2738       result["NIC_%d_IP" % idx] = nic.ip
2739     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2740     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2741       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2742     if nic.nicparams[constants.NIC_LINK]:
2743       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2744     if nic.netinfo:
2745       nobj = objects.Network.FromDict(nic.netinfo)
2746       result.update(nobj.HooksDict("NIC_%d_" % idx))
2747     if constants.HV_NIC_TYPE in instance.hvparams:
2748       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2749         instance.hvparams[constants.HV_NIC_TYPE]
2750
2751   # HV/BE params
2752   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2753     for key, value in source.items():
2754       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2755
2756   return result
2757
2758
2759 def DiagnoseExtStorage(top_dirs=None):
2760   """Compute the validity for all ExtStorage Providers.
2761
2762   @type top_dirs: list
2763   @param top_dirs: the list of directories in which to
2764       search (if not given defaults to
2765       L{pathutils.ES_SEARCH_PATH})
2766   @rtype: list of L{objects.ExtStorage}
2767   @return: a list of tuples (name, path, status, diagnose, parameters)
2768       for all (potential) ExtStorage Providers under all
2769       search paths, where:
2770           - name is the (potential) ExtStorage Provider
2771           - path is the full path to the ExtStorage Provider
2772           - status True/False is the validity of the ExtStorage Provider
2773           - diagnose is the error message for an invalid ExtStorage Provider,
2774             otherwise empty
2775           - parameters is a list of (name, help) parameters, if any
2776
2777   """
2778   if top_dirs is None:
2779     top_dirs = pathutils.ES_SEARCH_PATH
2780
2781   result = []
2782   for dir_name in top_dirs:
2783     if os.path.isdir(dir_name):
2784       try:
2785         f_names = utils.ListVisibleFiles(dir_name)
2786       except EnvironmentError, err:
2787         logging.exception("Can't list the ExtStorage directory %s: %s",
2788                           dir_name, err)
2789         break
2790       for name in f_names:
2791         es_path = utils.PathJoin(dir_name, name)
2792         status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
2793         if status:
2794           diagnose = ""
2795           parameters = es_inst.supported_parameters
2796         else:
2797           diagnose = es_inst
2798           parameters = []
2799         result.append((name, es_path, status, diagnose, parameters))
2800
2801   return result
2802
2803
2804 def BlockdevGrow(disk, amount, dryrun, backingstore):
2805   """Grow a stack of block devices.
2806
2807   This function is called recursively, with the childrens being the
2808   first ones to resize.
2809
2810   @type disk: L{objects.Disk}
2811   @param disk: the disk to be grown
2812   @type amount: integer
2813   @param amount: the amount (in mebibytes) to grow with
2814   @type dryrun: boolean
2815   @param dryrun: whether to execute the operation in simulation mode
2816       only, without actually increasing the size
2817   @param backingstore: whether to execute the operation on backing storage
2818       only, or on "logical" storage only; e.g. DRBD is logical storage,
2819       whereas LVM, file, RBD are backing storage
2820   @rtype: (status, result)
2821   @return: a tuple with the status of the operation (True/False), and
2822       the errors message if status is False
2823
2824   """
2825   r_dev = _RecursiveFindBD(disk)
2826   if r_dev is None:
2827     _Fail("Cannot find block device %s", disk)
2828
2829   try:
2830     r_dev.Grow(amount, dryrun, backingstore)
2831   except errors.BlockDeviceError, err:
2832     _Fail("Failed to grow block device: %s", err, exc=True)
2833
2834
2835 def BlockdevSnapshot(disk):
2836   """Create a snapshot copy of a block device.
2837
2838   This function is called recursively, and the snapshot is actually created
2839   just for the leaf lvm backend device.
2840
2841   @type disk: L{objects.Disk}
2842   @param disk: the disk to be snapshotted
2843   @rtype: string
2844   @return: snapshot disk ID as (vg, lv)
2845
2846   """
2847   if disk.dev_type == constants.LD_DRBD8:
2848     if not disk.children:
2849       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2850             disk.unique_id)
2851     return BlockdevSnapshot(disk.children[0])
2852   elif disk.dev_type == constants.LD_LV:
2853     r_dev = _RecursiveFindBD(disk)
2854     if r_dev is not None:
2855       # FIXME: choose a saner value for the snapshot size
2856       # let's stay on the safe side and ask for the full size, for now
2857       return r_dev.Snapshot(disk.size)
2858     else:
2859       _Fail("Cannot find block device %s", disk)
2860   else:
2861     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2862           disk.unique_id, disk.dev_type)
2863
2864
2865 def BlockdevSetInfo(disk, info):
2866   """Sets 'metadata' information on block devices.
2867
2868   This function sets 'info' metadata on block devices. Initial
2869   information is set at device creation; this function should be used
2870   for example after renames.
2871
2872   @type disk: L{objects.Disk}
2873   @param disk: the disk to be grown
2874   @type info: string
2875   @param info: new 'info' metadata
2876   @rtype: (status, result)
2877   @return: a tuple with the status of the operation (True/False), and
2878       the errors message if status is False
2879
2880   """
2881   r_dev = _RecursiveFindBD(disk)
2882   if r_dev is None:
2883     _Fail("Cannot find block device %s", disk)
2884
2885   try:
2886     r_dev.SetInfo(info)
2887   except errors.BlockDeviceError, err:
2888     _Fail("Failed to set information on block device: %s", err, exc=True)
2889
2890
2891 def FinalizeExport(instance, snap_disks):
2892   """Write out the export configuration information.
2893
2894   @type instance: L{objects.Instance}
2895   @param instance: the instance which we export, used for
2896       saving configuration
2897   @type snap_disks: list of L{objects.Disk}
2898   @param snap_disks: list of snapshot block devices, which
2899       will be used to get the actual name of the dump file
2900
2901   @rtype: None
2902
2903   """
2904   destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
2905   finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
2906
2907   config = objects.SerializableConfigParser()
2908
2909   config.add_section(constants.INISECT_EXP)
2910   config.set(constants.INISECT_EXP, "version", "0")
2911   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2912   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2913   config.set(constants.INISECT_EXP, "os", instance.os)
2914   config.set(constants.INISECT_EXP, "compression", "none")
2915
2916   config.add_section(constants.INISECT_INS)
2917   config.set(constants.INISECT_INS, "name", instance.name)
2918   config.set(constants.INISECT_INS, "maxmem", "%d" %
2919              instance.beparams[constants.BE_MAXMEM])
2920   config.set(constants.INISECT_INS, "minmem", "%d" %
2921              instance.beparams[constants.BE_MINMEM])
2922   # "memory" is deprecated, but useful for exporting to old ganeti versions
2923   config.set(constants.INISECT_INS, "memory", "%d" %
2924              instance.beparams[constants.BE_MAXMEM])
2925   config.set(constants.INISECT_INS, "vcpus", "%d" %
2926              instance.beparams[constants.BE_VCPUS])
2927   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2928   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2929   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2930
2931   nic_total = 0
2932   for nic_count, nic in enumerate(instance.nics):
2933     nic_total += 1
2934     config.set(constants.INISECT_INS, "nic%d_mac" %
2935                nic_count, "%s" % nic.mac)
2936     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2937     config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
2938                "%s" % nic.network)
2939     for param in constants.NICS_PARAMETER_TYPES:
2940       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2941                  "%s" % nic.nicparams.get(param, None))
2942   # TODO: redundant: on load can read nics until it doesn't exist
2943   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2944
2945   disk_total = 0
2946   for disk_count, disk in enumerate(snap_disks):
2947     if disk:
2948       disk_total += 1
2949       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2950                  ("%s" % disk.iv_name))
2951       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2952                  ("%s" % disk.physical_id[1]))
2953       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2954                  ("%d" % disk.size))
2955
2956   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2957
2958   # New-style hypervisor/backend parameters
2959
2960   config.add_section(constants.INISECT_HYP)
2961   for name, value in instance.hvparams.items():
2962     if name not in constants.HVC_GLOBALS:
2963       config.set(constants.INISECT_HYP, name, str(value))
2964
2965   config.add_section(constants.INISECT_BEP)
2966   for name, value in instance.beparams.items():
2967     config.set(constants.INISECT_BEP, name, str(value))
2968
2969   config.add_section(constants.INISECT_OSP)
2970   for name, value in instance.osparams.items():
2971     config.set(constants.INISECT_OSP, name, str(value))
2972
2973   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2974                   data=config.Dumps())
2975   shutil.rmtree(finaldestdir, ignore_errors=True)
2976   shutil.move(destdir, finaldestdir)
2977
2978
2979 def ExportInfo(dest):
2980   """Get export configuration information.
2981
2982   @type dest: str
2983   @param dest: directory containing the export
2984
2985   @rtype: L{objects.SerializableConfigParser}
2986   @return: a serializable config file containing the
2987       export info
2988
2989   """
2990   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2991
2992   config = objects.SerializableConfigParser()
2993   config.read(cff)
2994
2995   if (not config.has_section(constants.INISECT_EXP) or
2996       not config.has_section(constants.INISECT_INS)):
2997     _Fail("Export info file doesn't have the required fields")
2998
2999   return config.Dumps()
3000
3001
3002 def ListExports():
3003   """Return a list of exports currently available on this machine.
3004
3005   @rtype: list
3006   @return: list of the exports
3007
3008   """
3009   if os.path.isdir(pathutils.EXPORT_DIR):
3010     return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
3011   else:
3012     _Fail("No exports directory")
3013
3014
3015 def RemoveExport(export):
3016   """Remove an existing export from the node.
3017
3018   @type export: str
3019   @param export: the name of the export to remove
3020   @rtype: None
3021
3022   """
3023   target = utils.PathJoin(pathutils.EXPORT_DIR, export)
3024
3025   try:
3026     shutil.rmtree(target)
3027   except EnvironmentError, err:
3028     _Fail("Error while removing the export: %s", err, exc=True)
3029
3030
3031 def BlockdevRename(devlist):
3032   """Rename a list of block devices.
3033
3034   @type devlist: list of tuples
3035   @param devlist: list of tuples of the form  (disk,
3036       new_logical_id, new_physical_id); disk is an
3037       L{objects.Disk} object describing the current disk,
3038       and new logical_id/physical_id is the name we
3039       rename it to
3040   @rtype: boolean
3041   @return: True if all renames succeeded, False otherwise
3042
3043   """
3044   msgs = []
3045   result = True
3046   for disk, unique_id in devlist:
3047     dev = _RecursiveFindBD(disk)
3048     if dev is None:
3049       msgs.append("Can't find device %s in rename" % str(disk))
3050       result = False
3051       continue
3052     try:
3053       old_rpath = dev.dev_path
3054       dev.Rename(unique_id)
3055       new_rpath = dev.dev_path
3056       if old_rpath != new_rpath:
3057         DevCacheManager.RemoveCache(old_rpath)
3058         # FIXME: we should add the new cache information here, like:
3059         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
3060         # but we don't have the owner here - maybe parse from existing
3061         # cache? for now, we only lose lvm data when we rename, which
3062         # is less critical than DRBD or MD
3063     except errors.BlockDeviceError, err:
3064       msgs.append("Can't rename device '%s' to '%s': %s" %
3065                   (dev, unique_id, err))
3066       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
3067       result = False
3068   if not result:
3069     _Fail("; ".join(msgs))
3070
3071
3072 def _TransformFileStorageDir(fs_dir):
3073   """Checks whether given file_storage_dir is valid.
3074
3075   Checks wheter the given fs_dir is within the cluster-wide default
3076   file_storage_dir or the shared_file_storage_dir, which are stored in
3077   SimpleStore. Only paths under those directories are allowed.
3078
3079   @type fs_dir: str
3080   @param fs_dir: the path to check
3081
3082   @return: the normalized path if valid, None otherwise
3083
3084   """
3085   if not (constants.ENABLE_FILE_STORAGE or
3086           constants.ENABLE_SHARED_FILE_STORAGE):
3087     _Fail("File storage disabled at configure time")
3088
3089   bdev.CheckFileStoragePath(fs_dir)
3090
3091   return os.path.normpath(fs_dir)
3092
3093
3094 def CreateFileStorageDir(file_storage_dir):
3095   """Create file storage directory.
3096
3097   @type file_storage_dir: str
3098   @param file_storage_dir: directory to create
3099
3100   @rtype: tuple
3101   @return: tuple with first element a boolean indicating wheter dir
3102       creation was successful or not
3103
3104   """
3105   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
3106   if os.path.exists(file_storage_dir):
3107     if not os.path.isdir(file_storage_dir):
3108       _Fail("Specified storage dir '%s' is not a directory",
3109             file_storage_dir)
3110   else:
3111     try:
3112       os.makedirs(file_storage_dir, 0750)
3113     except OSError, err:
3114       _Fail("Cannot create file storage directory '%s': %s",
3115             file_storage_dir, err, exc=True)
3116
3117
3118 def RemoveFileStorageDir(file_storage_dir):
3119   """Remove file storage directory.
3120
3121   Remove it only if it's empty. If not log an error and return.
3122
3123   @type file_storage_dir: str
3124   @param file_storage_dir: the directory we should cleanup
3125   @rtype: tuple (success,)
3126   @return: tuple of one element, C{success}, denoting
3127       whether the operation was successful
3128
3129   """
3130   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
3131   if os.path.exists(file_storage_dir):
3132     if not os.path.isdir(file_storage_dir):
3133       _Fail("Specified Storage directory '%s' is not a directory",
3134             file_storage_dir)
3135     # deletes dir only if empty, otherwise we want to fail the rpc call
3136     try:
3137       os.rmdir(file_storage_dir)
3138     except OSError, err:
3139       _Fail("Cannot remove file storage directory '%s': %s",
3140             file_storage_dir, err)
3141
3142
3143 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
3144   """Rename the file storage directory.
3145
3146   @type old_file_storage_dir: str
3147   @param old_file_storage_dir: the current path
3148   @type new_file_storage_dir: str
3149   @param new_file_storage_dir: the name we should rename to
3150   @rtype: tuple (success,)
3151   @return: tuple of one element, C{success}, denoting
3152       whether the operation was successful
3153
3154   """
3155   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
3156   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
3157   if not os.path.exists(new_file_storage_dir):
3158     if os.path.isdir(old_file_storage_dir):
3159       try:
3160         os.rename(old_file_storage_dir, new_file_storage_dir)
3161       except OSError, err:
3162         _Fail("Cannot rename '%s' to '%s': %s",
3163               old_file_storage_dir, new_file_storage_dir, err)
3164     else:
3165       _Fail("Specified storage dir '%s' is not a directory",
3166             old_file_storage_dir)
3167   else:
3168     if os.path.exists(old_file_storage_dir):
3169       _Fail("Cannot rename '%s' to '%s': both locations exist",
3170             old_file_storage_dir, new_file_storage_dir)
3171
3172
3173 def _EnsureJobQueueFile(file_name):
3174   """Checks whether the given filename is in the queue directory.
3175
3176   @type file_name: str
3177   @param file_name: the file name we should check
3178   @rtype: None
3179   @raises RPCFail: if the file is not valid
3180
3181   """
3182   if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
3183     _Fail("Passed job queue file '%s' does not belong to"
3184           " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
3185
3186
3187 def JobQueueUpdate(file_name, content):
3188   """Updates a file in the queue directory.
3189
3190   This is just a wrapper over L{utils.io.WriteFile}, with proper
3191   checking.
3192
3193   @type file_name: str
3194   @param file_name: the job file name
3195   @type content: str
3196   @param content: the new job contents
3197   @rtype: boolean
3198   @return: the success of the operation
3199
3200   """
3201   file_name = vcluster.LocalizeVirtualPath(file_name)
3202
3203   _EnsureJobQueueFile(file_name)
3204   getents = runtime.GetEnts()
3205
3206   # Write and replace the file atomically
3207   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
3208                   gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)
3209
3210
3211 def JobQueueRename(old, new):
3212   """Renames a job queue file.
3213
3214   This is just a wrapper over os.rename with proper checking.
3215
3216   @type old: str
3217   @param old: the old (actual) file name
3218   @type new: str
3219   @param new: the desired file name
3220   @rtype: tuple
3221   @return: the success of the operation and payload
3222
3223   """
3224   old = vcluster.LocalizeVirtualPath(old)
3225   new = vcluster.LocalizeVirtualPath(new)
3226
3227   _EnsureJobQueueFile(old)
3228   _EnsureJobQueueFile(new)
3229
3230   getents = runtime.GetEnts()
3231
3232   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
3233                    dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)
3234
3235
3236 def BlockdevClose(instance_name, disks):
3237   """Closes the given block devices.
3238
3239   This means they will be switched to secondary mode (in case of
3240   DRBD).
3241
3242   @param instance_name: if the argument is not empty, the symlinks
3243       of this instance will be removed
3244   @type disks: list of L{objects.Disk}
3245   @param disks: the list of disks to be closed
3246   @rtype: tuple (success, message)
3247   @return: a tuple of success and message, where success
3248       indicates the succes of the operation, and message
3249       which will contain the error details in case we
3250       failed
3251
3252   """
3253   bdevs = []
3254   for cf in disks:
3255     rd = _RecursiveFindBD(cf)
3256     if rd is None:
3257       _Fail("Can't find device %s", cf)
3258     bdevs.append(rd)
3259
3260   msg = []
3261   for rd in bdevs:
3262     try:
3263       rd.Close()
3264     except errors.BlockDeviceError, err:
3265       msg.append(str(err))
3266   if msg:
3267     _Fail("Can't make devices secondary: %s", ",".join(msg))
3268   else:
3269     if instance_name:
3270       _RemoveBlockDevLinks(instance_name, disks)
3271
3272
3273 def ValidateHVParams(hvname, hvparams):
3274   """Validates the given hypervisor parameters.
3275
3276   @type hvname: string
3277   @param hvname: the hypervisor name
3278   @type hvparams: dict
3279   @param hvparams: the hypervisor parameters to be validated
3280   @rtype: None
3281
3282   """
3283   try:
3284     hv_type = hypervisor.GetHypervisor(hvname)
3285     hv_type.ValidateParameters(hvparams)
3286   except errors.HypervisorError, err:
3287     _Fail(str(err), log=False)
3288
3289
3290 def _CheckOSPList(os_obj, parameters):
3291   """Check whether a list of parameters is supported by the OS.
3292
3293   @type os_obj: L{objects.OS}
3294   @param os_obj: OS object to check
3295   @type parameters: list
3296   @param parameters: the list of parameters to check
3297
3298   """
3299   supported = [v[0] for v in os_obj.supported_parameters]
3300   delta = frozenset(parameters).difference(supported)
3301   if delta:
3302     _Fail("The following parameters are not supported"
3303           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
3304
3305
3306 def ValidateOS(required, osname, checks, osparams):
3307   """Validate the given OS' parameters.
3308
3309   @type required: boolean
3310   @param required: whether absence of the OS should translate into
3311       failure or not
3312   @type osname: string
3313   @param osname: the OS to be validated
3314   @type checks: list
3315   @param checks: list of the checks to run (currently only 'parameters')
3316   @type osparams: dict
3317   @param osparams: dictionary with OS parameters
3318   @rtype: boolean
3319   @return: True if the validation passed, or False if the OS was not
3320       found and L{required} was false
3321
3322   """
3323   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
3324     _Fail("Unknown checks required for OS %s: %s", osname,
3325           set(checks).difference(constants.OS_VALIDATE_CALLS))
3326
3327   name_only = objects.OS.GetName(osname)
3328   status, tbv = _TryOSFromDisk(name_only, None)
3329
3330   if not status:
3331     if required:
3332       _Fail(tbv)
3333     else:
3334       return False
3335
3336   if max(tbv.api_versions) < constants.OS_API_V20:
3337     return True
3338
3339   if constants.OS_VALIDATE_PARAMETERS in checks:
3340     _CheckOSPList(tbv, osparams.keys())
3341
3342   validate_env = OSCoreEnv(osname, tbv, osparams)
3343   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
3344                         cwd=tbv.path, reset_env=True)
3345   if result.failed:
3346     logging.error("os validate command '%s' returned error: %s output: %s",
3347                   result.cmd, result.fail_reason, result.output)
3348     _Fail("OS validation script failed (%s), output: %s",
3349           result.fail_reason, result.output, log=False)
3350
3351   return True
3352
3353
3354 def DemoteFromMC():
3355   """Demotes the current node from master candidate role.
3356
3357   """
3358   # try to ensure we're not the master by mistake
3359   master, myself = ssconf.GetMasterAndMyself()
3360   if master == myself:
3361     _Fail("ssconf status shows I'm the master node, will not demote")
3362
3363   result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
3364   if not result.failed:
3365     _Fail("The master daemon is running, will not demote")
3366
3367   try:
3368     if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3369       utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3370   except EnvironmentError, err:
3371     if err.errno != errno.ENOENT:
3372       _Fail("Error while backing up cluster file: %s", err, exc=True)
3373
3374   utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3375
3376
3377 def _GetX509Filenames(cryptodir, name):
3378   """Returns the full paths for the private key and certificate.
3379
3380   """
3381   return (utils.PathJoin(cryptodir, name),
3382           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3383           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3384
3385
3386 def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3387   """Creates a new X509 certificate for SSL/TLS.
3388
3389   @type validity: int
3390   @param validity: Validity in seconds
3391   @rtype: tuple; (string, string)
3392   @return: Certificate name and public part
3393
3394   """
3395   (key_pem, cert_pem) = \
3396     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3397                                      min(validity, _MAX_SSL_CERT_VALIDITY))
3398
3399   cert_dir = tempfile.mkdtemp(dir=cryptodir,
3400                               prefix="x509-%s-" % utils.TimestampForFilename())
3401   try:
3402     name = os.path.basename(cert_dir)
3403     assert len(name) > 5
3404
3405     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3406
3407     utils.WriteFile(key_file, mode=0400, data=key_pem)
3408     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3409
3410     # Never return private key as it shouldn't leave the node
3411     return (name, cert_pem)
3412   except Exception:
3413     shutil.rmtree(cert_dir, ignore_errors=True)
3414     raise
3415
3416
3417 def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3418   """Removes a X509 certificate.
3419
3420   @type name: string
3421   @param name: Certificate name
3422
3423   """
3424   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3425
3426   utils.RemoveFile(key_file)
3427   utils.RemoveFile(cert_file)
3428
3429   try:
3430     os.rmdir(cert_dir)
3431   except EnvironmentError, err:
3432     _Fail("Cannot remove certificate directory '%s': %s",
3433           cert_dir, err)
3434
3435
3436 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3437   """Returns the command for the requested input/output.
3438
3439   @type instance: L{objects.Instance}
3440   @param instance: The instance object
3441   @param mode: Import/export mode
3442   @param ieio: Input/output type
3443   @param ieargs: Input/output arguments
3444
3445   """
3446   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3447
3448   env = None
3449   prefix = None
3450   suffix = None
3451   exp_size = None
3452
3453   if ieio == constants.IEIO_FILE:
3454     (filename, ) = ieargs
3455
3456     if not utils.IsNormAbsPath(filename):
3457       _Fail("Path '%s' is not normalized or absolute", filename)
3458
3459     real_filename = os.path.realpath(filename)
3460     directory = os.path.dirname(real_filename)
3461
3462     if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3463       _Fail("File '%s' is not under exports directory '%s': %s",
3464             filename, pathutils.EXPORT_DIR, real_filename)
3465
3466     # Create directory
3467     utils.Makedirs(directory, mode=0750)
3468
3469     quoted_filename = utils.ShellQuote(filename)
3470
3471     if mode == constants.IEM_IMPORT:
3472       suffix = "> %s" % quoted_filename
3473     elif mode == constants.IEM_EXPORT:
3474       suffix = "< %s" % quoted_filename
3475
3476       # Retrieve file size
3477       try:
3478         st = os.stat(filename)
3479       except EnvironmentError, err:
3480         logging.error("Can't stat(2) %s: %s", filename, err)
3481       else:
3482         exp_size = utils.BytesToMebibyte(st.st_size)
3483
3484   elif ieio == constants.IEIO_RAW_DISK:
3485     (disk, ) = ieargs
3486
3487     real_disk = _OpenRealBD(disk)
3488
3489     if mode == constants.IEM_IMPORT:
3490       # we set here a smaller block size as, due to transport buffering, more
3491       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3492       # is not already there or we pass a wrong path; we use notrunc to no
3493       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3494       # much memory; this means that at best, we flush every 64k, which will
3495       # not be very fast
3496       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3497                                     " bs=%s oflag=dsync"),
3498                                     real_disk.dev_path,
3499                                     str(64 * 1024))
3500
3501     elif mode == constants.IEM_EXPORT:
3502       # the block size on the read dd is 1MiB to match our units
3503       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3504                                    real_disk.dev_path,
3505                                    str(1024 * 1024), # 1 MB
3506                                    str(disk.size))
3507       exp_size = disk.size
3508
3509   elif ieio == constants.IEIO_SCRIPT:
3510     (disk, disk_index, ) = ieargs
3511
3512     assert isinstance(disk_index, (int, long))
3513
3514     real_disk = _OpenRealBD(disk)
3515
3516     inst_os = OSFromDisk(instance.os)
3517     env = OSEnvironment(instance, inst_os)
3518
3519     if mode == constants.IEM_IMPORT:
3520       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3521       env["IMPORT_INDEX"] = str(disk_index)
3522       script = inst_os.import_script
3523
3524     elif mode == constants.IEM_EXPORT:
3525       env["EXPORT_DEVICE"] = real_disk.dev_path
3526       env["EXPORT_INDEX"] = str(disk_index)
3527       script = inst_os.export_script
3528
3529     # TODO: Pass special environment only to script
3530     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3531
3532     if mode == constants.IEM_IMPORT:
3533       suffix = "| %s" % script_cmd
3534
3535     elif mode == constants.IEM_EXPORT:
3536       prefix = "%s |" % script_cmd
3537
3538     # Let script predict size
3539     exp_size = constants.IE_CUSTOM_SIZE
3540
3541   else:
3542     _Fail("Invalid %s I/O mode %r", mode, ieio)
3543
3544   return (env, prefix, suffix, exp_size)
3545
3546
3547 def _CreateImportExportStatusDir(prefix):
3548   """Creates status directory for import/export.
3549
3550   """
3551   return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3552                           prefix=("%s-%s-" %
3553                                   (prefix, utils.TimestampForFilename())))
3554
3555
3556 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3557                             ieio, ieioargs):
3558   """Starts an import or export daemon.
3559
3560   @param mode: Import/output mode
3561   @type opts: L{objects.ImportExportOptions}
3562   @param opts: Daemon options
3563   @type host: string
3564   @param host: Remote host for export (None for import)
3565   @type port: int
3566   @param port: Remote port for export (None for import)
3567   @type instance: L{objects.Instance}
3568   @param instance: Instance object
3569   @type component: string
3570   @param component: which part of the instance is transferred now,
3571       e.g. 'disk/0'
3572   @param ieio: Input/output type
3573   @param ieioargs: Input/output arguments
3574
3575   """
3576   if mode == constants.IEM_IMPORT:
3577     prefix = "import"
3578
3579     if not (host is None and port is None):
3580       _Fail("Can not specify host or port on import")
3581
3582   elif mode == constants.IEM_EXPORT:
3583     prefix = "export"
3584
3585     if host is None or port is None:
3586       _Fail("Host and port must be specified for an export")
3587
3588   else:
3589     _Fail("Invalid mode %r", mode)
3590
3591   if (opts.key_name is None) ^ (opts.ca_pem is None):
3592     _Fail("Cluster certificate can only be used for both key and CA")
3593
3594   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3595     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3596
3597   if opts.key_name is None:
3598     # Use server.pem
3599     key_path = pathutils.NODED_CERT_FILE
3600     cert_path = pathutils.NODED_CERT_FILE
3601     assert opts.ca_pem is None
3602   else:
3603     (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3604                                                  opts.key_name)
3605     assert opts.ca_pem is not None
3606
3607   for i in [key_path, cert_path]:
3608     if not os.path.exists(i):
3609       _Fail("File '%s' does not exist" % i)
3610
3611   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3612   try:
3613     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3614     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3615     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3616
3617     if opts.ca_pem is None:
3618       # Use server.pem
3619       ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3620     else:
3621       ca = opts.ca_pem
3622
3623     # Write CA file
3624     utils.WriteFile(ca_file, data=ca, mode=0400)
3625
3626     cmd = [
3627       pathutils.IMPORT_EXPORT_DAEMON,
3628       status_file, mode,
3629       "--key=%s" % key_path,
3630       "--cert=%s" % cert_path,
3631       "--ca=%s" % ca_file,
3632       ]
3633
3634     if host:
3635       cmd.append("--host=%s" % host)
3636
3637     if port:
3638       cmd.append("--port=%s" % port)
3639
3640     if opts.ipv6:
3641       cmd.append("--ipv6")
3642     else:
3643       cmd.append("--ipv4")
3644
3645     if opts.compress:
3646       cmd.append("--compress=%s" % opts.compress)
3647
3648     if opts.magic:
3649       cmd.append("--magic=%s" % opts.magic)
3650
3651     if exp_size is not None:
3652       cmd.append("--expected-size=%s" % exp_size)
3653
3654     if cmd_prefix:
3655       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3656
3657     if cmd_suffix:
3658       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3659
3660     if mode == constants.IEM_EXPORT:
3661       # Retry connection a few times when connecting to remote peer
3662       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3663       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3664     elif opts.connect_timeout is not None:
3665       assert mode == constants.IEM_IMPORT
3666       # Overall timeout for establishing connection while listening
3667       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3668
3669     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3670
3671     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3672     # support for receiving a file descriptor for output
3673     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3674                       output=logfile)
3675
3676     # The import/export name is simply the status directory name
3677     return os.path.basename(status_dir)
3678
3679   except Exception:
3680     shutil.rmtree(status_dir, ignore_errors=True)
3681     raise
3682
3683
3684 def GetImportExportStatus(names):
3685   """Returns import/export daemon status.
3686
3687   @type names: sequence
3688   @param names: List of names
3689   @rtype: List of dicts
3690   @return: Returns a list of the state of each named import/export or None if a
3691            status couldn't be read
3692
3693   """
3694   result = []
3695
3696   for name in names:
3697     status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3698                                  _IES_STATUS_FILE)
3699
3700     try:
3701       data = utils.ReadFile(status_file)
3702     except EnvironmentError, err:
3703       if err.errno != errno.ENOENT:
3704         raise
3705       data = None
3706
3707     if not data:
3708       result.append(None)
3709       continue
3710
3711     result.append(serializer.LoadJson(data))
3712
3713   return result
3714
3715
3716 def AbortImportExport(name):
3717   """Sends SIGTERM to a running import/export daemon.
3718
3719   """
3720   logging.info("Abort import/export %s", name)
3721
3722   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3723   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3724
3725   if pid:
3726     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3727                  name, pid)
3728     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3729
3730
3731 def CleanupImportExport(name):
3732   """Cleanup after an import or export.
3733
3734   If the import/export daemon is still running it's killed. Afterwards the
3735   whole status directory is removed.
3736
3737   """
3738   logging.info("Finalizing import/export %s", name)
3739
3740   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3741
3742   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3743
3744   if pid:
3745     logging.info("Import/export %s is still running with PID %s",
3746                  name, pid)
3747     utils.KillProcess(pid, waitpid=False)
3748
3749   shutil.rmtree(status_dir, ignore_errors=True)
3750
3751
3752 def _FindDisks(nodes_ip, disks):
3753   """Sets the physical ID on disks and returns the block devices.
3754
3755   """
3756   # set the correct physical ID
3757   my_name = netutils.Hostname.GetSysName()
3758   for cf in disks:
3759     cf.SetPhysicalID(my_name, nodes_ip)
3760
3761   bdevs = []
3762
3763   for cf in disks:
3764     rd = _RecursiveFindBD(cf)
3765     if rd is None:
3766       _Fail("Can't find device %s", cf)
3767     bdevs.append(rd)
3768   return bdevs
3769
3770
3771 def DrbdDisconnectNet(nodes_ip, disks):
3772   """Disconnects the network on a list of drbd devices.
3773
3774   """
3775   bdevs = _FindDisks(nodes_ip, disks)
3776
3777   # disconnect disks
3778   for rd in bdevs:
3779     try:
3780       rd.DisconnectNet()
3781     except errors.BlockDeviceError, err:
3782       _Fail("Can't change network configuration to standalone mode: %s",
3783             err, exc=True)
3784
3785
3786 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3787   """Attaches the network on a list of drbd devices.
3788
3789   """
3790   bdevs = _FindDisks(nodes_ip, disks)
3791
3792   if multimaster:
3793     for idx, rd in enumerate(bdevs):
3794       try:
3795         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3796       except EnvironmentError, err:
3797         _Fail("Can't create symlink: %s", err)
3798   # reconnect disks, switch to new master configuration and if
3799   # needed primary mode
3800   for rd in bdevs:
3801     try:
3802       rd.AttachNet(multimaster)
3803     except errors.BlockDeviceError, err:
3804       _Fail("Can't change network configuration: %s", err)
3805
3806   # wait until the disks are connected; we need to retry the re-attach
3807   # if the device becomes standalone, as this might happen if the one
3808   # node disconnects and reconnects in a different mode before the
3809   # other node reconnects; in this case, one or both of the nodes will
3810   # decide it has wrong configuration and switch to standalone
3811
3812   def _Attach():
3813     all_connected = True
3814
3815     for rd in bdevs:
3816       stats = rd.GetProcStatus()
3817
3818       all_connected = (all_connected and
3819                        (stats.is_connected or stats.is_in_resync))
3820
3821       if stats.is_standalone:
3822         # peer had different config info and this node became
3823         # standalone, even though this should not happen with the
3824         # new staged way of changing disk configs
3825         try:
3826           rd.AttachNet(multimaster)
3827         except errors.BlockDeviceError, err:
3828           _Fail("Can't change network configuration: %s", err)
3829
3830     if not all_connected:
3831       raise utils.RetryAgain()
3832
3833   try:
3834     # Start with a delay of 100 miliseconds and go up to 5 seconds
3835     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3836   except utils.RetryTimeout:
3837     _Fail("Timeout in disk reconnecting")
3838
3839   if multimaster:
3840     # change to primary mode
3841     for rd in bdevs:
3842       try:
3843         rd.Open()
3844       except errors.BlockDeviceError, err:
3845         _Fail("Can't change to primary mode: %s", err)
3846
3847
3848 def DrbdWaitSync(nodes_ip, disks):
3849   """Wait until DRBDs have synchronized.
3850
3851   """
3852   def _helper(rd):
3853     stats = rd.GetProcStatus()
3854     if not (stats.is_connected or stats.is_in_resync):
3855       raise utils.RetryAgain()
3856     return stats
3857
3858   bdevs = _FindDisks(nodes_ip, disks)
3859
3860   min_resync = 100
3861   alldone = True
3862   for rd in bdevs:
3863     try:
3864       # poll each second for 15 seconds
3865       stats = utils.Retry(_helper, 1, 15, args=[rd])
3866     except utils.RetryTimeout:
3867       stats = rd.GetProcStatus()
3868       # last check
3869       if not (stats.is_connected or stats.is_in_resync):
3870         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3871     alldone = alldone and (not stats.is_in_resync)
3872     if stats.sync_percent is not None:
3873       min_resync = min(min_resync, stats.sync_percent)
3874
3875   return (alldone, min_resync)
3876
3877
3878 def GetDrbdUsermodeHelper():
3879   """Returns DRBD usermode helper currently configured.
3880
3881   """
3882   try:
3883     return drbd.DRBD8.GetUsermodeHelper()
3884   except errors.BlockDeviceError, err:
3885     _Fail(str(err))
3886
3887
3888 def PowercycleNode(hypervisor_type):
3889   """Hard-powercycle the node.
3890
3891   Because we need to return first, and schedule the powercycle in the
3892   background, we won't be able to report failures nicely.
3893
3894   """
3895   hyper = hypervisor.GetHypervisor(hypervisor_type)
3896   try:
3897     pid = os.fork()
3898   except OSError:
3899     # if we can't fork, we'll pretend that we're in the child process
3900     pid = 0
3901   if pid > 0:
3902     return "Reboot scheduled in 5 seconds"
3903   # ensure the child is running on ram
3904   try:
3905     utils.Mlockall()
3906   except Exception: # pylint: disable=W0703
3907     pass
3908   time.sleep(5)
3909   hyper.PowercycleNode()
3910
3911
3912 def _VerifyRestrictedCmdName(cmd):
3913   """Verifies a restricted command name.
3914
3915   @type cmd: string
3916   @param cmd: Command name
3917   @rtype: tuple; (boolean, string or None)
3918   @return: The tuple's first element is the status; if C{False}, the second
3919     element is an error message string, otherwise it's C{None}
3920
3921   """
3922   if not cmd.strip():
3923     return (False, "Missing command name")
3924
3925   if os.path.basename(cmd) != cmd:
3926     return (False, "Invalid command name")
3927
3928   if not constants.EXT_PLUGIN_MASK.match(cmd):
3929     return (False, "Command name contains forbidden characters")
3930
3931   return (True, None)
3932
3933
3934 def _CommonRestrictedCmdCheck(path, owner):
3935   """Common checks for restricted command file system directories and files.
3936
3937   @type path: string
3938   @param path: Path to check
3939   @param owner: C{None} or tuple containing UID and GID
3940   @rtype: tuple; (boolean, string or C{os.stat} result)
3941   @return: The tuple's first element is the status; if C{False}, the second
3942     element is an error message string, otherwise it's the result of C{os.stat}
3943
3944   """
3945   if owner is None:
3946     # Default to root as owner
3947     owner = (0, 0)
3948
3949   try:
3950     st = os.stat(path)
3951   except EnvironmentError, err:
3952     return (False, "Can't stat(2) '%s': %s" % (path, err))
3953
3954   if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
3955     return (False, "Permissions on '%s' are too permissive" % path)
3956
3957   if (st.st_uid, st.st_gid) != owner:
3958     (owner_uid, owner_gid) = owner
3959     return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
3960
3961   return (True, st)
3962
3963
3964 def _VerifyRestrictedCmdDirectory(path, _owner=None):
3965   """Verifies restricted command directory.
3966
3967   @type path: string
3968   @param path: Path to check
3969   @rtype: tuple; (boolean, string or None)
3970   @return: The tuple's first element is the status; if C{False}, the second
3971     element is an error message string, otherwise it's C{None}
3972
3973   """
3974   (status, value) = _CommonRestrictedCmdCheck(path, _owner)
3975
3976   if not status:
3977     return (False, value)
3978
3979   if not stat.S_ISDIR(value.st_mode):
3980     return (False, "Path '%s' is not a directory" % path)
3981
3982   return (True, None)
3983
3984
3985 def _VerifyRestrictedCmd(path, cmd, _owner=None):
3986   """Verifies a whole restricted command and returns its executable filename.
3987
3988   @type path: string
3989   @param path: Directory containing restricted commands
3990   @type cmd: string
3991   @param cmd: Command name
3992   @rtype: tuple; (boolean, string)
3993   @return: The tuple's first element is the status; if C{False}, the second
3994     element is an error message string, otherwise the second element is the
3995     absolute path to the executable
3996
3997   """
3998   executable = utils.PathJoin(path, cmd)
3999
4000   (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)
4001
4002   if not status:
4003     return (False, msg)
4004
4005   if not utils.IsExecutable(executable):
4006     return (False, "access(2) thinks '%s' can't be executed" % executable)
4007
4008   return (True, executable)
4009
4010
4011 def _PrepareRestrictedCmd(path, cmd,
4012                           _verify_dir=_VerifyRestrictedCmdDirectory,
4013                           _verify_name=_VerifyRestrictedCmdName,
4014                           _verify_cmd=_VerifyRestrictedCmd):
4015   """Performs a number of tests on a restricted command.
4016
4017   @type path: string
4018   @param path: Directory containing restricted commands
4019   @type cmd: string
4020   @param cmd: Command name
4021   @return: Same as L{_VerifyRestrictedCmd}
4022
4023   """
4024   # Verify the directory first
4025   (status, msg) = _verify_dir(path)
4026   if status:
4027     # Check command if everything was alright
4028     (status, msg) = _verify_name(cmd)
4029
4030   if not status:
4031     return (False, msg)
4032
4033   # Check actual executable
4034   return _verify_cmd(path, cmd)
4035
4036
4037 def RunRestrictedCmd(cmd,
4038                      _lock_timeout=_RCMD_LOCK_TIMEOUT,
4039                      _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
4040                      _path=pathutils.RESTRICTED_COMMANDS_DIR,
4041                      _sleep_fn=time.sleep,
4042                      _prepare_fn=_PrepareRestrictedCmd,
4043                      _runcmd_fn=utils.RunCmd,
4044                      _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
4045   """Executes a restricted command after performing strict tests.
4046
4047   @type cmd: string
4048   @param cmd: Command name
4049   @rtype: string
4050   @return: Command output
4051   @raise RPCFail: In case of an error
4052
4053   """
4054   logging.info("Preparing to run restricted command '%s'", cmd)
4055
4056   if not _enabled:
4057     _Fail("Restricted commands disabled at configure time")
4058
4059   lock = None
4060   try:
4061     cmdresult = None
4062     try:
4063       lock = utils.FileLock.Open(_lock_file)
4064       lock.Exclusive(blocking=True, timeout=_lock_timeout)
4065
4066       (status, value) = _prepare_fn(_path, cmd)
4067
4068       if status:
4069         cmdresult = _runcmd_fn([value], env={}, reset_env=True,
4070                                postfork_fn=lambda _: lock.Unlock())
4071       else:
4072         logging.error(value)
4073     except Exception: # pylint: disable=W0703
4074       # Keep original error in log
4075       logging.exception("Caught exception")
4076
4077     if cmdresult is None:
4078       logging.info("Sleeping for %0.1f seconds before returning",
4079                    _RCMD_INVALID_DELAY)
4080       _sleep_fn(_RCMD_INVALID_DELAY)
4081
4082       # Do not include original error message in returned error
4083       _Fail("Executing command '%s' failed" % cmd)
4084     elif cmdresult.failed or cmdresult.fail_reason:
4085       _Fail("Restricted command '%s' failed: %s; output: %s",
4086             cmd, cmdresult.fail_reason, cmdresult.output)
4087     else:
4088       return cmdresult.output
4089   finally:
4090     if lock is not None:
4091       # Release lock at last
4092       lock.Close()
4093       lock = None
4094
4095
4096 def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
4097   """Creates or removes the watcher pause file.
4098
4099   @type until: None or number
4100   @param until: Unix timestamp saying until when the watcher shouldn't run
4101
4102   """
4103   if until is None:
4104     logging.info("Received request to no longer pause watcher")
4105     utils.RemoveFile(_filename)
4106   else:
4107     logging.info("Received request to pause watcher until %s", until)
4108
4109     if not ht.TNumber(until):
4110       _Fail("Duration must be numeric")
4111
4112     utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)
4113
4114
4115 class HooksRunner(object):
4116   """Hook runner.
4117
4118   This class is instantiated on the node side (ganeti-noded) and not
4119   on the master side.
4120
4121   """
4122   def __init__(self, hooks_base_dir=None):
4123     """Constructor for hooks runner.
4124
4125     @type hooks_base_dir: str or None
4126     @param hooks_base_dir: if not None, this overrides the
4127         L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
4128
4129     """
4130     if hooks_base_dir is None:
4131       hooks_base_dir = pathutils.HOOKS_BASE_DIR
4132     # yeah, _BASE_DIR is not valid for attributes, we use it like a
4133     # constant
4134     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
4135
4136   def RunLocalHooks(self, node_list, hpath, phase, env):
4137     """Check that the hooks will be run only locally and then run them.
4138
4139     """
4140     assert len(node_list) == 1
4141     node = node_list[0]
4142     _, myself = ssconf.GetMasterAndMyself()
4143     assert node == myself
4144
4145     results = self.RunHooks(hpath, phase, env)
4146
4147     # Return values in the form expected by HooksMaster
4148     return {node: (None, False, results)}
4149
4150   def RunHooks(self, hpath, phase, env):
4151     """Run the scripts in the hooks directory.
4152
4153     @type hpath: str
4154     @param hpath: the path to the hooks directory which
4155         holds the scripts
4156     @type phase: str
4157     @param phase: either L{constants.HOOKS_PHASE_PRE} or
4158         L{constants.HOOKS_PHASE_POST}
4159     @type env: dict
4160     @param env: dictionary with the environment for the hook
4161     @rtype: list
4162     @return: list of 3-element tuples:
4163       - script path
4164       - script result, either L{constants.HKR_SUCCESS} or
4165         L{constants.HKR_FAIL}
4166       - output of the script
4167
4168     @raise errors.ProgrammerError: for invalid input
4169         parameters
4170
4171     """
4172     if phase == constants.HOOKS_PHASE_PRE:
4173       suffix = "pre"
4174     elif phase == constants.HOOKS_PHASE_POST:
4175       suffix = "post"
4176     else:
4177       _Fail("Unknown hooks phase '%s'", phase)
4178
4179     subdir = "%s-%s.d" % (hpath, suffix)
4180     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
4181
4182     results = []
4183
4184     if not os.path.isdir(dir_name):
4185       # for non-existing/non-dirs, we simply exit instead of logging a
4186       # warning at every operation
4187       return results
4188
4189     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
4190
4191     for (relname, relstatus, runresult) in runparts_results:
4192       if relstatus == constants.RUNPARTS_SKIP:
4193         rrval = constants.HKR_SKIP
4194         output = ""
4195       elif relstatus == constants.RUNPARTS_ERR:
4196         rrval = constants.HKR_FAIL
4197         output = "Hook script execution error: %s" % runresult
4198       elif relstatus == constants.RUNPARTS_RUN:
4199         if runresult.failed:
4200           rrval = constants.HKR_FAIL
4201         else:
4202           rrval = constants.HKR_SUCCESS
4203         output = utils.SafeEncode(runresult.output.strip())
4204       results.append(("%s/%s" % (subdir, relname), rrval, output))
4205
4206     return results
4207
4208
4209 class IAllocatorRunner(object):
4210   """IAllocator runner.
4211
4212   This class is instantiated on the node side (ganeti-noded) and not on
4213   the master side.
4214
4215   """
4216   @staticmethod
4217   def Run(name, idata):
4218     """Run an iallocator script.
4219
4220     @type name: str
4221     @param name: the iallocator script name
4222     @type idata: str
4223     @param idata: the allocator input data
4224
4225     @rtype: tuple
4226     @return: two element tuple of:
4227        - status
4228        - either error message or stdout of allocator (for success)
4229
4230     """
4231     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
4232                                   os.path.isfile)
4233     if alloc_script is None:
4234       _Fail("iallocator module '%s' not found in the search path", name)
4235
4236     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
4237     try:
4238       os.write(fd, idata)
4239       os.close(fd)
4240       result = utils.RunCmd([alloc_script, fin_name])
4241       if result.failed:
4242         _Fail("iallocator module '%s' failed: %s, output '%s'",
4243               name, result.fail_reason, result.output)
4244     finally:
4245       os.unlink(fin_name)
4246
4247     return result.stdout
4248
4249
4250 class DevCacheManager(object):
4251   """Simple class for managing a cache of block device information.
4252
4253   """
4254   _DEV_PREFIX = "/dev/"
4255   _ROOT_DIR = pathutils.BDEV_CACHE_DIR
4256
4257   @classmethod
4258   def _ConvertPath(cls, dev_path):
4259     """Converts a /dev/name path to the cache file name.
4260
4261     This replaces slashes with underscores and strips the /dev
4262     prefix. It then returns the full path to the cache file.
4263
4264     @type dev_path: str
4265     @param dev_path: the C{/dev/} path name
4266     @rtype: str
4267     @return: the converted path name
4268
4269     """
4270     if dev_path.startswith(cls._DEV_PREFIX):
4271       dev_path = dev_path[len(cls._DEV_PREFIX):]
4272     dev_path = dev_path.replace("/", "_")
4273     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
4274     return fpath
4275
4276   @classmethod
4277   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
4278     """Updates the cache information for a given device.
4279
4280     @type dev_path: str
4281     @param dev_path: the pathname of the device
4282     @type owner: str
4283     @param owner: the owner (instance name) of the device
4284     @type on_primary: bool
4285     @param on_primary: whether this is the primary
4286         node nor not
4287     @type iv_name: str
4288     @param iv_name: the instance-visible name of the
4289         device, as in objects.Disk.iv_name
4290
4291     @rtype: None
4292
4293     """
4294     if dev_path is None:
4295       logging.error("DevCacheManager.UpdateCache got a None dev_path")
4296       return
4297     fpath = cls._ConvertPath(dev_path)
4298     if on_primary:
4299       state = "primary"
4300     else:
4301       state = "secondary"
4302     if iv_name is None:
4303       iv_name = "not_visible"
4304     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
4305     try:
4306       utils.WriteFile(fpath, data=fdata)
4307     except EnvironmentError, err:
4308       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
4309
4310   @classmethod
4311   def RemoveCache(cls, dev_path):
4312     """Remove data for a dev_path.
4313
4314     This is just a wrapper over L{utils.io.RemoveFile} with a converted
4315     path name and logging.
4316
4317     @type dev_path: str
4318     @param dev_path: the pathname of the device
4319
4320     @rtype: None
4321
4322     """
4323     if dev_path is None:
4324       logging.error("DevCacheManager.RemoveCache got a None dev_path")
4325       return
4326     fpath = cls._ConvertPath(dev_path)
4327     try:
4328       utils.RemoveFile(fpath)
4329     except EnvironmentError, err:
4330       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)