First part of confd timer changes
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
67 from ganeti import ht
68
69
70 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
71 _ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
72   pathutils.DATA_DIR,
73   pathutils.JOB_QUEUE_ARCHIVE_DIR,
74   pathutils.QUEUE_DIR,
75   pathutils.CRYPTO_KEYS_DIR,
76   ])
77 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
78 _X509_KEY_FILE = "key"
79 _X509_CERT_FILE = "cert"
80 _IES_STATUS_FILE = "status"
81 _IES_PID_FILE = "pid"
82 _IES_CA_FILE = "ca"
83
84 #: Valid LVS output line regex
85 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
86
87 # Actions for the master setup script
88 _MASTER_START = "start"
89 _MASTER_STOP = "stop"
90
91 #: Maximum file permissions for remote command directory and executables
92 _RCMD_MAX_MODE = (stat.S_IRWXU |
93                   stat.S_IRGRP | stat.S_IXGRP |
94                   stat.S_IROTH | stat.S_IXOTH)
95
96 #: Delay before returning an error for remote commands
97 _RCMD_INVALID_DELAY = 10
98
99 #: How long to wait to acquire lock for remote commands (shorter than
100 #: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
101 #: command requests arrive
102 _RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
103
104
105 class RPCFail(Exception):
106   """Class denoting RPC failure.
107
108   Its argument is the error message.
109
110   """
111
112
113 def _Fail(msg, *args, **kwargs):
114   """Log an error and the raise an RPCFail exception.
115
116   This exception is then handled specially in the ganeti daemon and
117   turned into a 'failed' return type. As such, this function is a
118   useful shortcut for logging the error and returning it to the master
119   daemon.
120
121   @type msg: string
122   @param msg: the text of the exception
123   @raise RPCFail
124
125   """
126   if args:
127     msg = msg % args
128   if "log" not in kwargs or kwargs["log"]: # if we should log this error
129     if "exc" in kwargs and kwargs["exc"]:
130       logging.exception(msg)
131     else:
132       logging.error(msg)
133   raise RPCFail(msg)
134
135
136 def _GetConfig():
137   """Simple wrapper to return a SimpleStore.
138
139   @rtype: L{ssconf.SimpleStore}
140   @return: a SimpleStore instance
141
142   """
143   return ssconf.SimpleStore()
144
145
146 def _GetSshRunner(cluster_name):
147   """Simple wrapper to return an SshRunner.
148
149   @type cluster_name: str
150   @param cluster_name: the cluster name, which is needed
151       by the SshRunner constructor
152   @rtype: L{ssh.SshRunner}
153   @return: an SshRunner instance
154
155   """
156   return ssh.SshRunner(cluster_name)
157
158
159 def _Decompress(data):
160   """Unpacks data compressed by the RPC client.
161
162   @type data: list or tuple
163   @param data: Data sent by RPC client
164   @rtype: str
165   @return: Decompressed data
166
167   """
168   assert isinstance(data, (list, tuple))
169   assert len(data) == 2
170   (encoding, content) = data
171   if encoding == constants.RPC_ENCODING_NONE:
172     return content
173   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
174     return zlib.decompress(base64.b64decode(content))
175   else:
176     raise AssertionError("Unknown data encoding")
177
178
179 def _CleanDirectory(path, exclude=None):
180   """Removes all regular files in a directory.
181
182   @type path: str
183   @param path: the directory to clean
184   @type exclude: list
185   @param exclude: list of files to be excluded, defaults
186       to the empty list
187
188   """
189   if path not in _ALLOWED_CLEAN_DIRS:
190     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
191           path)
192
193   if not os.path.isdir(path):
194     return
195   if exclude is None:
196     exclude = []
197   else:
198     # Normalize excluded paths
199     exclude = [os.path.normpath(i) for i in exclude]
200
201   for rel_name in utils.ListVisibleFiles(path):
202     full_name = utils.PathJoin(path, rel_name)
203     if full_name in exclude:
204       continue
205     if os.path.isfile(full_name) and not os.path.islink(full_name):
206       utils.RemoveFile(full_name)
207
208
209 def _BuildUploadFileList():
210   """Build the list of allowed upload files.
211
212   This is abstracted so that it's built only once at module import time.
213
214   """
215   allowed_files = set([
216     pathutils.CLUSTER_CONF_FILE,
217     pathutils.ETC_HOSTS,
218     pathutils.SSH_KNOWN_HOSTS_FILE,
219     pathutils.VNC_PASSWORD_FILE,
220     pathutils.RAPI_CERT_FILE,
221     pathutils.SPICE_CERT_FILE,
222     pathutils.SPICE_CACERT_FILE,
223     pathutils.RAPI_USERS_FILE,
224     pathutils.CONFD_HMAC_KEY,
225     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
226     ])
227
228   for hv_name in constants.HYPER_TYPES:
229     hv_class = hypervisor.GetHypervisorClass(hv_name)
230     allowed_files.update(hv_class.GetAncillaryFiles()[0])
231
232   assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
233     "Allowed file storage paths should never be uploaded via RPC"
234
235   return frozenset(allowed_files)
236
237
238 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
239
240
241 def JobQueuePurge():
242   """Removes job queue files and archived jobs.
243
244   @rtype: tuple
245   @return: True, None
246
247   """
248   _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
249   _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
250
251
252 def GetMasterInfo():
253   """Returns master information.
254
255   This is an utility function to compute master information, either
256   for consumption here or from the node daemon.
257
258   @rtype: tuple
259   @return: master_netdev, master_ip, master_name, primary_ip_family,
260     master_netmask
261   @raise RPCFail: in case of errors
262
263   """
264   try:
265     cfg = _GetConfig()
266     master_netdev = cfg.GetMasterNetdev()
267     master_ip = cfg.GetMasterIP()
268     master_netmask = cfg.GetMasterNetmask()
269     master_node = cfg.GetMasterNode()
270     primary_ip_family = cfg.GetPrimaryIPFamily()
271   except errors.ConfigurationError, err:
272     _Fail("Cluster configuration incomplete: %s", err, exc=True)
273   return (master_netdev, master_ip, master_node, primary_ip_family,
274           master_netmask)
275
276
277 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
278   """Decorator that runs hooks before and after the decorated function.
279
280   @type hook_opcode: string
281   @param hook_opcode: opcode of the hook
282   @type hooks_path: string
283   @param hooks_path: path of the hooks
284   @type env_builder_fn: function
285   @param env_builder_fn: function that returns a dictionary containing the
286     environment variables for the hooks. Will get all the parameters of the
287     decorated function.
288   @raise RPCFail: in case of pre-hook failure
289
290   """
291   def decorator(fn):
292     def wrapper(*args, **kwargs):
293       _, myself = ssconf.GetMasterAndMyself()
294       nodes = ([myself], [myself])  # these hooks run locally
295
296       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
297
298       cfg = _GetConfig()
299       hr = HooksRunner()
300       hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
301                             None, env_fn, logging.warning, cfg.GetClusterName(),
302                             cfg.GetMasterNode())
303
304       hm.RunPhase(constants.HOOKS_PHASE_PRE)
305       result = fn(*args, **kwargs)
306       hm.RunPhase(constants.HOOKS_PHASE_POST)
307
308       return result
309     return wrapper
310   return decorator
311
312
313 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
314   """Builds environment variables for master IP hooks.
315
316   @type master_params: L{objects.MasterNetworkParameters}
317   @param master_params: network parameters of the master
318   @type use_external_mip_script: boolean
319   @param use_external_mip_script: whether to use an external master IP
320     address setup script (unused, but necessary per the implementation of the
321     _RunLocalHooks decorator)
322
323   """
324   # pylint: disable=W0613
325   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
326   env = {
327     "MASTER_NETDEV": master_params.netdev,
328     "MASTER_IP": master_params.ip,
329     "MASTER_NETMASK": str(master_params.netmask),
330     "CLUSTER_IP_VERSION": str(ver),
331   }
332
333   return env
334
335
336 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
337   """Execute the master IP address setup script.
338
339   @type master_params: L{objects.MasterNetworkParameters}
340   @param master_params: network parameters of the master
341   @type action: string
342   @param action: action to pass to the script. Must be one of
343     L{backend._MASTER_START} or L{backend._MASTER_STOP}
344   @type use_external_mip_script: boolean
345   @param use_external_mip_script: whether to use an external master IP
346     address setup script
347   @raise backend.RPCFail: if there are errors during the execution of the
348     script
349
350   """
351   env = _BuildMasterIpEnv(master_params)
352
353   if use_external_mip_script:
354     setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
355   else:
356     setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
357
358   result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
359
360   if result.failed:
361     _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
362           (action, result.exit_code, result.output), log=True)
363
364
365 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
366                _BuildMasterIpEnv)
367 def ActivateMasterIp(master_params, use_external_mip_script):
368   """Activate the IP address of the master daemon.
369
370   @type master_params: L{objects.MasterNetworkParameters}
371   @param master_params: network parameters of the master
372   @type use_external_mip_script: boolean
373   @param use_external_mip_script: whether to use an external master IP
374     address setup script
375   @raise RPCFail: in case of errors during the IP startup
376
377   """
378   _RunMasterSetupScript(master_params, _MASTER_START,
379                         use_external_mip_script)
380
381
382 def StartMasterDaemons(no_voting):
383   """Activate local node as master node.
384
385   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
386
387   @type no_voting: boolean
388   @param no_voting: whether to start ganeti-masterd without a node vote
389       but still non-interactively
390   @rtype: None
391
392   """
393
394   if no_voting:
395     masterd_args = "--no-voting --yes-do-it"
396   else:
397     masterd_args = ""
398
399   env = {
400     "EXTRA_MASTERD_ARGS": masterd_args,
401     }
402
403   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
404   if result.failed:
405     msg = "Can't start Ganeti master: %s" % result.output
406     logging.error(msg)
407     _Fail(msg)
408
409
410 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
411                _BuildMasterIpEnv)
412 def DeactivateMasterIp(master_params, use_external_mip_script):
413   """Deactivate the master IP on this node.
414
415   @type master_params: L{objects.MasterNetworkParameters}
416   @param master_params: network parameters of the master
417   @type use_external_mip_script: boolean
418   @param use_external_mip_script: whether to use an external master IP
419     address setup script
420   @raise RPCFail: in case of errors during the IP turndown
421
422   """
423   _RunMasterSetupScript(master_params, _MASTER_STOP,
424                         use_external_mip_script)
425
426
427 def StopMasterDaemons():
428   """Stop the master daemons on this node.
429
430   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
431
432   @rtype: None
433
434   """
435   # TODO: log and report back to the caller the error failures; we
436   # need to decide in which case we fail the RPC for this
437
438   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
439   if result.failed:
440     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
441                   " and error %s",
442                   result.cmd, result.exit_code, result.output)
443
444
445 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
446   """Change the netmask of the master IP.
447
448   @param old_netmask: the old value of the netmask
449   @param netmask: the new value of the netmask
450   @param master_ip: the master IP
451   @param master_netdev: the master network device
452
453   """
454   if old_netmask == netmask:
455     return
456
457   if not netutils.IPAddress.Own(master_ip):
458     _Fail("The master IP address is not up, not attempting to change its"
459           " netmask")
460
461   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
462                          "%s/%s" % (master_ip, netmask),
463                          "dev", master_netdev, "label",
464                          "%s:0" % master_netdev])
465   if result.failed:
466     _Fail("Could not set the new netmask on the master IP address")
467
468   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
469                          "%s/%s" % (master_ip, old_netmask),
470                          "dev", master_netdev, "label",
471                          "%s:0" % master_netdev])
472   if result.failed:
473     _Fail("Could not bring down the master IP address with the old netmask")
474
475
476 def EtcHostsModify(mode, host, ip):
477   """Modify a host entry in /etc/hosts.
478
479   @param mode: The mode to operate. Either add or remove entry
480   @param host: The host to operate on
481   @param ip: The ip associated with the entry
482
483   """
484   if mode == constants.ETC_HOSTS_ADD:
485     if not ip:
486       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
487               " present")
488     utils.AddHostToEtcHosts(host, ip)
489   elif mode == constants.ETC_HOSTS_REMOVE:
490     if ip:
491       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
492               " parameter is present")
493     utils.RemoveHostFromEtcHosts(host)
494   else:
495     RPCFail("Mode not supported")
496
497
498 def LeaveCluster(modify_ssh_setup):
499   """Cleans up and remove the current node.
500
501   This function cleans up and prepares the current node to be removed
502   from the cluster.
503
504   If processing is successful, then it raises an
505   L{errors.QuitGanetiException} which is used as a special case to
506   shutdown the node daemon.
507
508   @param modify_ssh_setup: boolean
509
510   """
511   _CleanDirectory(pathutils.DATA_DIR)
512   _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
513   JobQueuePurge()
514
515   if modify_ssh_setup:
516     try:
517       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
518
519       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
520
521       utils.RemoveFile(priv_key)
522       utils.RemoveFile(pub_key)
523     except errors.OpExecError:
524       logging.exception("Error while processing ssh files")
525
526   try:
527     utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
528     utils.RemoveFile(pathutils.RAPI_CERT_FILE)
529     utils.RemoveFile(pathutils.SPICE_CERT_FILE)
530     utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
531     utils.RemoveFile(pathutils.NODED_CERT_FILE)
532   except: # pylint: disable=W0702
533     logging.exception("Error while removing cluster secrets")
534
535   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
536   if result.failed:
537     logging.error("Command %s failed with exitcode %s and error %s",
538                   result.cmd, result.exit_code, result.output)
539
540   # Raise a custom exception (handled in ganeti-noded)
541   raise errors.QuitGanetiException(True, "Shutdown scheduled")
542
543
544 def _GetVgInfo(name, excl_stor):
545   """Retrieves information about a LVM volume group.
546
547   """
548   # TODO: GetVGInfo supports returning information for multiple VGs at once
549   vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
550   if vginfo:
551     vg_free = int(round(vginfo[0][0], 0))
552     vg_size = int(round(vginfo[0][1], 0))
553   else:
554     vg_free = None
555     vg_size = None
556
557   return {
558     "name": name,
559     "vg_free": vg_free,
560     "vg_size": vg_size,
561     }
562
563
564 def _GetHvInfo(name):
565   """Retrieves node information from a hypervisor.
566
567   The information returned depends on the hypervisor. Common items:
568
569     - vg_size is the size of the configured volume group in MiB
570     - vg_free is the free size of the volume group in MiB
571     - memory_dom0 is the memory allocated for domain0 in MiB
572     - memory_free is the currently available (free) ram in MiB
573     - memory_total is the total number of ram in MiB
574     - hv_version: the hypervisor version, if available
575
576   """
577   return hypervisor.GetHypervisor(name).GetNodeInfo()
578
579
580 def _GetNamedNodeInfo(names, fn):
581   """Calls C{fn} for all names in C{names} and returns a dictionary.
582
583   @rtype: None or dict
584
585   """
586   if names is None:
587     return None
588   else:
589     return map(fn, names)
590
591
592 def GetNodeInfo(vg_names, hv_names, excl_stor):
593   """Gives back a hash with different information about the node.
594
595   @type vg_names: list of string
596   @param vg_names: Names of the volume groups to ask for disk space information
597   @type hv_names: list of string
598   @param hv_names: Names of the hypervisors to ask for node information
599   @type excl_stor: boolean
600   @param excl_stor: Whether exclusive_storage is active
601   @rtype: tuple; (string, None/dict, None/dict)
602   @return: Tuple containing boot ID, volume group information and hypervisor
603     information
604
605   """
606   bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
607   vg_info = _GetNamedNodeInfo(vg_names, (lambda vg: _GetVgInfo(vg, excl_stor)))
608   hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
609
610   return (bootid, vg_info, hv_info)
611
612
613 def _CheckExclusivePvs(pvi_list):
614   """Check that PVs are not shared among LVs
615
616   @type pvi_list: list of L{objects.LvmPvInfo} objects
617   @param pvi_list: information about the PVs
618
619   @rtype: list of tuples (string, list of strings)
620   @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])
621
622   """
623   res = []
624   for pvi in pvi_list:
625     if len(pvi.lv_list) > 1:
626       res.append((pvi.name, pvi.lv_list))
627   return res
628
629
630 def VerifyNode(what, cluster_name):
631   """Verify the status of the local node.
632
633   Based on the input L{what} parameter, various checks are done on the
634   local node.
635
636   If the I{filelist} key is present, this list of
637   files is checksummed and the file/checksum pairs are returned.
638
639   If the I{nodelist} key is present, we check that we have
640   connectivity via ssh with the target nodes (and check the hostname
641   report).
642
643   If the I{node-net-test} key is present, we check that we have
644   connectivity to the given nodes via both primary IP and, if
645   applicable, secondary IPs.
646
647   @type what: C{dict}
648   @param what: a dictionary of things to check:
649       - filelist: list of files for which to compute checksums
650       - nodelist: list of nodes we should check ssh communication with
651       - node-net-test: list of nodes we should check node daemon port
652         connectivity with
653       - hypervisor: list with hypervisors to run the verify for
654   @rtype: dict
655   @return: a dictionary with the same keys as the input dict, and
656       values representing the result of the checks
657
658   """
659   result = {}
660   my_name = netutils.Hostname.GetSysName()
661   port = netutils.GetDaemonPort(constants.NODED)
662   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
663
664   if constants.NV_HYPERVISOR in what and vm_capable:
665     result[constants.NV_HYPERVISOR] = tmp = {}
666     for hv_name in what[constants.NV_HYPERVISOR]:
667       try:
668         val = hypervisor.GetHypervisor(hv_name).Verify()
669       except errors.HypervisorError, err:
670         val = "Error while checking hypervisor: %s" % str(err)
671       tmp[hv_name] = val
672
673   if constants.NV_HVPARAMS in what and vm_capable:
674     result[constants.NV_HVPARAMS] = tmp = []
675     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
676       try:
677         logging.info("Validating hv %s, %s", hv_name, hvparms)
678         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
679       except errors.HypervisorError, err:
680         tmp.append((source, hv_name, str(err)))
681
682   if constants.NV_FILELIST in what:
683     fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
684                                               what[constants.NV_FILELIST]))
685     result[constants.NV_FILELIST] = \
686       dict((vcluster.MakeVirtualPath(key), value)
687            for (key, value) in fingerprints.items())
688
689   if constants.NV_NODELIST in what:
690     (nodes, bynode) = what[constants.NV_NODELIST]
691
692     # Add nodes from other groups (different for each node)
693     try:
694       nodes.extend(bynode[my_name])
695     except KeyError:
696       pass
697
698     # Use a random order
699     random.shuffle(nodes)
700
701     # Try to contact all nodes
702     val = {}
703     for node in nodes:
704       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
705       if not success:
706         val[node] = message
707
708     result[constants.NV_NODELIST] = val
709
710   if constants.NV_NODENETTEST in what:
711     result[constants.NV_NODENETTEST] = tmp = {}
712     my_pip = my_sip = None
713     for name, pip, sip in what[constants.NV_NODENETTEST]:
714       if name == my_name:
715         my_pip = pip
716         my_sip = sip
717         break
718     if not my_pip:
719       tmp[my_name] = ("Can't find my own primary/secondary IP"
720                       " in the node list")
721     else:
722       for name, pip, sip in what[constants.NV_NODENETTEST]:
723         fail = []
724         if not netutils.TcpPing(pip, port, source=my_pip):
725           fail.append("primary")
726         if sip != pip:
727           if not netutils.TcpPing(sip, port, source=my_sip):
728             fail.append("secondary")
729         if fail:
730           tmp[name] = ("failure using the %s interface(s)" %
731                        " and ".join(fail))
732
733   if constants.NV_MASTERIP in what:
734     # FIXME: add checks on incoming data structures (here and in the
735     # rest of the function)
736     master_name, master_ip = what[constants.NV_MASTERIP]
737     if master_name == my_name:
738       source = constants.IP4_ADDRESS_LOCALHOST
739     else:
740       source = None
741     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
742                                                      source=source)
743
744   if constants.NV_USERSCRIPTS in what:
745     result[constants.NV_USERSCRIPTS] = \
746       [script for script in what[constants.NV_USERSCRIPTS]
747        if not utils.IsExecutable(script)]
748
749   if constants.NV_OOB_PATHS in what:
750     result[constants.NV_OOB_PATHS] = tmp = []
751     for path in what[constants.NV_OOB_PATHS]:
752       try:
753         st = os.stat(path)
754       except OSError, err:
755         tmp.append("error stating out of band helper: %s" % err)
756       else:
757         if stat.S_ISREG(st.st_mode):
758           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
759             tmp.append(None)
760           else:
761             tmp.append("out of band helper %s is not executable" % path)
762         else:
763           tmp.append("out of band helper %s is not a file" % path)
764
765   if constants.NV_LVLIST in what and vm_capable:
766     try:
767       val = GetVolumeList(utils.ListVolumeGroups().keys())
768     except RPCFail, err:
769       val = str(err)
770     result[constants.NV_LVLIST] = val
771
772   if constants.NV_INSTANCELIST in what and vm_capable:
773     # GetInstanceList can fail
774     try:
775       val = GetInstanceList(what[constants.NV_INSTANCELIST])
776     except RPCFail, err:
777       val = str(err)
778     result[constants.NV_INSTANCELIST] = val
779
780   if constants.NV_VGLIST in what and vm_capable:
781     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
782
783   if constants.NV_PVLIST in what and vm_capable:
784     check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
785     val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
786                                        filter_allocatable=False,
787                                        include_lvs=check_exclusive_pvs)
788     if check_exclusive_pvs:
789       result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
790       for pvi in val:
791         # Avoid sending useless data on the wire
792         pvi.lv_list = []
793     result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)
794
795   if constants.NV_VERSION in what:
796     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
797                                     constants.RELEASE_VERSION)
798
799   if constants.NV_HVINFO in what and vm_capable:
800     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
801     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
802
803   if constants.NV_DRBDLIST in what and vm_capable:
804     try:
805       used_minors = bdev.DRBD8.GetUsedDevs().keys()
806     except errors.BlockDeviceError, err:
807       logging.warning("Can't get used minors list", exc_info=True)
808       used_minors = str(err)
809     result[constants.NV_DRBDLIST] = used_minors
810
811   if constants.NV_DRBDHELPER in what and vm_capable:
812     status = True
813     try:
814       payload = bdev.BaseDRBD.GetUsermodeHelper()
815     except errors.BlockDeviceError, err:
816       logging.error("Can't get DRBD usermode helper: %s", str(err))
817       status = False
818       payload = str(err)
819     result[constants.NV_DRBDHELPER] = (status, payload)
820
821   if constants.NV_NODESETUP in what:
822     result[constants.NV_NODESETUP] = tmpr = []
823     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
824       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
825                   " under /sys, missing required directories /sys/block"
826                   " and /sys/class/net")
827     if (not os.path.isdir("/proc/sys") or
828         not os.path.isfile("/proc/sysrq-trigger")):
829       tmpr.append("The procfs filesystem doesn't seem to be mounted"
830                   " under /proc, missing required directory /proc/sys and"
831                   " the file /proc/sysrq-trigger")
832
833   if constants.NV_TIME in what:
834     result[constants.NV_TIME] = utils.SplitTime(time.time())
835
836   if constants.NV_OSLIST in what and vm_capable:
837     result[constants.NV_OSLIST] = DiagnoseOS()
838
839   if constants.NV_BRIDGES in what and vm_capable:
840     result[constants.NV_BRIDGES] = [bridge
841                                     for bridge in what[constants.NV_BRIDGES]
842                                     if not utils.BridgeExists(bridge)]
843
844   if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
845     result[constants.NV_FILE_STORAGE_PATHS] = \
846       bdev.ComputeWrongFileStoragePaths()
847
848   return result
849
850
851 def GetBlockDevSizes(devices):
852   """Return the size of the given block devices
853
854   @type devices: list
855   @param devices: list of block device nodes to query
856   @rtype: dict
857   @return:
858     dictionary of all block devices under /dev (key). The value is their
859     size in MiB.
860
861     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
862
863   """
864   DEV_PREFIX = "/dev/"
865   blockdevs = {}
866
867   for devpath in devices:
868     if not utils.IsBelowDir(DEV_PREFIX, devpath):
869       continue
870
871     try:
872       st = os.stat(devpath)
873     except EnvironmentError, err:
874       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
875       continue
876
877     if stat.S_ISBLK(st.st_mode):
878       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
879       if result.failed:
880         # We don't want to fail, just do not list this device as available
881         logging.warning("Cannot get size for block device %s", devpath)
882         continue
883
884       size = int(result.stdout) / (1024 * 1024)
885       blockdevs[devpath] = size
886   return blockdevs
887
888
889 def GetVolumeList(vg_names):
890   """Compute list of logical volumes and their size.
891
892   @type vg_names: list
893   @param vg_names: the volume groups whose LVs we should list, or
894       empty for all volume groups
895   @rtype: dict
896   @return:
897       dictionary of all partions (key) with value being a tuple of
898       their size (in MiB), inactive and online status::
899
900         {'xenvg/test1': ('20.06', True, True)}
901
902       in case of errors, a string is returned with the error
903       details.
904
905   """
906   lvs = {}
907   sep = "|"
908   if not vg_names:
909     vg_names = []
910   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
911                          "--separator=%s" % sep,
912                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
913   if result.failed:
914     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
915
916   for line in result.stdout.splitlines():
917     line = line.strip()
918     match = _LVSLINE_REGEX.match(line)
919     if not match:
920       logging.error("Invalid line returned from lvs output: '%s'", line)
921       continue
922     vg_name, name, size, attr = match.groups()
923     inactive = attr[4] == "-"
924     online = attr[5] == "o"
925     virtual = attr[0] == "v"
926     if virtual:
927       # we don't want to report such volumes as existing, since they
928       # don't really hold data
929       continue
930     lvs[vg_name + "/" + name] = (size, inactive, online)
931
932   return lvs
933
934
935 def ListVolumeGroups():
936   """List the volume groups and their size.
937
938   @rtype: dict
939   @return: dictionary with keys volume name and values the
940       size of the volume
941
942   """
943   return utils.ListVolumeGroups()
944
945
946 def NodeVolumes():
947   """List all volumes on this node.
948
949   @rtype: list
950   @return:
951     A list of dictionaries, each having four keys:
952       - name: the logical volume name,
953       - size: the size of the logical volume
954       - dev: the physical device on which the LV lives
955       - vg: the volume group to which it belongs
956
957     In case of errors, we return an empty list and log the
958     error.
959
960     Note that since a logical volume can live on multiple physical
961     volumes, the resulting list might include a logical volume
962     multiple times.
963
964   """
965   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
966                          "--separator=|",
967                          "--options=lv_name,lv_size,devices,vg_name"])
968   if result.failed:
969     _Fail("Failed to list logical volumes, lvs output: %s",
970           result.output)
971
972   def parse_dev(dev):
973     return dev.split("(")[0]
974
975   def handle_dev(dev):
976     return [parse_dev(x) for x in dev.split(",")]
977
978   def map_line(line):
979     line = [v.strip() for v in line]
980     return [{"name": line[0], "size": line[1],
981              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
982
983   all_devs = []
984   for line in result.stdout.splitlines():
985     if line.count("|") >= 3:
986       all_devs.extend(map_line(line.split("|")))
987     else:
988       logging.warning("Strange line in the output from lvs: '%s'", line)
989   return all_devs
990
991
992 def BridgesExist(bridges_list):
993   """Check if a list of bridges exist on the current node.
994
995   @rtype: boolean
996   @return: C{True} if all of them exist, C{False} otherwise
997
998   """
999   missing = []
1000   for bridge in bridges_list:
1001     if not utils.BridgeExists(bridge):
1002       missing.append(bridge)
1003
1004   if missing:
1005     _Fail("Missing bridges %s", utils.CommaJoin(missing))
1006
1007
1008 def GetInstanceList(hypervisor_list):
1009   """Provides a list of instances.
1010
1011   @type hypervisor_list: list
1012   @param hypervisor_list: the list of hypervisors to query information
1013
1014   @rtype: list
1015   @return: a list of all running instances on the current node
1016     - instance1.example.com
1017     - instance2.example.com
1018
1019   """
1020   results = []
1021   for hname in hypervisor_list:
1022     try:
1023       names = hypervisor.GetHypervisor(hname).ListInstances()
1024       results.extend(names)
1025     except errors.HypervisorError, err:
1026       _Fail("Error enumerating instances (hypervisor %s): %s",
1027             hname, err, exc=True)
1028
1029   return results
1030
1031
1032 def GetInstanceInfo(instance, hname):
1033   """Gives back the information about an instance as a dictionary.
1034
1035   @type instance: string
1036   @param instance: the instance name
1037   @type hname: string
1038   @param hname: the hypervisor type of the instance
1039
1040   @rtype: dict
1041   @return: dictionary with the following keys:
1042       - memory: memory size of instance (int)
1043       - state: xen state of instance (string)
1044       - time: cpu time of instance (float)
1045       - vcpus: the number of vcpus (int)
1046
1047   """
1048   output = {}
1049
1050   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1051   if iinfo is not None:
1052     output["memory"] = iinfo[2]
1053     output["vcpus"] = iinfo[3]
1054     output["state"] = iinfo[4]
1055     output["time"] = iinfo[5]
1056
1057   return output
1058
1059
1060 def GetInstanceMigratable(instance):
1061   """Gives whether an instance can be migrated.
1062
1063   @type instance: L{objects.Instance}
1064   @param instance: object representing the instance to be checked.
1065
1066   @rtype: tuple
1067   @return: tuple of (result, description) where:
1068       - result: whether the instance can be migrated or not
1069       - description: a description of the issue, if relevant
1070
1071   """
1072   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1073   iname = instance.name
1074   if iname not in hyper.ListInstances():
1075     _Fail("Instance %s is not running", iname)
1076
1077   for idx in range(len(instance.disks)):
1078     link_name = _GetBlockDevSymlinkPath(iname, idx)
1079     if not os.path.islink(link_name):
1080       logging.warning("Instance %s is missing symlink %s for disk %d",
1081                       iname, link_name, idx)
1082
1083
1084 def GetAllInstancesInfo(hypervisor_list):
1085   """Gather data about all instances.
1086
1087   This is the equivalent of L{GetInstanceInfo}, except that it
1088   computes data for all instances at once, thus being faster if one
1089   needs data about more than one instance.
1090
1091   @type hypervisor_list: list
1092   @param hypervisor_list: list of hypervisors to query for instance data
1093
1094   @rtype: dict
1095   @return: dictionary of instance: data, with data having the following keys:
1096       - memory: memory size of instance (int)
1097       - state: xen state of instance (string)
1098       - time: cpu time of instance (float)
1099       - vcpus: the number of vcpus
1100
1101   """
1102   output = {}
1103
1104   for hname in hypervisor_list:
1105     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1106     if iinfo:
1107       for name, _, memory, vcpus, state, times in iinfo:
1108         value = {
1109           "memory": memory,
1110           "vcpus": vcpus,
1111           "state": state,
1112           "time": times,
1113           }
1114         if name in output:
1115           # we only check static parameters, like memory and vcpus,
1116           # and not state and time which can change between the
1117           # invocations of the different hypervisors
1118           for key in "memory", "vcpus":
1119             if value[key] != output[name][key]:
1120               _Fail("Instance %s is running twice"
1121                     " with different parameters", name)
1122         output[name] = value
1123
1124   return output
1125
1126
1127 def _InstanceLogName(kind, os_name, instance, component):
1128   """Compute the OS log filename for a given instance and operation.
1129
1130   The instance name and os name are passed in as strings since not all
1131   operations have these as part of an instance object.
1132
1133   @type kind: string
1134   @param kind: the operation type (e.g. add, import, etc.)
1135   @type os_name: string
1136   @param os_name: the os name
1137   @type instance: string
1138   @param instance: the name of the instance being imported/added/etc.
1139   @type component: string or None
1140   @param component: the name of the component of the instance being
1141       transferred
1142
1143   """
1144   # TODO: Use tempfile.mkstemp to create unique filename
1145   if component:
1146     assert "/" not in component
1147     c_msg = "-%s" % component
1148   else:
1149     c_msg = ""
1150   base = ("%s-%s-%s%s-%s.log" %
1151           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1152   return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1153
1154
1155 def InstanceOsAdd(instance, reinstall, debug):
1156   """Add an OS to an instance.
1157
1158   @type instance: L{objects.Instance}
1159   @param instance: Instance whose OS is to be installed
1160   @type reinstall: boolean
1161   @param reinstall: whether this is an instance reinstall
1162   @type debug: integer
1163   @param debug: debug level, passed to the OS scripts
1164   @rtype: None
1165
1166   """
1167   inst_os = OSFromDisk(instance.os)
1168
1169   create_env = OSEnvironment(instance, inst_os, debug)
1170   if reinstall:
1171     create_env["INSTANCE_REINSTALL"] = "1"
1172
1173   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1174
1175   result = utils.RunCmd([inst_os.create_script], env=create_env,
1176                         cwd=inst_os.path, output=logfile, reset_env=True)
1177   if result.failed:
1178     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1179                   " output: %s", result.cmd, result.fail_reason, logfile,
1180                   result.output)
1181     lines = [utils.SafeEncode(val)
1182              for val in utils.TailFile(logfile, lines=20)]
1183     _Fail("OS create script failed (%s), last lines in the"
1184           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1185
1186
1187 def RunRenameInstance(instance, old_name, debug):
1188   """Run the OS rename script for an instance.
1189
1190   @type instance: L{objects.Instance}
1191   @param instance: Instance whose OS is to be installed
1192   @type old_name: string
1193   @param old_name: previous instance name
1194   @type debug: integer
1195   @param debug: debug level, passed to the OS scripts
1196   @rtype: boolean
1197   @return: the success of the operation
1198
1199   """
1200   inst_os = OSFromDisk(instance.os)
1201
1202   rename_env = OSEnvironment(instance, inst_os, debug)
1203   rename_env["OLD_INSTANCE_NAME"] = old_name
1204
1205   logfile = _InstanceLogName("rename", instance.os,
1206                              "%s-%s" % (old_name, instance.name), None)
1207
1208   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1209                         cwd=inst_os.path, output=logfile, reset_env=True)
1210
1211   if result.failed:
1212     logging.error("os create command '%s' returned error: %s output: %s",
1213                   result.cmd, result.fail_reason, result.output)
1214     lines = [utils.SafeEncode(val)
1215              for val in utils.TailFile(logfile, lines=20)]
1216     _Fail("OS rename script failed (%s), last lines in the"
1217           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1218
1219
1220 def _GetBlockDevSymlinkPath(instance_name, idx):
1221   return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
1222                         (instance_name, constants.DISK_SEPARATOR, idx))
1223
1224
1225 def _SymlinkBlockDev(instance_name, device_path, idx):
1226   """Set up symlinks to a instance's block device.
1227
1228   This is an auxiliary function run when an instance is start (on the primary
1229   node) or when an instance is migrated (on the target node).
1230
1231
1232   @param instance_name: the name of the target instance
1233   @param device_path: path of the physical block device, on the node
1234   @param idx: the disk index
1235   @return: absolute path to the disk's symlink
1236
1237   """
1238   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1239   try:
1240     os.symlink(device_path, link_name)
1241   except OSError, err:
1242     if err.errno == errno.EEXIST:
1243       if (not os.path.islink(link_name) or
1244           os.readlink(link_name) != device_path):
1245         os.remove(link_name)
1246         os.symlink(device_path, link_name)
1247     else:
1248       raise
1249
1250   return link_name
1251
1252
1253 def _RemoveBlockDevLinks(instance_name, disks):
1254   """Remove the block device symlinks belonging to the given instance.
1255
1256   """
1257   for idx, _ in enumerate(disks):
1258     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1259     if os.path.islink(link_name):
1260       try:
1261         os.remove(link_name)
1262       except OSError:
1263         logging.exception("Can't remove symlink '%s'", link_name)
1264
1265
1266 def _GatherAndLinkBlockDevs(instance):
1267   """Set up an instance's block device(s).
1268
1269   This is run on the primary node at instance startup. The block
1270   devices must be already assembled.
1271
1272   @type instance: L{objects.Instance}
1273   @param instance: the instance whose disks we shoul assemble
1274   @rtype: list
1275   @return: list of (disk_object, device_path)
1276
1277   """
1278   block_devices = []
1279   for idx, disk in enumerate(instance.disks):
1280     device = _RecursiveFindBD(disk)
1281     if device is None:
1282       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1283                                     str(disk))
1284     device.Open()
1285     try:
1286       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1287     except OSError, e:
1288       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1289                                     e.strerror)
1290
1291     block_devices.append((disk, link_name))
1292
1293   return block_devices
1294
1295
1296 def StartInstance(instance, startup_paused):
1297   """Start an instance.
1298
1299   @type instance: L{objects.Instance}
1300   @param instance: the instance object
1301   @type startup_paused: bool
1302   @param instance: pause instance at startup?
1303   @rtype: None
1304
1305   """
1306   running_instances = GetInstanceList([instance.hypervisor])
1307
1308   if instance.name in running_instances:
1309     logging.info("Instance %s already running, not starting", instance.name)
1310     return
1311
1312   try:
1313     block_devices = _GatherAndLinkBlockDevs(instance)
1314     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1315     hyper.StartInstance(instance, block_devices, startup_paused)
1316   except errors.BlockDeviceError, err:
1317     _Fail("Block device error: %s", err, exc=True)
1318   except errors.HypervisorError, err:
1319     _RemoveBlockDevLinks(instance.name, instance.disks)
1320     _Fail("Hypervisor error: %s", err, exc=True)
1321
1322
1323 def InstanceShutdown(instance, timeout):
1324   """Shut an instance down.
1325
1326   @note: this functions uses polling with a hardcoded timeout.
1327
1328   @type instance: L{objects.Instance}
1329   @param instance: the instance object
1330   @type timeout: integer
1331   @param timeout: maximum timeout for soft shutdown
1332   @rtype: None
1333
1334   """
1335   hv_name = instance.hypervisor
1336   hyper = hypervisor.GetHypervisor(hv_name)
1337   iname = instance.name
1338
1339   if instance.name not in hyper.ListInstances():
1340     logging.info("Instance %s not running, doing nothing", iname)
1341     return
1342
1343   class _TryShutdown:
1344     def __init__(self):
1345       self.tried_once = False
1346
1347     def __call__(self):
1348       if iname not in hyper.ListInstances():
1349         return
1350
1351       try:
1352         hyper.StopInstance(instance, retry=self.tried_once)
1353       except errors.HypervisorError, err:
1354         if iname not in hyper.ListInstances():
1355           # if the instance is no longer existing, consider this a
1356           # success and go to cleanup
1357           return
1358
1359         _Fail("Failed to stop instance %s: %s", iname, err)
1360
1361       self.tried_once = True
1362
1363       raise utils.RetryAgain()
1364
1365   try:
1366     utils.Retry(_TryShutdown(), 5, timeout)
1367   except utils.RetryTimeout:
1368     # the shutdown did not succeed
1369     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1370
1371     try:
1372       hyper.StopInstance(instance, force=True)
1373     except errors.HypervisorError, err:
1374       if iname in hyper.ListInstances():
1375         # only raise an error if the instance still exists, otherwise
1376         # the error could simply be "instance ... unknown"!
1377         _Fail("Failed to force stop instance %s: %s", iname, err)
1378
1379     time.sleep(1)
1380
1381     if iname in hyper.ListInstances():
1382       _Fail("Could not shutdown instance %s even by destroy", iname)
1383
1384   try:
1385     hyper.CleanupInstance(instance.name)
1386   except errors.HypervisorError, err:
1387     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1388
1389   _RemoveBlockDevLinks(iname, instance.disks)
1390
1391
1392 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1393   """Reboot an instance.
1394
1395   @type instance: L{objects.Instance}
1396   @param instance: the instance object to reboot
1397   @type reboot_type: str
1398   @param reboot_type: the type of reboot, one the following
1399     constants:
1400       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1401         instance OS, do not recreate the VM
1402       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1403         restart the VM (at the hypervisor level)
1404       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1405         not accepted here, since that mode is handled differently, in
1406         cmdlib, and translates into full stop and start of the
1407         instance (instead of a call_instance_reboot RPC)
1408   @type shutdown_timeout: integer
1409   @param shutdown_timeout: maximum timeout for soft shutdown
1410   @rtype: None
1411
1412   """
1413   running_instances = GetInstanceList([instance.hypervisor])
1414
1415   if instance.name not in running_instances:
1416     _Fail("Cannot reboot instance %s that is not running", instance.name)
1417
1418   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1419   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1420     try:
1421       hyper.RebootInstance(instance)
1422     except errors.HypervisorError, err:
1423       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1424   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1425     try:
1426       InstanceShutdown(instance, shutdown_timeout)
1427       return StartInstance(instance, False)
1428     except errors.HypervisorError, err:
1429       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1430   else:
1431     _Fail("Invalid reboot_type received: %s", reboot_type)
1432
1433
1434 def InstanceBalloonMemory(instance, memory):
1435   """Resize an instance's memory.
1436
1437   @type instance: L{objects.Instance}
1438   @param instance: the instance object
1439   @type memory: int
1440   @param memory: new memory amount in MB
1441   @rtype: None
1442
1443   """
1444   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1445   running = hyper.ListInstances()
1446   if instance.name not in running:
1447     logging.info("Instance %s is not running, cannot balloon", instance.name)
1448     return
1449   try:
1450     hyper.BalloonInstanceMemory(instance, memory)
1451   except errors.HypervisorError, err:
1452     _Fail("Failed to balloon instance memory: %s", err, exc=True)
1453
1454
1455 def MigrationInfo(instance):
1456   """Gather information about an instance to be migrated.
1457
1458   @type instance: L{objects.Instance}
1459   @param instance: the instance definition
1460
1461   """
1462   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1463   try:
1464     info = hyper.MigrationInfo(instance)
1465   except errors.HypervisorError, err:
1466     _Fail("Failed to fetch migration information: %s", err, exc=True)
1467   return info
1468
1469
1470 def AcceptInstance(instance, info, target):
1471   """Prepare the node to accept an instance.
1472
1473   @type instance: L{objects.Instance}
1474   @param instance: the instance definition
1475   @type info: string/data (opaque)
1476   @param info: migration information, from the source node
1477   @type target: string
1478   @param target: target host (usually ip), on this node
1479
1480   """
1481   # TODO: why is this required only for DTS_EXT_MIRROR?
1482   if instance.disk_template in constants.DTS_EXT_MIRROR:
1483     # Create the symlinks, as the disks are not active
1484     # in any way
1485     try:
1486       _GatherAndLinkBlockDevs(instance)
1487     except errors.BlockDeviceError, err:
1488       _Fail("Block device error: %s", err, exc=True)
1489
1490   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1491   try:
1492     hyper.AcceptInstance(instance, info, target)
1493   except errors.HypervisorError, err:
1494     if instance.disk_template in constants.DTS_EXT_MIRROR:
1495       _RemoveBlockDevLinks(instance.name, instance.disks)
1496     _Fail("Failed to accept instance: %s", err, exc=True)
1497
1498
1499 def FinalizeMigrationDst(instance, info, success):
1500   """Finalize any preparation to accept an instance.
1501
1502   @type instance: L{objects.Instance}
1503   @param instance: the instance definition
1504   @type info: string/data (opaque)
1505   @param info: migration information, from the source node
1506   @type success: boolean
1507   @param success: whether the migration was a success or a failure
1508
1509   """
1510   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1511   try:
1512     hyper.FinalizeMigrationDst(instance, info, success)
1513   except errors.HypervisorError, err:
1514     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1515
1516
1517 def MigrateInstance(instance, target, live):
1518   """Migrates an instance to another node.
1519
1520   @type instance: L{objects.Instance}
1521   @param instance: the instance definition
1522   @type target: string
1523   @param target: the target node name
1524   @type live: boolean
1525   @param live: whether the migration should be done live or not (the
1526       interpretation of this parameter is left to the hypervisor)
1527   @raise RPCFail: if migration fails for some reason
1528
1529   """
1530   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1531
1532   try:
1533     hyper.MigrateInstance(instance, target, live)
1534   except errors.HypervisorError, err:
1535     _Fail("Failed to migrate instance: %s", err, exc=True)
1536
1537
1538 def FinalizeMigrationSource(instance, success, live):
1539   """Finalize the instance migration on the source node.
1540
1541   @type instance: L{objects.Instance}
1542   @param instance: the instance definition of the migrated instance
1543   @type success: bool
1544   @param success: whether the migration succeeded or not
1545   @type live: bool
1546   @param live: whether the user requested a live migration or not
1547   @raise RPCFail: If the execution fails for some reason
1548
1549   """
1550   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1551
1552   try:
1553     hyper.FinalizeMigrationSource(instance, success, live)
1554   except Exception, err:  # pylint: disable=W0703
1555     _Fail("Failed to finalize the migration on the source node: %s", err,
1556           exc=True)
1557
1558
1559 def GetMigrationStatus(instance):
1560   """Get the migration status
1561
1562   @type instance: L{objects.Instance}
1563   @param instance: the instance that is being migrated
1564   @rtype: L{objects.MigrationStatus}
1565   @return: the status of the current migration (one of
1566            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1567            progress info that can be retrieved from the hypervisor
1568   @raise RPCFail: If the migration status cannot be retrieved
1569
1570   """
1571   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1572   try:
1573     return hyper.GetMigrationStatus(instance)
1574   except Exception, err:  # pylint: disable=W0703
1575     _Fail("Failed to get migration status: %s", err, exc=True)
1576
1577
1578 def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
1579   """Creates a block device for an instance.
1580
1581   @type disk: L{objects.Disk}
1582   @param disk: the object describing the disk we should create
1583   @type size: int
1584   @param size: the size of the physical underlying device, in MiB
1585   @type owner: str
1586   @param owner: the name of the instance for which disk is created,
1587       used for device cache data
1588   @type on_primary: boolean
1589   @param on_primary:  indicates if it is the primary node or not
1590   @type info: string
1591   @param info: string that will be sent to the physical device
1592       creation, used for example to set (LVM) tags on LVs
1593   @type excl_stor: boolean
1594   @param excl_stor: Whether exclusive_storage is active
1595
1596   @return: the new unique_id of the device (this can sometime be
1597       computed only after creation), or None. On secondary nodes,
1598       it's not required to return anything.
1599
1600   """
1601   # TODO: remove the obsolete "size" argument
1602   # pylint: disable=W0613
1603   clist = []
1604   if disk.children:
1605     for child in disk.children:
1606       try:
1607         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1608       except errors.BlockDeviceError, err:
1609         _Fail("Can't assemble device %s: %s", child, err)
1610       if on_primary or disk.AssembleOnSecondary():
1611         # we need the children open in case the device itself has to
1612         # be assembled
1613         try:
1614           # pylint: disable=E1103
1615           crdev.Open()
1616         except errors.BlockDeviceError, err:
1617           _Fail("Can't make child '%s' read-write: %s", child, err)
1618       clist.append(crdev)
1619
1620   try:
1621     device = bdev.Create(disk, clist, excl_stor)
1622   except errors.BlockDeviceError, err:
1623     _Fail("Can't create block device: %s", err)
1624
1625   if on_primary or disk.AssembleOnSecondary():
1626     try:
1627       device.Assemble()
1628     except errors.BlockDeviceError, err:
1629       _Fail("Can't assemble device after creation, unusual event: %s", err)
1630     if on_primary or disk.OpenOnSecondary():
1631       try:
1632         device.Open(force=True)
1633       except errors.BlockDeviceError, err:
1634         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1635     DevCacheManager.UpdateCache(device.dev_path, owner,
1636                                 on_primary, disk.iv_name)
1637
1638   device.SetInfo(info)
1639
1640   return device.unique_id
1641
1642
1643 def _WipeDevice(path, offset, size):
1644   """This function actually wipes the device.
1645
1646   @param path: The path to the device to wipe
1647   @param offset: The offset in MiB in the file
1648   @param size: The size in MiB to write
1649
1650   """
1651   # Internal sizes are always in Mebibytes; if the following "dd" command
1652   # should use a different block size the offset and size given to this
1653   # function must be adjusted accordingly before being passed to "dd".
1654   block_size = 1024 * 1024
1655
1656   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1657          "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1658          "count=%d" % size]
1659   result = utils.RunCmd(cmd)
1660
1661   if result.failed:
1662     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1663           result.fail_reason, result.output)
1664
1665
1666 def BlockdevWipe(disk, offset, size):
1667   """Wipes a block device.
1668
1669   @type disk: L{objects.Disk}
1670   @param disk: the disk object we want to wipe
1671   @type offset: int
1672   @param offset: The offset in MiB in the file
1673   @type size: int
1674   @param size: The size in MiB to write
1675
1676   """
1677   try:
1678     rdev = _RecursiveFindBD(disk)
1679   except errors.BlockDeviceError:
1680     rdev = None
1681
1682   if not rdev:
1683     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1684
1685   # Do cross verify some of the parameters
1686   if offset < 0:
1687     _Fail("Negative offset")
1688   if size < 0:
1689     _Fail("Negative size")
1690   if offset > rdev.size:
1691     _Fail("Offset is bigger than device size")
1692   if (offset + size) > rdev.size:
1693     _Fail("The provided offset and size to wipe is bigger than device size")
1694
1695   _WipeDevice(rdev.dev_path, offset, size)
1696
1697
1698 def BlockdevPauseResumeSync(disks, pause):
1699   """Pause or resume the sync of the block device.
1700
1701   @type disks: list of L{objects.Disk}
1702   @param disks: the disks object we want to pause/resume
1703   @type pause: bool
1704   @param pause: Wheater to pause or resume
1705
1706   """
1707   success = []
1708   for disk in disks:
1709     try:
1710       rdev = _RecursiveFindBD(disk)
1711     except errors.BlockDeviceError:
1712       rdev = None
1713
1714     if not rdev:
1715       success.append((False, ("Cannot change sync for device %s:"
1716                               " device not found" % disk.iv_name)))
1717       continue
1718
1719     result = rdev.PauseResumeSync(pause)
1720
1721     if result:
1722       success.append((result, None))
1723     else:
1724       if pause:
1725         msg = "Pause"
1726       else:
1727         msg = "Resume"
1728       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1729
1730   return success
1731
1732
1733 def BlockdevRemove(disk):
1734   """Remove a block device.
1735
1736   @note: This is intended to be called recursively.
1737
1738   @type disk: L{objects.Disk}
1739   @param disk: the disk object we should remove
1740   @rtype: boolean
1741   @return: the success of the operation
1742
1743   """
1744   msgs = []
1745   try:
1746     rdev = _RecursiveFindBD(disk)
1747   except errors.BlockDeviceError, err:
1748     # probably can't attach
1749     logging.info("Can't attach to device %s in remove", disk)
1750     rdev = None
1751   if rdev is not None:
1752     r_path = rdev.dev_path
1753     try:
1754       rdev.Remove()
1755     except errors.BlockDeviceError, err:
1756       msgs.append(str(err))
1757     if not msgs:
1758       DevCacheManager.RemoveCache(r_path)
1759
1760   if disk.children:
1761     for child in disk.children:
1762       try:
1763         BlockdevRemove(child)
1764       except RPCFail, err:
1765         msgs.append(str(err))
1766
1767   if msgs:
1768     _Fail("; ".join(msgs))
1769
1770
1771 def _RecursiveAssembleBD(disk, owner, as_primary):
1772   """Activate a block device for an instance.
1773
1774   This is run on the primary and secondary nodes for an instance.
1775
1776   @note: this function is called recursively.
1777
1778   @type disk: L{objects.Disk}
1779   @param disk: the disk we try to assemble
1780   @type owner: str
1781   @param owner: the name of the instance which owns the disk
1782   @type as_primary: boolean
1783   @param as_primary: if we should make the block device
1784       read/write
1785
1786   @return: the assembled device or None (in case no device
1787       was assembled)
1788   @raise errors.BlockDeviceError: in case there is an error
1789       during the activation of the children or the device
1790       itself
1791
1792   """
1793   children = []
1794   if disk.children:
1795     mcn = disk.ChildrenNeeded()
1796     if mcn == -1:
1797       mcn = 0 # max number of Nones allowed
1798     else:
1799       mcn = len(disk.children) - mcn # max number of Nones
1800     for chld_disk in disk.children:
1801       try:
1802         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1803       except errors.BlockDeviceError, err:
1804         if children.count(None) >= mcn:
1805           raise
1806         cdev = None
1807         logging.error("Error in child activation (but continuing): %s",
1808                       str(err))
1809       children.append(cdev)
1810
1811   if as_primary or disk.AssembleOnSecondary():
1812     r_dev = bdev.Assemble(disk, children)
1813     result = r_dev
1814     if as_primary or disk.OpenOnSecondary():
1815       r_dev.Open()
1816     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1817                                 as_primary, disk.iv_name)
1818
1819   else:
1820     result = True
1821   return result
1822
1823
1824 def BlockdevAssemble(disk, owner, as_primary, idx):
1825   """Activate a block device for an instance.
1826
1827   This is a wrapper over _RecursiveAssembleBD.
1828
1829   @rtype: str or boolean
1830   @return: a C{/dev/...} path for primary nodes, and
1831       C{True} for secondary nodes
1832
1833   """
1834   try:
1835     result = _RecursiveAssembleBD(disk, owner, as_primary)
1836     if isinstance(result, bdev.BlockDev):
1837       # pylint: disable=E1103
1838       result = result.dev_path
1839       if as_primary:
1840         _SymlinkBlockDev(owner, result, idx)
1841   except errors.BlockDeviceError, err:
1842     _Fail("Error while assembling disk: %s", err, exc=True)
1843   except OSError, err:
1844     _Fail("Error while symlinking disk: %s", err, exc=True)
1845
1846   return result
1847
1848
1849 def BlockdevShutdown(disk):
1850   """Shut down a block device.
1851
1852   First, if the device is assembled (Attach() is successful), then
1853   the device is shutdown. Then the children of the device are
1854   shutdown.
1855
1856   This function is called recursively. Note that we don't cache the
1857   children or such, as oppossed to assemble, shutdown of different
1858   devices doesn't require that the upper device was active.
1859
1860   @type disk: L{objects.Disk}
1861   @param disk: the description of the disk we should
1862       shutdown
1863   @rtype: None
1864
1865   """
1866   msgs = []
1867   r_dev = _RecursiveFindBD(disk)
1868   if r_dev is not None:
1869     r_path = r_dev.dev_path
1870     try:
1871       r_dev.Shutdown()
1872       DevCacheManager.RemoveCache(r_path)
1873     except errors.BlockDeviceError, err:
1874       msgs.append(str(err))
1875
1876   if disk.children:
1877     for child in disk.children:
1878       try:
1879         BlockdevShutdown(child)
1880       except RPCFail, err:
1881         msgs.append(str(err))
1882
1883   if msgs:
1884     _Fail("; ".join(msgs))
1885
1886
1887 def BlockdevAddchildren(parent_cdev, new_cdevs):
1888   """Extend a mirrored block device.
1889
1890   @type parent_cdev: L{objects.Disk}
1891   @param parent_cdev: the disk to which we should add children
1892   @type new_cdevs: list of L{objects.Disk}
1893   @param new_cdevs: the list of children which we should add
1894   @rtype: None
1895
1896   """
1897   parent_bdev = _RecursiveFindBD(parent_cdev)
1898   if parent_bdev is None:
1899     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1900   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1901   if new_bdevs.count(None) > 0:
1902     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1903   parent_bdev.AddChildren(new_bdevs)
1904
1905
1906 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1907   """Shrink a mirrored block device.
1908
1909   @type parent_cdev: L{objects.Disk}
1910   @param parent_cdev: the disk from which we should remove children
1911   @type new_cdevs: list of L{objects.Disk}
1912   @param new_cdevs: the list of children which we should remove
1913   @rtype: None
1914
1915   """
1916   parent_bdev = _RecursiveFindBD(parent_cdev)
1917   if parent_bdev is None:
1918     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1919   devs = []
1920   for disk in new_cdevs:
1921     rpath = disk.StaticDevPath()
1922     if rpath is None:
1923       bd = _RecursiveFindBD(disk)
1924       if bd is None:
1925         _Fail("Can't find device %s while removing children", disk)
1926       else:
1927         devs.append(bd.dev_path)
1928     else:
1929       if not utils.IsNormAbsPath(rpath):
1930         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1931       devs.append(rpath)
1932   parent_bdev.RemoveChildren(devs)
1933
1934
1935 def BlockdevGetmirrorstatus(disks):
1936   """Get the mirroring status of a list of devices.
1937
1938   @type disks: list of L{objects.Disk}
1939   @param disks: the list of disks which we should query
1940   @rtype: disk
1941   @return: List of L{objects.BlockDevStatus}, one for each disk
1942   @raise errors.BlockDeviceError: if any of the disks cannot be
1943       found
1944
1945   """
1946   stats = []
1947   for dsk in disks:
1948     rbd = _RecursiveFindBD(dsk)
1949     if rbd is None:
1950       _Fail("Can't find device %s", dsk)
1951
1952     stats.append(rbd.CombinedSyncStatus())
1953
1954   return stats
1955
1956
1957 def BlockdevGetmirrorstatusMulti(disks):
1958   """Get the mirroring status of a list of devices.
1959
1960   @type disks: list of L{objects.Disk}
1961   @param disks: the list of disks which we should query
1962   @rtype: disk
1963   @return: List of tuples, (bool, status), one for each disk; bool denotes
1964     success/failure, status is L{objects.BlockDevStatus} on success, string
1965     otherwise
1966
1967   """
1968   result = []
1969   for disk in disks:
1970     try:
1971       rbd = _RecursiveFindBD(disk)
1972       if rbd is None:
1973         result.append((False, "Can't find device %s" % disk))
1974         continue
1975
1976       status = rbd.CombinedSyncStatus()
1977     except errors.BlockDeviceError, err:
1978       logging.exception("Error while getting disk status")
1979       result.append((False, str(err)))
1980     else:
1981       result.append((True, status))
1982
1983   assert len(disks) == len(result)
1984
1985   return result
1986
1987
1988 def _RecursiveFindBD(disk):
1989   """Check if a device is activated.
1990
1991   If so, return information about the real device.
1992
1993   @type disk: L{objects.Disk}
1994   @param disk: the disk object we need to find
1995
1996   @return: None if the device can't be found,
1997       otherwise the device instance
1998
1999   """
2000   children = []
2001   if disk.children:
2002     for chdisk in disk.children:
2003       children.append(_RecursiveFindBD(chdisk))
2004
2005   return bdev.FindDevice(disk, children)
2006
2007
2008 def _OpenRealBD(disk):
2009   """Opens the underlying block device of a disk.
2010
2011   @type disk: L{objects.Disk}
2012   @param disk: the disk object we want to open
2013
2014   """
2015   real_disk = _RecursiveFindBD(disk)
2016   if real_disk is None:
2017     _Fail("Block device '%s' is not set up", disk)
2018
2019   real_disk.Open()
2020
2021   return real_disk
2022
2023
2024 def BlockdevFind(disk):
2025   """Check if a device is activated.
2026
2027   If it is, return information about the real device.
2028
2029   @type disk: L{objects.Disk}
2030   @param disk: the disk to find
2031   @rtype: None or objects.BlockDevStatus
2032   @return: None if the disk cannot be found, otherwise a the current
2033            information
2034
2035   """
2036   try:
2037     rbd = _RecursiveFindBD(disk)
2038   except errors.BlockDeviceError, err:
2039     _Fail("Failed to find device: %s", err, exc=True)
2040
2041   if rbd is None:
2042     return None
2043
2044   return rbd.GetSyncStatus()
2045
2046
2047 def BlockdevGetsize(disks):
2048   """Computes the size of the given disks.
2049
2050   If a disk is not found, returns None instead.
2051
2052   @type disks: list of L{objects.Disk}
2053   @param disks: the list of disk to compute the size for
2054   @rtype: list
2055   @return: list with elements None if the disk cannot be found,
2056       otherwise the size
2057
2058   """
2059   result = []
2060   for cf in disks:
2061     try:
2062       rbd = _RecursiveFindBD(cf)
2063     except errors.BlockDeviceError:
2064       result.append(None)
2065       continue
2066     if rbd is None:
2067       result.append(None)
2068     else:
2069       result.append(rbd.GetActualSize())
2070   return result
2071
2072
2073 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2074   """Export a block device to a remote node.
2075
2076   @type disk: L{objects.Disk}
2077   @param disk: the description of the disk to export
2078   @type dest_node: str
2079   @param dest_node: the destination node to export to
2080   @type dest_path: str
2081   @param dest_path: the destination path on the target node
2082   @type cluster_name: str
2083   @param cluster_name: the cluster name, needed for SSH hostalias
2084   @rtype: None
2085
2086   """
2087   real_disk = _OpenRealBD(disk)
2088
2089   # the block size on the read dd is 1MiB to match our units
2090   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2091                                "dd if=%s bs=1048576 count=%s",
2092                                real_disk.dev_path, str(disk.size))
2093
2094   # we set here a smaller block size as, due to ssh buffering, more
2095   # than 64-128k will mostly ignored; we use nocreat to fail if the
2096   # device is not already there or we pass a wrong path; we use
2097   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2098   # to not buffer too much memory; this means that at best, we flush
2099   # every 64k, which will not be very fast
2100   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2101                                 " oflag=dsync", dest_path)
2102
2103   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2104                                                    constants.SSH_LOGIN_USER,
2105                                                    destcmd)
2106
2107   # all commands have been checked, so we're safe to combine them
2108   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2109
2110   result = utils.RunCmd(["bash", "-c", command])
2111
2112   if result.failed:
2113     _Fail("Disk copy command '%s' returned error: %s"
2114           " output: %s", command, result.fail_reason, result.output)
2115
2116
2117 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2118   """Write a file to the filesystem.
2119
2120   This allows the master to overwrite(!) a file. It will only perform
2121   the operation if the file belongs to a list of configuration files.
2122
2123   @type file_name: str
2124   @param file_name: the target file name
2125   @type data: str
2126   @param data: the new contents of the file
2127   @type mode: int
2128   @param mode: the mode to give the file (can be None)
2129   @type uid: string
2130   @param uid: the owner of the file
2131   @type gid: string
2132   @param gid: the group of the file
2133   @type atime: float
2134   @param atime: the atime to set on the file (can be None)
2135   @type mtime: float
2136   @param mtime: the mtime to set on the file (can be None)
2137   @rtype: None
2138
2139   """
2140   file_name = vcluster.LocalizeVirtualPath(file_name)
2141
2142   if not os.path.isabs(file_name):
2143     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2144
2145   if file_name not in _ALLOWED_UPLOAD_FILES:
2146     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2147           file_name)
2148
2149   raw_data = _Decompress(data)
2150
2151   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2152     _Fail("Invalid username/groupname type")
2153
2154   getents = runtime.GetEnts()
2155   uid = getents.LookupUser(uid)
2156   gid = getents.LookupGroup(gid)
2157
2158   utils.SafeWriteFile(file_name, None,
2159                       data=raw_data, mode=mode, uid=uid, gid=gid,
2160                       atime=atime, mtime=mtime)
2161
2162
2163 def RunOob(oob_program, command, node, timeout):
2164   """Executes oob_program with given command on given node.
2165
2166   @param oob_program: The path to the executable oob_program
2167   @param command: The command to invoke on oob_program
2168   @param node: The node given as an argument to the program
2169   @param timeout: Timeout after which we kill the oob program
2170
2171   @return: stdout
2172   @raise RPCFail: If execution fails for some reason
2173
2174   """
2175   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2176
2177   if result.failed:
2178     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2179           result.fail_reason, result.output)
2180
2181   return result.stdout
2182
2183
2184 def _OSOndiskAPIVersion(os_dir):
2185   """Compute and return the API version of a given OS.
2186
2187   This function will try to read the API version of the OS residing in
2188   the 'os_dir' directory.
2189
2190   @type os_dir: str
2191   @param os_dir: the directory in which we should look for the OS
2192   @rtype: tuple
2193   @return: tuple (status, data) with status denoting the validity and
2194       data holding either the vaid versions or an error message
2195
2196   """
2197   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2198
2199   try:
2200     st = os.stat(api_file)
2201   except EnvironmentError, err:
2202     return False, ("Required file '%s' not found under path %s: %s" %
2203                    (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2204
2205   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2206     return False, ("File '%s' in %s is not a regular file" %
2207                    (constants.OS_API_FILE, os_dir))
2208
2209   try:
2210     api_versions = utils.ReadFile(api_file).splitlines()
2211   except EnvironmentError, err:
2212     return False, ("Error while reading the API version file at %s: %s" %
2213                    (api_file, utils.ErrnoOrStr(err)))
2214
2215   try:
2216     api_versions = [int(version.strip()) for version in api_versions]
2217   except (TypeError, ValueError), err:
2218     return False, ("API version(s) can't be converted to integer: %s" %
2219                    str(err))
2220
2221   return True, api_versions
2222
2223
2224 def DiagnoseOS(top_dirs=None):
2225   """Compute the validity for all OSes.
2226
2227   @type top_dirs: list
2228   @param top_dirs: the list of directories in which to
2229       search (if not given defaults to
2230       L{pathutils.OS_SEARCH_PATH})
2231   @rtype: list of L{objects.OS}
2232   @return: a list of tuples (name, path, status, diagnose, variants,
2233       parameters, api_version) for all (potential) OSes under all
2234       search paths, where:
2235           - name is the (potential) OS name
2236           - path is the full path to the OS
2237           - status True/False is the validity of the OS
2238           - diagnose is the error message for an invalid OS, otherwise empty
2239           - variants is a list of supported OS variants, if any
2240           - parameters is a list of (name, help) parameters, if any
2241           - api_version is a list of support OS API versions
2242
2243   """
2244   if top_dirs is None:
2245     top_dirs = pathutils.OS_SEARCH_PATH
2246
2247   result = []
2248   for dir_name in top_dirs:
2249     if os.path.isdir(dir_name):
2250       try:
2251         f_names = utils.ListVisibleFiles(dir_name)
2252       except EnvironmentError, err:
2253         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2254         break
2255       for name in f_names:
2256         os_path = utils.PathJoin(dir_name, name)
2257         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2258         if status:
2259           diagnose = ""
2260           variants = os_inst.supported_variants
2261           parameters = os_inst.supported_parameters
2262           api_versions = os_inst.api_versions
2263         else:
2264           diagnose = os_inst
2265           variants = parameters = api_versions = []
2266         result.append((name, os_path, status, diagnose, variants,
2267                        parameters, api_versions))
2268
2269   return result
2270
2271
2272 def _TryOSFromDisk(name, base_dir=None):
2273   """Create an OS instance from disk.
2274
2275   This function will return an OS instance if the given name is a
2276   valid OS name.
2277
2278   @type base_dir: string
2279   @keyword base_dir: Base directory containing OS installations.
2280                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2281   @rtype: tuple
2282   @return: success and either the OS instance if we find a valid one,
2283       or error message
2284
2285   """
2286   if base_dir is None:
2287     os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2288   else:
2289     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2290
2291   if os_dir is None:
2292     return False, "Directory for OS %s not found in search path" % name
2293
2294   status, api_versions = _OSOndiskAPIVersion(os_dir)
2295   if not status:
2296     # push the error up
2297     return status, api_versions
2298
2299   if not constants.OS_API_VERSIONS.intersection(api_versions):
2300     return False, ("API version mismatch for path '%s': found %s, want %s." %
2301                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2302
2303   # OS Files dictionary, we will populate it with the absolute path
2304   # names; if the value is True, then it is a required file, otherwise
2305   # an optional one
2306   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2307
2308   if max(api_versions) >= constants.OS_API_V15:
2309     os_files[constants.OS_VARIANTS_FILE] = False
2310
2311   if max(api_versions) >= constants.OS_API_V20:
2312     os_files[constants.OS_PARAMETERS_FILE] = True
2313   else:
2314     del os_files[constants.OS_SCRIPT_VERIFY]
2315
2316   for (filename, required) in os_files.items():
2317     os_files[filename] = utils.PathJoin(os_dir, filename)
2318
2319     try:
2320       st = os.stat(os_files[filename])
2321     except EnvironmentError, err:
2322       if err.errno == errno.ENOENT and not required:
2323         del os_files[filename]
2324         continue
2325       return False, ("File '%s' under path '%s' is missing (%s)" %
2326                      (filename, os_dir, utils.ErrnoOrStr(err)))
2327
2328     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2329       return False, ("File '%s' under path '%s' is not a regular file" %
2330                      (filename, os_dir))
2331
2332     if filename in constants.OS_SCRIPTS:
2333       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2334         return False, ("File '%s' under path '%s' is not executable" %
2335                        (filename, os_dir))
2336
2337   variants = []
2338   if constants.OS_VARIANTS_FILE in os_files:
2339     variants_file = os_files[constants.OS_VARIANTS_FILE]
2340     try:
2341       variants = \
2342         utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2343     except EnvironmentError, err:
2344       # we accept missing files, but not other errors
2345       if err.errno != errno.ENOENT:
2346         return False, ("Error while reading the OS variants file at %s: %s" %
2347                        (variants_file, utils.ErrnoOrStr(err)))
2348
2349   parameters = []
2350   if constants.OS_PARAMETERS_FILE in os_files:
2351     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2352     try:
2353       parameters = utils.ReadFile(parameters_file).splitlines()
2354     except EnvironmentError, err:
2355       return False, ("Error while reading the OS parameters file at %s: %s" %
2356                      (parameters_file, utils.ErrnoOrStr(err)))
2357     parameters = [v.split(None, 1) for v in parameters]
2358
2359   os_obj = objects.OS(name=name, path=os_dir,
2360                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2361                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2362                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2363                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2364                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2365                                                  None),
2366                       supported_variants=variants,
2367                       supported_parameters=parameters,
2368                       api_versions=api_versions)
2369   return True, os_obj
2370
2371
2372 def OSFromDisk(name, base_dir=None):
2373   """Create an OS instance from disk.
2374
2375   This function will return an OS instance if the given name is a
2376   valid OS name. Otherwise, it will raise an appropriate
2377   L{RPCFail} exception, detailing why this is not a valid OS.
2378
2379   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2380   an exception but returns true/false status data.
2381
2382   @type base_dir: string
2383   @keyword base_dir: Base directory containing OS installations.
2384                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2385   @rtype: L{objects.OS}
2386   @return: the OS instance if we find a valid one
2387   @raise RPCFail: if we don't find a valid OS
2388
2389   """
2390   name_only = objects.OS.GetName(name)
2391   status, payload = _TryOSFromDisk(name_only, base_dir)
2392
2393   if not status:
2394     _Fail(payload)
2395
2396   return payload
2397
2398
2399 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2400   """Calculate the basic environment for an os script.
2401
2402   @type os_name: str
2403   @param os_name: full operating system name (including variant)
2404   @type inst_os: L{objects.OS}
2405   @param inst_os: operating system for which the environment is being built
2406   @type os_params: dict
2407   @param os_params: the OS parameters
2408   @type debug: integer
2409   @param debug: debug level (0 or 1, for OS Api 10)
2410   @rtype: dict
2411   @return: dict of environment variables
2412   @raise errors.BlockDeviceError: if the block device
2413       cannot be found
2414
2415   """
2416   result = {}
2417   api_version = \
2418     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2419   result["OS_API_VERSION"] = "%d" % api_version
2420   result["OS_NAME"] = inst_os.name
2421   result["DEBUG_LEVEL"] = "%d" % debug
2422
2423   # OS variants
2424   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2425     variant = objects.OS.GetVariant(os_name)
2426     if not variant:
2427       variant = inst_os.supported_variants[0]
2428   else:
2429     variant = ""
2430   result["OS_VARIANT"] = variant
2431
2432   # OS params
2433   for pname, pvalue in os_params.items():
2434     result["OSP_%s" % pname.upper()] = pvalue
2435
2436   # Set a default path otherwise programs called by OS scripts (or
2437   # even hooks called from OS scripts) might break, and we don't want
2438   # to have each script require setting a PATH variable
2439   result["PATH"] = constants.HOOKS_PATH
2440
2441   return result
2442
2443
2444 def OSEnvironment(instance, inst_os, debug=0):
2445   """Calculate the environment for an os script.
2446
2447   @type instance: L{objects.Instance}
2448   @param instance: target instance for the os script run
2449   @type inst_os: L{objects.OS}
2450   @param inst_os: operating system for which the environment is being built
2451   @type debug: integer
2452   @param debug: debug level (0 or 1, for OS Api 10)
2453   @rtype: dict
2454   @return: dict of environment variables
2455   @raise errors.BlockDeviceError: if the block device
2456       cannot be found
2457
2458   """
2459   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2460
2461   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2462     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2463
2464   result["HYPERVISOR"] = instance.hypervisor
2465   result["DISK_COUNT"] = "%d" % len(instance.disks)
2466   result["NIC_COUNT"] = "%d" % len(instance.nics)
2467   result["INSTANCE_SECONDARY_NODES"] = \
2468       ("%s" % " ".join(instance.secondary_nodes))
2469
2470   # Disks
2471   for idx, disk in enumerate(instance.disks):
2472     real_disk = _OpenRealBD(disk)
2473     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2474     result["DISK_%d_ACCESS" % idx] = disk.mode
2475     if constants.HV_DISK_TYPE in instance.hvparams:
2476       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2477         instance.hvparams[constants.HV_DISK_TYPE]
2478     if disk.dev_type in constants.LDS_BLOCK:
2479       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2480     elif disk.dev_type == constants.LD_FILE:
2481       result["DISK_%d_BACKEND_TYPE" % idx] = \
2482         "file:%s" % disk.physical_id[0]
2483
2484   # NICs
2485   for idx, nic in enumerate(instance.nics):
2486     result["NIC_%d_MAC" % idx] = nic.mac
2487     if nic.ip:
2488       result["NIC_%d_IP" % idx] = nic.ip
2489     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2490     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2491       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2492     if nic.nicparams[constants.NIC_LINK]:
2493       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2494     if nic.network:
2495       result["NIC_%d_NETWORK" % idx] = nic.network
2496     if constants.HV_NIC_TYPE in instance.hvparams:
2497       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2498         instance.hvparams[constants.HV_NIC_TYPE]
2499
2500   # HV/BE params
2501   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2502     for key, value in source.items():
2503       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2504
2505   return result
2506
2507
2508 def DiagnoseExtStorage(top_dirs=None):
2509   """Compute the validity for all ExtStorage Providers.
2510
2511   @type top_dirs: list
2512   @param top_dirs: the list of directories in which to
2513       search (if not given defaults to
2514       L{pathutils.ES_SEARCH_PATH})
2515   @rtype: list of L{objects.ExtStorage}
2516   @return: a list of tuples (name, path, status, diagnose, parameters)
2517       for all (potential) ExtStorage Providers under all
2518       search paths, where:
2519           - name is the (potential) ExtStorage Provider
2520           - path is the full path to the ExtStorage Provider
2521           - status True/False is the validity of the ExtStorage Provider
2522           - diagnose is the error message for an invalid ExtStorage Provider,
2523             otherwise empty
2524           - parameters is a list of (name, help) parameters, if any
2525
2526   """
2527   if top_dirs is None:
2528     top_dirs = pathutils.ES_SEARCH_PATH
2529
2530   result = []
2531   for dir_name in top_dirs:
2532     if os.path.isdir(dir_name):
2533       try:
2534         f_names = utils.ListVisibleFiles(dir_name)
2535       except EnvironmentError, err:
2536         logging.exception("Can't list the ExtStorage directory %s: %s",
2537                           dir_name, err)
2538         break
2539       for name in f_names:
2540         es_path = utils.PathJoin(dir_name, name)
2541         status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
2542         if status:
2543           diagnose = ""
2544           parameters = es_inst.supported_parameters
2545         else:
2546           diagnose = es_inst
2547           parameters = []
2548         result.append((name, es_path, status, diagnose, parameters))
2549
2550   return result
2551
2552
2553 def BlockdevGrow(disk, amount, dryrun, backingstore):
2554   """Grow a stack of block devices.
2555
2556   This function is called recursively, with the childrens being the
2557   first ones to resize.
2558
2559   @type disk: L{objects.Disk}
2560   @param disk: the disk to be grown
2561   @type amount: integer
2562   @param amount: the amount (in mebibytes) to grow with
2563   @type dryrun: boolean
2564   @param dryrun: whether to execute the operation in simulation mode
2565       only, without actually increasing the size
2566   @param backingstore: whether to execute the operation on backing storage
2567       only, or on "logical" storage only; e.g. DRBD is logical storage,
2568       whereas LVM, file, RBD are backing storage
2569   @rtype: (status, result)
2570   @return: a tuple with the status of the operation (True/False), and
2571       the errors message if status is False
2572
2573   """
2574   r_dev = _RecursiveFindBD(disk)
2575   if r_dev is None:
2576     _Fail("Cannot find block device %s", disk)
2577
2578   try:
2579     r_dev.Grow(amount, dryrun, backingstore)
2580   except errors.BlockDeviceError, err:
2581     _Fail("Failed to grow block device: %s", err, exc=True)
2582
2583
2584 def BlockdevSnapshot(disk):
2585   """Create a snapshot copy of a block device.
2586
2587   This function is called recursively, and the snapshot is actually created
2588   just for the leaf lvm backend device.
2589
2590   @type disk: L{objects.Disk}
2591   @param disk: the disk to be snapshotted
2592   @rtype: string
2593   @return: snapshot disk ID as (vg, lv)
2594
2595   """
2596   if disk.dev_type == constants.LD_DRBD8:
2597     if not disk.children:
2598       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2599             disk.unique_id)
2600     return BlockdevSnapshot(disk.children[0])
2601   elif disk.dev_type == constants.LD_LV:
2602     r_dev = _RecursiveFindBD(disk)
2603     if r_dev is not None:
2604       # FIXME: choose a saner value for the snapshot size
2605       # let's stay on the safe side and ask for the full size, for now
2606       return r_dev.Snapshot(disk.size)
2607     else:
2608       _Fail("Cannot find block device %s", disk)
2609   else:
2610     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2611           disk.unique_id, disk.dev_type)
2612
2613
2614 def BlockdevSetInfo(disk, info):
2615   """Sets 'metadata' information on block devices.
2616
2617   This function sets 'info' metadata on block devices. Initial
2618   information is set at device creation; this function should be used
2619   for example after renames.
2620
2621   @type disk: L{objects.Disk}
2622   @param disk: the disk to be grown
2623   @type info: string
2624   @param info: new 'info' metadata
2625   @rtype: (status, result)
2626   @return: a tuple with the status of the operation (True/False), and
2627       the errors message if status is False
2628
2629   """
2630   r_dev = _RecursiveFindBD(disk)
2631   if r_dev is None:
2632     _Fail("Cannot find block device %s", disk)
2633
2634   try:
2635     r_dev.SetInfo(info)
2636   except errors.BlockDeviceError, err:
2637     _Fail("Failed to set information on block device: %s", err, exc=True)
2638
2639
2640 def FinalizeExport(instance, snap_disks):
2641   """Write out the export configuration information.
2642
2643   @type instance: L{objects.Instance}
2644   @param instance: the instance which we export, used for
2645       saving configuration
2646   @type snap_disks: list of L{objects.Disk}
2647   @param snap_disks: list of snapshot block devices, which
2648       will be used to get the actual name of the dump file
2649
2650   @rtype: None
2651
2652   """
2653   destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
2654   finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
2655
2656   config = objects.SerializableConfigParser()
2657
2658   config.add_section(constants.INISECT_EXP)
2659   config.set(constants.INISECT_EXP, "version", "0")
2660   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2661   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2662   config.set(constants.INISECT_EXP, "os", instance.os)
2663   config.set(constants.INISECT_EXP, "compression", "none")
2664
2665   config.add_section(constants.INISECT_INS)
2666   config.set(constants.INISECT_INS, "name", instance.name)
2667   config.set(constants.INISECT_INS, "maxmem", "%d" %
2668              instance.beparams[constants.BE_MAXMEM])
2669   config.set(constants.INISECT_INS, "minmem", "%d" %
2670              instance.beparams[constants.BE_MINMEM])
2671   # "memory" is deprecated, but useful for exporting to old ganeti versions
2672   config.set(constants.INISECT_INS, "memory", "%d" %
2673              instance.beparams[constants.BE_MAXMEM])
2674   config.set(constants.INISECT_INS, "vcpus", "%d" %
2675              instance.beparams[constants.BE_VCPUS])
2676   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2677   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2678   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2679
2680   nic_total = 0
2681   for nic_count, nic in enumerate(instance.nics):
2682     nic_total += 1
2683     config.set(constants.INISECT_INS, "nic%d_mac" %
2684                nic_count, "%s" % nic.mac)
2685     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2686     config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
2687                "%s" % nic.network)
2688     for param in constants.NICS_PARAMETER_TYPES:
2689       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2690                  "%s" % nic.nicparams.get(param, None))
2691   # TODO: redundant: on load can read nics until it doesn't exist
2692   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2693
2694   disk_total = 0
2695   for disk_count, disk in enumerate(snap_disks):
2696     if disk:
2697       disk_total += 1
2698       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2699                  ("%s" % disk.iv_name))
2700       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2701                  ("%s" % disk.physical_id[1]))
2702       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2703                  ("%d" % disk.size))
2704
2705   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2706
2707   # New-style hypervisor/backend parameters
2708
2709   config.add_section(constants.INISECT_HYP)
2710   for name, value in instance.hvparams.items():
2711     if name not in constants.HVC_GLOBALS:
2712       config.set(constants.INISECT_HYP, name, str(value))
2713
2714   config.add_section(constants.INISECT_BEP)
2715   for name, value in instance.beparams.items():
2716     config.set(constants.INISECT_BEP, name, str(value))
2717
2718   config.add_section(constants.INISECT_OSP)
2719   for name, value in instance.osparams.items():
2720     config.set(constants.INISECT_OSP, name, str(value))
2721
2722   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2723                   data=config.Dumps())
2724   shutil.rmtree(finaldestdir, ignore_errors=True)
2725   shutil.move(destdir, finaldestdir)
2726
2727
2728 def ExportInfo(dest):
2729   """Get export configuration information.
2730
2731   @type dest: str
2732   @param dest: directory containing the export
2733
2734   @rtype: L{objects.SerializableConfigParser}
2735   @return: a serializable config file containing the
2736       export info
2737
2738   """
2739   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2740
2741   config = objects.SerializableConfigParser()
2742   config.read(cff)
2743
2744   if (not config.has_section(constants.INISECT_EXP) or
2745       not config.has_section(constants.INISECT_INS)):
2746     _Fail("Export info file doesn't have the required fields")
2747
2748   return config.Dumps()
2749
2750
2751 def ListExports():
2752   """Return a list of exports currently available on this machine.
2753
2754   @rtype: list
2755   @return: list of the exports
2756
2757   """
2758   if os.path.isdir(pathutils.EXPORT_DIR):
2759     return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
2760   else:
2761     _Fail("No exports directory")
2762
2763
2764 def RemoveExport(export):
2765   """Remove an existing export from the node.
2766
2767   @type export: str
2768   @param export: the name of the export to remove
2769   @rtype: None
2770
2771   """
2772   target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2773
2774   try:
2775     shutil.rmtree(target)
2776   except EnvironmentError, err:
2777     _Fail("Error while removing the export: %s", err, exc=True)
2778
2779
2780 def BlockdevRename(devlist):
2781   """Rename a list of block devices.
2782
2783   @type devlist: list of tuples
2784   @param devlist: list of tuples of the form  (disk,
2785       new_logical_id, new_physical_id); disk is an
2786       L{objects.Disk} object describing the current disk,
2787       and new logical_id/physical_id is the name we
2788       rename it to
2789   @rtype: boolean
2790   @return: True if all renames succeeded, False otherwise
2791
2792   """
2793   msgs = []
2794   result = True
2795   for disk, unique_id in devlist:
2796     dev = _RecursiveFindBD(disk)
2797     if dev is None:
2798       msgs.append("Can't find device %s in rename" % str(disk))
2799       result = False
2800       continue
2801     try:
2802       old_rpath = dev.dev_path
2803       dev.Rename(unique_id)
2804       new_rpath = dev.dev_path
2805       if old_rpath != new_rpath:
2806         DevCacheManager.RemoveCache(old_rpath)
2807         # FIXME: we should add the new cache information here, like:
2808         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2809         # but we don't have the owner here - maybe parse from existing
2810         # cache? for now, we only lose lvm data when we rename, which
2811         # is less critical than DRBD or MD
2812     except errors.BlockDeviceError, err:
2813       msgs.append("Can't rename device '%s' to '%s': %s" %
2814                   (dev, unique_id, err))
2815       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2816       result = False
2817   if not result:
2818     _Fail("; ".join(msgs))
2819
2820
2821 def _TransformFileStorageDir(fs_dir):
2822   """Checks whether given file_storage_dir is valid.
2823
2824   Checks wheter the given fs_dir is within the cluster-wide default
2825   file_storage_dir or the shared_file_storage_dir, which are stored in
2826   SimpleStore. Only paths under those directories are allowed.
2827
2828   @type fs_dir: str
2829   @param fs_dir: the path to check
2830
2831   @return: the normalized path if valid, None otherwise
2832
2833   """
2834   if not (constants.ENABLE_FILE_STORAGE or
2835           constants.ENABLE_SHARED_FILE_STORAGE):
2836     _Fail("File storage disabled at configure time")
2837
2838   bdev.CheckFileStoragePath(fs_dir)
2839
2840   return os.path.normpath(fs_dir)
2841
2842
2843 def CreateFileStorageDir(file_storage_dir):
2844   """Create file storage directory.
2845
2846   @type file_storage_dir: str
2847   @param file_storage_dir: directory to create
2848
2849   @rtype: tuple
2850   @return: tuple with first element a boolean indicating wheter dir
2851       creation was successful or not
2852
2853   """
2854   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2855   if os.path.exists(file_storage_dir):
2856     if not os.path.isdir(file_storage_dir):
2857       _Fail("Specified storage dir '%s' is not a directory",
2858             file_storage_dir)
2859   else:
2860     try:
2861       os.makedirs(file_storage_dir, 0750)
2862     except OSError, err:
2863       _Fail("Cannot create file storage directory '%s': %s",
2864             file_storage_dir, err, exc=True)
2865
2866
2867 def RemoveFileStorageDir(file_storage_dir):
2868   """Remove file storage directory.
2869
2870   Remove it only if it's empty. If not log an error and return.
2871
2872   @type file_storage_dir: str
2873   @param file_storage_dir: the directory we should cleanup
2874   @rtype: tuple (success,)
2875   @return: tuple of one element, C{success}, denoting
2876       whether the operation was successful
2877
2878   """
2879   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2880   if os.path.exists(file_storage_dir):
2881     if not os.path.isdir(file_storage_dir):
2882       _Fail("Specified Storage directory '%s' is not a directory",
2883             file_storage_dir)
2884     # deletes dir only if empty, otherwise we want to fail the rpc call
2885     try:
2886       os.rmdir(file_storage_dir)
2887     except OSError, err:
2888       _Fail("Cannot remove file storage directory '%s': %s",
2889             file_storage_dir, err)
2890
2891
2892 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2893   """Rename the file storage directory.
2894
2895   @type old_file_storage_dir: str
2896   @param old_file_storage_dir: the current path
2897   @type new_file_storage_dir: str
2898   @param new_file_storage_dir: the name we should rename to
2899   @rtype: tuple (success,)
2900   @return: tuple of one element, C{success}, denoting
2901       whether the operation was successful
2902
2903   """
2904   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2905   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2906   if not os.path.exists(new_file_storage_dir):
2907     if os.path.isdir(old_file_storage_dir):
2908       try:
2909         os.rename(old_file_storage_dir, new_file_storage_dir)
2910       except OSError, err:
2911         _Fail("Cannot rename '%s' to '%s': %s",
2912               old_file_storage_dir, new_file_storage_dir, err)
2913     else:
2914       _Fail("Specified storage dir '%s' is not a directory",
2915             old_file_storage_dir)
2916   else:
2917     if os.path.exists(old_file_storage_dir):
2918       _Fail("Cannot rename '%s' to '%s': both locations exist",
2919             old_file_storage_dir, new_file_storage_dir)
2920
2921
2922 def _EnsureJobQueueFile(file_name):
2923   """Checks whether the given filename is in the queue directory.
2924
2925   @type file_name: str
2926   @param file_name: the file name we should check
2927   @rtype: None
2928   @raises RPCFail: if the file is not valid
2929
2930   """
2931   if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
2932     _Fail("Passed job queue file '%s' does not belong to"
2933           " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
2934
2935
2936 def JobQueueUpdate(file_name, content):
2937   """Updates a file in the queue directory.
2938
2939   This is just a wrapper over L{utils.io.WriteFile}, with proper
2940   checking.
2941
2942   @type file_name: str
2943   @param file_name: the job file name
2944   @type content: str
2945   @param content: the new job contents
2946   @rtype: boolean
2947   @return: the success of the operation
2948
2949   """
2950   file_name = vcluster.LocalizeVirtualPath(file_name)
2951
2952   _EnsureJobQueueFile(file_name)
2953   getents = runtime.GetEnts()
2954
2955   # Write and replace the file atomically
2956   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2957                   gid=getents.masterd_gid)
2958
2959
2960 def JobQueueRename(old, new):
2961   """Renames a job queue file.
2962
2963   This is just a wrapper over os.rename with proper checking.
2964
2965   @type old: str
2966   @param old: the old (actual) file name
2967   @type new: str
2968   @param new: the desired file name
2969   @rtype: tuple
2970   @return: the success of the operation and payload
2971
2972   """
2973   old = vcluster.LocalizeVirtualPath(old)
2974   new = vcluster.LocalizeVirtualPath(new)
2975
2976   _EnsureJobQueueFile(old)
2977   _EnsureJobQueueFile(new)
2978
2979   getents = runtime.GetEnts()
2980
2981   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2982                    dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2983
2984
2985 def BlockdevClose(instance_name, disks):
2986   """Closes the given block devices.
2987
2988   This means they will be switched to secondary mode (in case of
2989   DRBD).
2990
2991   @param instance_name: if the argument is not empty, the symlinks
2992       of this instance will be removed
2993   @type disks: list of L{objects.Disk}
2994   @param disks: the list of disks to be closed
2995   @rtype: tuple (success, message)
2996   @return: a tuple of success and message, where success
2997       indicates the succes of the operation, and message
2998       which will contain the error details in case we
2999       failed
3000
3001   """
3002   bdevs = []
3003   for cf in disks:
3004     rd = _RecursiveFindBD(cf)
3005     if rd is None:
3006       _Fail("Can't find device %s", cf)
3007     bdevs.append(rd)
3008
3009   msg = []
3010   for rd in bdevs:
3011     try:
3012       rd.Close()
3013     except errors.BlockDeviceError, err:
3014       msg.append(str(err))
3015   if msg:
3016     _Fail("Can't make devices secondary: %s", ",".join(msg))
3017   else:
3018     if instance_name:
3019       _RemoveBlockDevLinks(instance_name, disks)
3020
3021
3022 def ValidateHVParams(hvname, hvparams):
3023   """Validates the given hypervisor parameters.
3024
3025   @type hvname: string
3026   @param hvname: the hypervisor name
3027   @type hvparams: dict
3028   @param hvparams: the hypervisor parameters to be validated
3029   @rtype: None
3030
3031   """
3032   try:
3033     hv_type = hypervisor.GetHypervisor(hvname)
3034     hv_type.ValidateParameters(hvparams)
3035   except errors.HypervisorError, err:
3036     _Fail(str(err), log=False)
3037
3038
3039 def _CheckOSPList(os_obj, parameters):
3040   """Check whether a list of parameters is supported by the OS.
3041
3042   @type os_obj: L{objects.OS}
3043   @param os_obj: OS object to check
3044   @type parameters: list
3045   @param parameters: the list of parameters to check
3046
3047   """
3048   supported = [v[0] for v in os_obj.supported_parameters]
3049   delta = frozenset(parameters).difference(supported)
3050   if delta:
3051     _Fail("The following parameters are not supported"
3052           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
3053
3054
3055 def ValidateOS(required, osname, checks, osparams):
3056   """Validate the given OS' parameters.
3057
3058   @type required: boolean
3059   @param required: whether absence of the OS should translate into
3060       failure or not
3061   @type osname: string
3062   @param osname: the OS to be validated
3063   @type checks: list
3064   @param checks: list of the checks to run (currently only 'parameters')
3065   @type osparams: dict
3066   @param osparams: dictionary with OS parameters
3067   @rtype: boolean
3068   @return: True if the validation passed, or False if the OS was not
3069       found and L{required} was false
3070
3071   """
3072   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
3073     _Fail("Unknown checks required for OS %s: %s", osname,
3074           set(checks).difference(constants.OS_VALIDATE_CALLS))
3075
3076   name_only = objects.OS.GetName(osname)
3077   status, tbv = _TryOSFromDisk(name_only, None)
3078
3079   if not status:
3080     if required:
3081       _Fail(tbv)
3082     else:
3083       return False
3084
3085   if max(tbv.api_versions) < constants.OS_API_V20:
3086     return True
3087
3088   if constants.OS_VALIDATE_PARAMETERS in checks:
3089     _CheckOSPList(tbv, osparams.keys())
3090
3091   validate_env = OSCoreEnv(osname, tbv, osparams)
3092   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
3093                         cwd=tbv.path, reset_env=True)
3094   if result.failed:
3095     logging.error("os validate command '%s' returned error: %s output: %s",
3096                   result.cmd, result.fail_reason, result.output)
3097     _Fail("OS validation script failed (%s), output: %s",
3098           result.fail_reason, result.output, log=False)
3099
3100   return True
3101
3102
3103 def DemoteFromMC():
3104   """Demotes the current node from master candidate role.
3105
3106   """
3107   # try to ensure we're not the master by mistake
3108   master, myself = ssconf.GetMasterAndMyself()
3109   if master == myself:
3110     _Fail("ssconf status shows I'm the master node, will not demote")
3111
3112   result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
3113   if not result.failed:
3114     _Fail("The master daemon is running, will not demote")
3115
3116   try:
3117     if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3118       utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3119   except EnvironmentError, err:
3120     if err.errno != errno.ENOENT:
3121       _Fail("Error while backing up cluster file: %s", err, exc=True)
3122
3123   utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3124
3125
3126 def _GetX509Filenames(cryptodir, name):
3127   """Returns the full paths for the private key and certificate.
3128
3129   """
3130   return (utils.PathJoin(cryptodir, name),
3131           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3132           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3133
3134
3135 def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3136   """Creates a new X509 certificate for SSL/TLS.
3137
3138   @type validity: int
3139   @param validity: Validity in seconds
3140   @rtype: tuple; (string, string)
3141   @return: Certificate name and public part
3142
3143   """
3144   (key_pem, cert_pem) = \
3145     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3146                                      min(validity, _MAX_SSL_CERT_VALIDITY))
3147
3148   cert_dir = tempfile.mkdtemp(dir=cryptodir,
3149                               prefix="x509-%s-" % utils.TimestampForFilename())
3150   try:
3151     name = os.path.basename(cert_dir)
3152     assert len(name) > 5
3153
3154     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3155
3156     utils.WriteFile(key_file, mode=0400, data=key_pem)
3157     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3158
3159     # Never return private key as it shouldn't leave the node
3160     return (name, cert_pem)
3161   except Exception:
3162     shutil.rmtree(cert_dir, ignore_errors=True)
3163     raise
3164
3165
3166 def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3167   """Removes a X509 certificate.
3168
3169   @type name: string
3170   @param name: Certificate name
3171
3172   """
3173   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3174
3175   utils.RemoveFile(key_file)
3176   utils.RemoveFile(cert_file)
3177
3178   try:
3179     os.rmdir(cert_dir)
3180   except EnvironmentError, err:
3181     _Fail("Cannot remove certificate directory '%s': %s",
3182           cert_dir, err)
3183
3184
3185 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3186   """Returns the command for the requested input/output.
3187
3188   @type instance: L{objects.Instance}
3189   @param instance: The instance object
3190   @param mode: Import/export mode
3191   @param ieio: Input/output type
3192   @param ieargs: Input/output arguments
3193
3194   """
3195   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3196
3197   env = None
3198   prefix = None
3199   suffix = None
3200   exp_size = None
3201
3202   if ieio == constants.IEIO_FILE:
3203     (filename, ) = ieargs
3204
3205     if not utils.IsNormAbsPath(filename):
3206       _Fail("Path '%s' is not normalized or absolute", filename)
3207
3208     real_filename = os.path.realpath(filename)
3209     directory = os.path.dirname(real_filename)
3210
3211     if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3212       _Fail("File '%s' is not under exports directory '%s': %s",
3213             filename, pathutils.EXPORT_DIR, real_filename)
3214
3215     # Create directory
3216     utils.Makedirs(directory, mode=0750)
3217
3218     quoted_filename = utils.ShellQuote(filename)
3219
3220     if mode == constants.IEM_IMPORT:
3221       suffix = "> %s" % quoted_filename
3222     elif mode == constants.IEM_EXPORT:
3223       suffix = "< %s" % quoted_filename
3224
3225       # Retrieve file size
3226       try:
3227         st = os.stat(filename)
3228       except EnvironmentError, err:
3229         logging.error("Can't stat(2) %s: %s", filename, err)
3230       else:
3231         exp_size = utils.BytesToMebibyte(st.st_size)
3232
3233   elif ieio == constants.IEIO_RAW_DISK:
3234     (disk, ) = ieargs
3235
3236     real_disk = _OpenRealBD(disk)
3237
3238     if mode == constants.IEM_IMPORT:
3239       # we set here a smaller block size as, due to transport buffering, more
3240       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3241       # is not already there or we pass a wrong path; we use notrunc to no
3242       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3243       # much memory; this means that at best, we flush every 64k, which will
3244       # not be very fast
3245       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3246                                     " bs=%s oflag=dsync"),
3247                                     real_disk.dev_path,
3248                                     str(64 * 1024))
3249
3250     elif mode == constants.IEM_EXPORT:
3251       # the block size on the read dd is 1MiB to match our units
3252       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3253                                    real_disk.dev_path,
3254                                    str(1024 * 1024), # 1 MB
3255                                    str(disk.size))
3256       exp_size = disk.size
3257
3258   elif ieio == constants.IEIO_SCRIPT:
3259     (disk, disk_index, ) = ieargs
3260
3261     assert isinstance(disk_index, (int, long))
3262
3263     real_disk = _OpenRealBD(disk)
3264
3265     inst_os = OSFromDisk(instance.os)
3266     env = OSEnvironment(instance, inst_os)
3267
3268     if mode == constants.IEM_IMPORT:
3269       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3270       env["IMPORT_INDEX"] = str(disk_index)
3271       script = inst_os.import_script
3272
3273     elif mode == constants.IEM_EXPORT:
3274       env["EXPORT_DEVICE"] = real_disk.dev_path
3275       env["EXPORT_INDEX"] = str(disk_index)
3276       script = inst_os.export_script
3277
3278     # TODO: Pass special environment only to script
3279     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3280
3281     if mode == constants.IEM_IMPORT:
3282       suffix = "| %s" % script_cmd
3283
3284     elif mode == constants.IEM_EXPORT:
3285       prefix = "%s |" % script_cmd
3286
3287     # Let script predict size
3288     exp_size = constants.IE_CUSTOM_SIZE
3289
3290   else:
3291     _Fail("Invalid %s I/O mode %r", mode, ieio)
3292
3293   return (env, prefix, suffix, exp_size)
3294
3295
3296 def _CreateImportExportStatusDir(prefix):
3297   """Creates status directory for import/export.
3298
3299   """
3300   return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3301                           prefix=("%s-%s-" %
3302                                   (prefix, utils.TimestampForFilename())))
3303
3304
3305 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3306                             ieio, ieioargs):
3307   """Starts an import or export daemon.
3308
3309   @param mode: Import/output mode
3310   @type opts: L{objects.ImportExportOptions}
3311   @param opts: Daemon options
3312   @type host: string
3313   @param host: Remote host for export (None for import)
3314   @type port: int
3315   @param port: Remote port for export (None for import)
3316   @type instance: L{objects.Instance}
3317   @param instance: Instance object
3318   @type component: string
3319   @param component: which part of the instance is transferred now,
3320       e.g. 'disk/0'
3321   @param ieio: Input/output type
3322   @param ieioargs: Input/output arguments
3323
3324   """
3325   if mode == constants.IEM_IMPORT:
3326     prefix = "import"
3327
3328     if not (host is None and port is None):
3329       _Fail("Can not specify host or port on import")
3330
3331   elif mode == constants.IEM_EXPORT:
3332     prefix = "export"
3333
3334     if host is None or port is None:
3335       _Fail("Host and port must be specified for an export")
3336
3337   else:
3338     _Fail("Invalid mode %r", mode)
3339
3340   if (opts.key_name is None) ^ (opts.ca_pem is None):
3341     _Fail("Cluster certificate can only be used for both key and CA")
3342
3343   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3344     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3345
3346   if opts.key_name is None:
3347     # Use server.pem
3348     key_path = pathutils.NODED_CERT_FILE
3349     cert_path = pathutils.NODED_CERT_FILE
3350     assert opts.ca_pem is None
3351   else:
3352     (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3353                                                  opts.key_name)
3354     assert opts.ca_pem is not None
3355
3356   for i in [key_path, cert_path]:
3357     if not os.path.exists(i):
3358       _Fail("File '%s' does not exist" % i)
3359
3360   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3361   try:
3362     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3363     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3364     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3365
3366     if opts.ca_pem is None:
3367       # Use server.pem
3368       ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3369     else:
3370       ca = opts.ca_pem
3371
3372     # Write CA file
3373     utils.WriteFile(ca_file, data=ca, mode=0400)
3374
3375     cmd = [
3376       pathutils.IMPORT_EXPORT_DAEMON,
3377       status_file, mode,
3378       "--key=%s" % key_path,
3379       "--cert=%s" % cert_path,
3380       "--ca=%s" % ca_file,
3381       ]
3382
3383     if host:
3384       cmd.append("--host=%s" % host)
3385
3386     if port:
3387       cmd.append("--port=%s" % port)
3388
3389     if opts.ipv6:
3390       cmd.append("--ipv6")
3391     else:
3392       cmd.append("--ipv4")
3393
3394     if opts.compress:
3395       cmd.append("--compress=%s" % opts.compress)
3396
3397     if opts.magic:
3398       cmd.append("--magic=%s" % opts.magic)
3399
3400     if exp_size is not None:
3401       cmd.append("--expected-size=%s" % exp_size)
3402
3403     if cmd_prefix:
3404       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3405
3406     if cmd_suffix:
3407       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3408
3409     if mode == constants.IEM_EXPORT:
3410       # Retry connection a few times when connecting to remote peer
3411       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3412       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3413     elif opts.connect_timeout is not None:
3414       assert mode == constants.IEM_IMPORT
3415       # Overall timeout for establishing connection while listening
3416       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3417
3418     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3419
3420     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3421     # support for receiving a file descriptor for output
3422     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3423                       output=logfile)
3424
3425     # The import/export name is simply the status directory name
3426     return os.path.basename(status_dir)
3427
3428   except Exception:
3429     shutil.rmtree(status_dir, ignore_errors=True)
3430     raise
3431
3432
3433 def GetImportExportStatus(names):
3434   """Returns import/export daemon status.
3435
3436   @type names: sequence
3437   @param names: List of names
3438   @rtype: List of dicts
3439   @return: Returns a list of the state of each named import/export or None if a
3440            status couldn't be read
3441
3442   """
3443   result = []
3444
3445   for name in names:
3446     status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3447                                  _IES_STATUS_FILE)
3448
3449     try:
3450       data = utils.ReadFile(status_file)
3451     except EnvironmentError, err:
3452       if err.errno != errno.ENOENT:
3453         raise
3454       data = None
3455
3456     if not data:
3457       result.append(None)
3458       continue
3459
3460     result.append(serializer.LoadJson(data))
3461
3462   return result
3463
3464
3465 def AbortImportExport(name):
3466   """Sends SIGTERM to a running import/export daemon.
3467
3468   """
3469   logging.info("Abort import/export %s", name)
3470
3471   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3472   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3473
3474   if pid:
3475     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3476                  name, pid)
3477     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3478
3479
3480 def CleanupImportExport(name):
3481   """Cleanup after an import or export.
3482
3483   If the import/export daemon is still running it's killed. Afterwards the
3484   whole status directory is removed.
3485
3486   """
3487   logging.info("Finalizing import/export %s", name)
3488
3489   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3490
3491   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3492
3493   if pid:
3494     logging.info("Import/export %s is still running with PID %s",
3495                  name, pid)
3496     utils.KillProcess(pid, waitpid=False)
3497
3498   shutil.rmtree(status_dir, ignore_errors=True)
3499
3500
3501 def _FindDisks(nodes_ip, disks):
3502   """Sets the physical ID on disks and returns the block devices.
3503
3504   """
3505   # set the correct physical ID
3506   my_name = netutils.Hostname.GetSysName()
3507   for cf in disks:
3508     cf.SetPhysicalID(my_name, nodes_ip)
3509
3510   bdevs = []
3511
3512   for cf in disks:
3513     rd = _RecursiveFindBD(cf)
3514     if rd is None:
3515       _Fail("Can't find device %s", cf)
3516     bdevs.append(rd)
3517   return bdevs
3518
3519
3520 def DrbdDisconnectNet(nodes_ip, disks):
3521   """Disconnects the network on a list of drbd devices.
3522
3523   """
3524   bdevs = _FindDisks(nodes_ip, disks)
3525
3526   # disconnect disks
3527   for rd in bdevs:
3528     try:
3529       rd.DisconnectNet()
3530     except errors.BlockDeviceError, err:
3531       _Fail("Can't change network configuration to standalone mode: %s",
3532             err, exc=True)
3533
3534
3535 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3536   """Attaches the network on a list of drbd devices.
3537
3538   """
3539   bdevs = _FindDisks(nodes_ip, disks)
3540
3541   if multimaster:
3542     for idx, rd in enumerate(bdevs):
3543       try:
3544         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3545       except EnvironmentError, err:
3546         _Fail("Can't create symlink: %s", err)
3547   # reconnect disks, switch to new master configuration and if
3548   # needed primary mode
3549   for rd in bdevs:
3550     try:
3551       rd.AttachNet(multimaster)
3552     except errors.BlockDeviceError, err:
3553       _Fail("Can't change network configuration: %s", err)
3554
3555   # wait until the disks are connected; we need to retry the re-attach
3556   # if the device becomes standalone, as this might happen if the one
3557   # node disconnects and reconnects in a different mode before the
3558   # other node reconnects; in this case, one or both of the nodes will
3559   # decide it has wrong configuration and switch to standalone
3560
3561   def _Attach():
3562     all_connected = True
3563
3564     for rd in bdevs:
3565       stats = rd.GetProcStatus()
3566
3567       all_connected = (all_connected and
3568                        (stats.is_connected or stats.is_in_resync))
3569
3570       if stats.is_standalone:
3571         # peer had different config info and this node became
3572         # standalone, even though this should not happen with the
3573         # new staged way of changing disk configs
3574         try:
3575           rd.AttachNet(multimaster)
3576         except errors.BlockDeviceError, err:
3577           _Fail("Can't change network configuration: %s", err)
3578
3579     if not all_connected:
3580       raise utils.RetryAgain()
3581
3582   try:
3583     # Start with a delay of 100 miliseconds and go up to 5 seconds
3584     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3585   except utils.RetryTimeout:
3586     _Fail("Timeout in disk reconnecting")
3587
3588   if multimaster:
3589     # change to primary mode
3590     for rd in bdevs:
3591       try:
3592         rd.Open()
3593       except errors.BlockDeviceError, err:
3594         _Fail("Can't change to primary mode: %s", err)
3595
3596
3597 def DrbdWaitSync(nodes_ip, disks):
3598   """Wait until DRBDs have synchronized.
3599
3600   """
3601   def _helper(rd):
3602     stats = rd.GetProcStatus()
3603     if not (stats.is_connected or stats.is_in_resync):
3604       raise utils.RetryAgain()
3605     return stats
3606
3607   bdevs = _FindDisks(nodes_ip, disks)
3608
3609   min_resync = 100
3610   alldone = True
3611   for rd in bdevs:
3612     try:
3613       # poll each second for 15 seconds
3614       stats = utils.Retry(_helper, 1, 15, args=[rd])
3615     except utils.RetryTimeout:
3616       stats = rd.GetProcStatus()
3617       # last check
3618       if not (stats.is_connected or stats.is_in_resync):
3619         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3620     alldone = alldone and (not stats.is_in_resync)
3621     if stats.sync_percent is not None:
3622       min_resync = min(min_resync, stats.sync_percent)
3623
3624   return (alldone, min_resync)
3625
3626
3627 def GetDrbdUsermodeHelper():
3628   """Returns DRBD usermode helper currently configured.
3629
3630   """
3631   try:
3632     return bdev.BaseDRBD.GetUsermodeHelper()
3633   except errors.BlockDeviceError, err:
3634     _Fail(str(err))
3635
3636
3637 def PowercycleNode(hypervisor_type):
3638   """Hard-powercycle the node.
3639
3640   Because we need to return first, and schedule the powercycle in the
3641   background, we won't be able to report failures nicely.
3642
3643   """
3644   hyper = hypervisor.GetHypervisor(hypervisor_type)
3645   try:
3646     pid = os.fork()
3647   except OSError:
3648     # if we can't fork, we'll pretend that we're in the child process
3649     pid = 0
3650   if pid > 0:
3651     return "Reboot scheduled in 5 seconds"
3652   # ensure the child is running on ram
3653   try:
3654     utils.Mlockall()
3655   except Exception: # pylint: disable=W0703
3656     pass
3657   time.sleep(5)
3658   hyper.PowercycleNode()
3659
3660
3661 def _VerifyRestrictedCmdName(cmd):
3662   """Verifies a remote command name.
3663
3664   @type cmd: string
3665   @param cmd: Command name
3666   @rtype: tuple; (boolean, string or None)
3667   @return: The tuple's first element is the status; if C{False}, the second
3668     element is an error message string, otherwise it's C{None}
3669
3670   """
3671   if not cmd.strip():
3672     return (False, "Missing command name")
3673
3674   if os.path.basename(cmd) != cmd:
3675     return (False, "Invalid command name")
3676
3677   if not constants.EXT_PLUGIN_MASK.match(cmd):
3678     return (False, "Command name contains forbidden characters")
3679
3680   return (True, None)
3681
3682
3683 def _CommonRestrictedCmdCheck(path, owner):
3684   """Common checks for remote command file system directories and files.
3685
3686   @type path: string
3687   @param path: Path to check
3688   @param owner: C{None} or tuple containing UID and GID
3689   @rtype: tuple; (boolean, string or C{os.stat} result)
3690   @return: The tuple's first element is the status; if C{False}, the second
3691     element is an error message string, otherwise it's the result of C{os.stat}
3692
3693   """
3694   if owner is None:
3695     # Default to root as owner
3696     owner = (0, 0)
3697
3698   try:
3699     st = os.stat(path)
3700   except EnvironmentError, err:
3701     return (False, "Can't stat(2) '%s': %s" % (path, err))
3702
3703   if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
3704     return (False, "Permissions on '%s' are too permissive" % path)
3705
3706   if (st.st_uid, st.st_gid) != owner:
3707     (owner_uid, owner_gid) = owner
3708     return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
3709
3710   return (True, st)
3711
3712
3713 def _VerifyRestrictedCmdDirectory(path, _owner=None):
3714   """Verifies remote command directory.
3715
3716   @type path: string
3717   @param path: Path to check
3718   @rtype: tuple; (boolean, string or None)
3719   @return: The tuple's first element is the status; if C{False}, the second
3720     element is an error message string, otherwise it's C{None}
3721
3722   """
3723   (status, value) = _CommonRestrictedCmdCheck(path, _owner)
3724
3725   if not status:
3726     return (False, value)
3727
3728   if not stat.S_ISDIR(value.st_mode):
3729     return (False, "Path '%s' is not a directory" % path)
3730
3731   return (True, None)
3732
3733
3734 def _VerifyRestrictedCmd(path, cmd, _owner=None):
3735   """Verifies a whole remote command and returns its executable filename.
3736
3737   @type path: string
3738   @param path: Directory containing remote commands
3739   @type cmd: string
3740   @param cmd: Command name
3741   @rtype: tuple; (boolean, string)
3742   @return: The tuple's first element is the status; if C{False}, the second
3743     element is an error message string, otherwise the second element is the
3744     absolute path to the executable
3745
3746   """
3747   executable = utils.PathJoin(path, cmd)
3748
3749   (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)
3750
3751   if not status:
3752     return (False, msg)
3753
3754   if not utils.IsExecutable(executable):
3755     return (False, "access(2) thinks '%s' can't be executed" % executable)
3756
3757   return (True, executable)
3758
3759
3760 def _PrepareRestrictedCmd(path, cmd,
3761                           _verify_dir=_VerifyRestrictedCmdDirectory,
3762                           _verify_name=_VerifyRestrictedCmdName,
3763                           _verify_cmd=_VerifyRestrictedCmd):
3764   """Performs a number of tests on a remote command.
3765
3766   @type path: string
3767   @param path: Directory containing remote commands
3768   @type cmd: string
3769   @param cmd: Command name
3770   @return: Same as L{_VerifyRestrictedCmd}
3771
3772   """
3773   # Verify the directory first
3774   (status, msg) = _verify_dir(path)
3775   if status:
3776     # Check command if everything was alright
3777     (status, msg) = _verify_name(cmd)
3778
3779   if not status:
3780     return (False, msg)
3781
3782   # Check actual executable
3783   return _verify_cmd(path, cmd)
3784
3785
3786 def RunRestrictedCmd(cmd,
3787                      _lock_timeout=_RCMD_LOCK_TIMEOUT,
3788                      _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
3789                      _path=pathutils.RESTRICTED_COMMANDS_DIR,
3790                      _sleep_fn=time.sleep,
3791                      _prepare_fn=_PrepareRestrictedCmd,
3792                      _runcmd_fn=utils.RunCmd,
3793                      _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
3794   """Executes a remote command after performing strict tests.
3795
3796   @type cmd: string
3797   @param cmd: Command name
3798   @rtype: string
3799   @return: Command output
3800   @raise RPCFail: In case of an error
3801
3802   """
3803   logging.info("Preparing to run remote command '%s'", cmd)
3804
3805   if not _enabled:
3806     _Fail("Remote commands disabled at configure time")
3807
3808   lock = None
3809   try:
3810     cmdresult = None
3811     try:
3812       lock = utils.FileLock.Open(_lock_file)
3813       lock.Exclusive(blocking=True, timeout=_lock_timeout)
3814
3815       (status, value) = _prepare_fn(_path, cmd)
3816
3817       if status:
3818         cmdresult = _runcmd_fn([value], env={}, reset_env=True,
3819                                postfork_fn=lambda _: lock.Unlock())
3820       else:
3821         logging.error(value)
3822     except Exception: # pylint: disable=W0703
3823       # Keep original error in log
3824       logging.exception("Caught exception")
3825
3826     if cmdresult is None:
3827       logging.info("Sleeping for %0.1f seconds before returning",
3828                    _RCMD_INVALID_DELAY)
3829       _sleep_fn(_RCMD_INVALID_DELAY)
3830
3831       # Do not include original error message in returned error
3832       _Fail("Executing command '%s' failed" % cmd)
3833     elif cmdresult.failed or cmdresult.fail_reason:
3834       _Fail("Remote command '%s' failed: %s; output: %s",
3835             cmd, cmdresult.fail_reason, cmdresult.output)
3836     else:
3837       return cmdresult.output
3838   finally:
3839     if lock is not None:
3840       # Release lock at last
3841       lock.Close()
3842       lock = None
3843
3844
3845 def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
3846   """Creates or removes the watcher pause file.
3847
3848   @type until: None or number
3849   @param until: Unix timestamp saying until when the watcher shouldn't run
3850
3851   """
3852   if until is None:
3853     logging.info("Received request to no longer pause watcher")
3854     utils.RemoveFile(_filename)
3855   else:
3856     logging.info("Received request to pause watcher until %s", until)
3857
3858     if not ht.TNumber(until):
3859       _Fail("Duration must be numeric")
3860
3861     utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)
3862
3863
3864 class HooksRunner(object):
3865   """Hook runner.
3866
3867   This class is instantiated on the node side (ganeti-noded) and not
3868   on the master side.
3869
3870   """
3871   def __init__(self, hooks_base_dir=None):
3872     """Constructor for hooks runner.
3873
3874     @type hooks_base_dir: str or None
3875     @param hooks_base_dir: if not None, this overrides the
3876         L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3877
3878     """
3879     if hooks_base_dir is None:
3880       hooks_base_dir = pathutils.HOOKS_BASE_DIR
3881     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3882     # constant
3883     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3884
3885   def RunLocalHooks(self, node_list, hpath, phase, env):
3886     """Check that the hooks will be run only locally and then run them.
3887
3888     """
3889     assert len(node_list) == 1
3890     node = node_list[0]
3891     _, myself = ssconf.GetMasterAndMyself()
3892     assert node == myself
3893
3894     results = self.RunHooks(hpath, phase, env)
3895
3896     # Return values in the form expected by HooksMaster
3897     return {node: (None, False, results)}
3898
3899   def RunHooks(self, hpath, phase, env):
3900     """Run the scripts in the hooks directory.
3901
3902     @type hpath: str
3903     @param hpath: the path to the hooks directory which
3904         holds the scripts
3905     @type phase: str
3906     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3907         L{constants.HOOKS_PHASE_POST}
3908     @type env: dict
3909     @param env: dictionary with the environment for the hook
3910     @rtype: list
3911     @return: list of 3-element tuples:
3912       - script path
3913       - script result, either L{constants.HKR_SUCCESS} or
3914         L{constants.HKR_FAIL}
3915       - output of the script
3916
3917     @raise errors.ProgrammerError: for invalid input
3918         parameters
3919
3920     """
3921     if phase == constants.HOOKS_PHASE_PRE:
3922       suffix = "pre"
3923     elif phase == constants.HOOKS_PHASE_POST:
3924       suffix = "post"
3925     else:
3926       _Fail("Unknown hooks phase '%s'", phase)
3927
3928     subdir = "%s-%s.d" % (hpath, suffix)
3929     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3930
3931     results = []
3932
3933     if not os.path.isdir(dir_name):
3934       # for non-existing/non-dirs, we simply exit instead of logging a
3935       # warning at every operation
3936       return results
3937
3938     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3939
3940     for (relname, relstatus, runresult) in runparts_results:
3941       if relstatus == constants.RUNPARTS_SKIP:
3942         rrval = constants.HKR_SKIP
3943         output = ""
3944       elif relstatus == constants.RUNPARTS_ERR:
3945         rrval = constants.HKR_FAIL
3946         output = "Hook script execution error: %s" % runresult
3947       elif relstatus == constants.RUNPARTS_RUN:
3948         if runresult.failed:
3949           rrval = constants.HKR_FAIL
3950         else:
3951           rrval = constants.HKR_SUCCESS
3952         output = utils.SafeEncode(runresult.output.strip())
3953       results.append(("%s/%s" % (subdir, relname), rrval, output))
3954
3955     return results
3956
3957
3958 class IAllocatorRunner(object):
3959   """IAllocator runner.
3960
3961   This class is instantiated on the node side (ganeti-noded) and not on
3962   the master side.
3963
3964   """
3965   @staticmethod
3966   def Run(name, idata):
3967     """Run an iallocator script.
3968
3969     @type name: str
3970     @param name: the iallocator script name
3971     @type idata: str
3972     @param idata: the allocator input data
3973
3974     @rtype: tuple
3975     @return: two element tuple of:
3976        - status
3977        - either error message or stdout of allocator (for success)
3978
3979     """
3980     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3981                                   os.path.isfile)
3982     if alloc_script is None:
3983       _Fail("iallocator module '%s' not found in the search path", name)
3984
3985     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3986     try:
3987       os.write(fd, idata)
3988       os.close(fd)
3989       result = utils.RunCmd([alloc_script, fin_name])
3990       if result.failed:
3991         _Fail("iallocator module '%s' failed: %s, output '%s'",
3992               name, result.fail_reason, result.output)
3993     finally:
3994       os.unlink(fin_name)
3995
3996     return result.stdout
3997
3998
3999 class DevCacheManager(object):
4000   """Simple class for managing a cache of block device information.
4001
4002   """
4003   _DEV_PREFIX = "/dev/"
4004   _ROOT_DIR = pathutils.BDEV_CACHE_DIR
4005
4006   @classmethod
4007   def _ConvertPath(cls, dev_path):
4008     """Converts a /dev/name path to the cache file name.
4009
4010     This replaces slashes with underscores and strips the /dev
4011     prefix. It then returns the full path to the cache file.
4012
4013     @type dev_path: str
4014     @param dev_path: the C{/dev/} path name
4015     @rtype: str
4016     @return: the converted path name
4017
4018     """
4019     if dev_path.startswith(cls._DEV_PREFIX):
4020       dev_path = dev_path[len(cls._DEV_PREFIX):]
4021     dev_path = dev_path.replace("/", "_")
4022     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
4023     return fpath
4024
4025   @classmethod
4026   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
4027     """Updates the cache information for a given device.
4028
4029     @type dev_path: str
4030     @param dev_path: the pathname of the device
4031     @type owner: str
4032     @param owner: the owner (instance name) of the device
4033     @type on_primary: bool
4034     @param on_primary: whether this is the primary
4035         node nor not
4036     @type iv_name: str
4037     @param iv_name: the instance-visible name of the
4038         device, as in objects.Disk.iv_name
4039
4040     @rtype: None
4041
4042     """
4043     if dev_path is None:
4044       logging.error("DevCacheManager.UpdateCache got a None dev_path")
4045       return
4046     fpath = cls._ConvertPath(dev_path)
4047     if on_primary:
4048       state = "primary"
4049     else:
4050       state = "secondary"
4051     if iv_name is None:
4052       iv_name = "not_visible"
4053     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
4054     try:
4055       utils.WriteFile(fpath, data=fdata)
4056     except EnvironmentError, err:
4057       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
4058
4059   @classmethod
4060   def RemoveCache(cls, dev_path):
4061     """Remove data for a dev_path.
4062
4063     This is just a wrapper over L{utils.io.RemoveFile} with a converted
4064     path name and logging.
4065
4066     @type dev_path: str
4067     @param dev_path: the pathname of the device
4068
4069     @rtype: None
4070
4071     """
4072     if dev_path is None:
4073       logging.error("DevCacheManager.RemoveCache got a None dev_path")
4074       return
4075     fpath = cls._ConvertPath(dev_path)
4076     try:
4077       utils.RemoveFile(fpath)
4078     except EnvironmentError, err:
4079       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)