Check minimum size of networks on creation
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
67 from ganeti import ht
68
69
70 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
71 _ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
72   pathutils.DATA_DIR,
73   pathutils.JOB_QUEUE_ARCHIVE_DIR,
74   pathutils.QUEUE_DIR,
75   pathutils.CRYPTO_KEYS_DIR,
76   ])
77 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
78 _X509_KEY_FILE = "key"
79 _X509_CERT_FILE = "cert"
80 _IES_STATUS_FILE = "status"
81 _IES_PID_FILE = "pid"
82 _IES_CA_FILE = "ca"
83
84 #: Valid LVS output line regex
85 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
86
87 # Actions for the master setup script
88 _MASTER_START = "start"
89 _MASTER_STOP = "stop"
90
91 #: Maximum file permissions for restricted command directory and executables
92 _RCMD_MAX_MODE = (stat.S_IRWXU |
93                   stat.S_IRGRP | stat.S_IXGRP |
94                   stat.S_IROTH | stat.S_IXOTH)
95
96 #: Delay before returning an error for restricted commands
97 _RCMD_INVALID_DELAY = 10
98
99 #: How long to wait to acquire lock for restricted commands (shorter than
100 #: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
101 #: command requests arrive
102 _RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
103
104
105 class RPCFail(Exception):
106   """Class denoting RPC failure.
107
108   Its argument is the error message.
109
110   """
111
112
113 def _Fail(msg, *args, **kwargs):
114   """Log an error and the raise an RPCFail exception.
115
116   This exception is then handled specially in the ganeti daemon and
117   turned into a 'failed' return type. As such, this function is a
118   useful shortcut for logging the error and returning it to the master
119   daemon.
120
121   @type msg: string
122   @param msg: the text of the exception
123   @raise RPCFail
124
125   """
126   if args:
127     msg = msg % args
128   if "log" not in kwargs or kwargs["log"]: # if we should log this error
129     if "exc" in kwargs and kwargs["exc"]:
130       logging.exception(msg)
131     else:
132       logging.error(msg)
133   raise RPCFail(msg)
134
135
136 def _GetConfig():
137   """Simple wrapper to return a SimpleStore.
138
139   @rtype: L{ssconf.SimpleStore}
140   @return: a SimpleStore instance
141
142   """
143   return ssconf.SimpleStore()
144
145
146 def _GetSshRunner(cluster_name):
147   """Simple wrapper to return an SshRunner.
148
149   @type cluster_name: str
150   @param cluster_name: the cluster name, which is needed
151       by the SshRunner constructor
152   @rtype: L{ssh.SshRunner}
153   @return: an SshRunner instance
154
155   """
156   return ssh.SshRunner(cluster_name)
157
158
159 def _Decompress(data):
160   """Unpacks data compressed by the RPC client.
161
162   @type data: list or tuple
163   @param data: Data sent by RPC client
164   @rtype: str
165   @return: Decompressed data
166
167   """
168   assert isinstance(data, (list, tuple))
169   assert len(data) == 2
170   (encoding, content) = data
171   if encoding == constants.RPC_ENCODING_NONE:
172     return content
173   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
174     return zlib.decompress(base64.b64decode(content))
175   else:
176     raise AssertionError("Unknown data encoding")
177
178
179 def _CleanDirectory(path, exclude=None):
180   """Removes all regular files in a directory.
181
182   @type path: str
183   @param path: the directory to clean
184   @type exclude: list
185   @param exclude: list of files to be excluded, defaults
186       to the empty list
187
188   """
189   if path not in _ALLOWED_CLEAN_DIRS:
190     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
191           path)
192
193   if not os.path.isdir(path):
194     return
195   if exclude is None:
196     exclude = []
197   else:
198     # Normalize excluded paths
199     exclude = [os.path.normpath(i) for i in exclude]
200
201   for rel_name in utils.ListVisibleFiles(path):
202     full_name = utils.PathJoin(path, rel_name)
203     if full_name in exclude:
204       continue
205     if os.path.isfile(full_name) and not os.path.islink(full_name):
206       utils.RemoveFile(full_name)
207
208
209 def _BuildUploadFileList():
210   """Build the list of allowed upload files.
211
212   This is abstracted so that it's built only once at module import time.
213
214   """
215   allowed_files = set([
216     pathutils.CLUSTER_CONF_FILE,
217     pathutils.ETC_HOSTS,
218     pathutils.SSH_KNOWN_HOSTS_FILE,
219     pathutils.VNC_PASSWORD_FILE,
220     pathutils.RAPI_CERT_FILE,
221     pathutils.SPICE_CERT_FILE,
222     pathutils.SPICE_CACERT_FILE,
223     pathutils.RAPI_USERS_FILE,
224     pathutils.CONFD_HMAC_KEY,
225     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
226     ])
227
228   for hv_name in constants.HYPER_TYPES:
229     hv_class = hypervisor.GetHypervisorClass(hv_name)
230     allowed_files.update(hv_class.GetAncillaryFiles()[0])
231
232   assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
233     "Allowed file storage paths should never be uploaded via RPC"
234
235   return frozenset(allowed_files)
236
237
238 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
239
240
241 def JobQueuePurge():
242   """Removes job queue files and archived jobs.
243
244   @rtype: tuple
245   @return: True, None
246
247   """
248   _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
249   _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
250
251
252 def GetMasterInfo():
253   """Returns master information.
254
255   This is an utility function to compute master information, either
256   for consumption here or from the node daemon.
257
258   @rtype: tuple
259   @return: master_netdev, master_ip, master_name, primary_ip_family,
260     master_netmask
261   @raise RPCFail: in case of errors
262
263   """
264   try:
265     cfg = _GetConfig()
266     master_netdev = cfg.GetMasterNetdev()
267     master_ip = cfg.GetMasterIP()
268     master_netmask = cfg.GetMasterNetmask()
269     master_node = cfg.GetMasterNode()
270     primary_ip_family = cfg.GetPrimaryIPFamily()
271   except errors.ConfigurationError, err:
272     _Fail("Cluster configuration incomplete: %s", err, exc=True)
273   return (master_netdev, master_ip, master_node, primary_ip_family,
274           master_netmask)
275
276
277 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
278   """Decorator that runs hooks before and after the decorated function.
279
280   @type hook_opcode: string
281   @param hook_opcode: opcode of the hook
282   @type hooks_path: string
283   @param hooks_path: path of the hooks
284   @type env_builder_fn: function
285   @param env_builder_fn: function that returns a dictionary containing the
286     environment variables for the hooks. Will get all the parameters of the
287     decorated function.
288   @raise RPCFail: in case of pre-hook failure
289
290   """
291   def decorator(fn):
292     def wrapper(*args, **kwargs):
293       _, myself = ssconf.GetMasterAndMyself()
294       nodes = ([myself], [myself])  # these hooks run locally
295
296       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
297
298       cfg = _GetConfig()
299       hr = HooksRunner()
300       hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
301                             None, env_fn, logging.warning, cfg.GetClusterName(),
302                             cfg.GetMasterNode())
303
304       hm.RunPhase(constants.HOOKS_PHASE_PRE)
305       result = fn(*args, **kwargs)
306       hm.RunPhase(constants.HOOKS_PHASE_POST)
307
308       return result
309     return wrapper
310   return decorator
311
312
313 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
314   """Builds environment variables for master IP hooks.
315
316   @type master_params: L{objects.MasterNetworkParameters}
317   @param master_params: network parameters of the master
318   @type use_external_mip_script: boolean
319   @param use_external_mip_script: whether to use an external master IP
320     address setup script (unused, but necessary per the implementation of the
321     _RunLocalHooks decorator)
322
323   """
324   # pylint: disable=W0613
325   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
326   env = {
327     "MASTER_NETDEV": master_params.netdev,
328     "MASTER_IP": master_params.ip,
329     "MASTER_NETMASK": str(master_params.netmask),
330     "CLUSTER_IP_VERSION": str(ver),
331   }
332
333   return env
334
335
336 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
337   """Execute the master IP address setup script.
338
339   @type master_params: L{objects.MasterNetworkParameters}
340   @param master_params: network parameters of the master
341   @type action: string
342   @param action: action to pass to the script. Must be one of
343     L{backend._MASTER_START} or L{backend._MASTER_STOP}
344   @type use_external_mip_script: boolean
345   @param use_external_mip_script: whether to use an external master IP
346     address setup script
347   @raise backend.RPCFail: if there are errors during the execution of the
348     script
349
350   """
351   env = _BuildMasterIpEnv(master_params)
352
353   if use_external_mip_script:
354     setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
355   else:
356     setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
357
358   result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
359
360   if result.failed:
361     _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
362           (action, result.exit_code, result.output), log=True)
363
364
365 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
366                _BuildMasterIpEnv)
367 def ActivateMasterIp(master_params, use_external_mip_script):
368   """Activate the IP address of the master daemon.
369
370   @type master_params: L{objects.MasterNetworkParameters}
371   @param master_params: network parameters of the master
372   @type use_external_mip_script: boolean
373   @param use_external_mip_script: whether to use an external master IP
374     address setup script
375   @raise RPCFail: in case of errors during the IP startup
376
377   """
378   _RunMasterSetupScript(master_params, _MASTER_START,
379                         use_external_mip_script)
380
381
382 def StartMasterDaemons(no_voting):
383   """Activate local node as master node.
384
385   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
386
387   @type no_voting: boolean
388   @param no_voting: whether to start ganeti-masterd without a node vote
389       but still non-interactively
390   @rtype: None
391
392   """
393
394   if no_voting:
395     masterd_args = "--no-voting --yes-do-it"
396   else:
397     masterd_args = ""
398
399   env = {
400     "EXTRA_MASTERD_ARGS": masterd_args,
401     }
402
403   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
404   if result.failed:
405     msg = "Can't start Ganeti master: %s" % result.output
406     logging.error(msg)
407     _Fail(msg)
408
409
410 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
411                _BuildMasterIpEnv)
412 def DeactivateMasterIp(master_params, use_external_mip_script):
413   """Deactivate the master IP on this node.
414
415   @type master_params: L{objects.MasterNetworkParameters}
416   @param master_params: network parameters of the master
417   @type use_external_mip_script: boolean
418   @param use_external_mip_script: whether to use an external master IP
419     address setup script
420   @raise RPCFail: in case of errors during the IP turndown
421
422   """
423   _RunMasterSetupScript(master_params, _MASTER_STOP,
424                         use_external_mip_script)
425
426
427 def StopMasterDaemons():
428   """Stop the master daemons on this node.
429
430   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
431
432   @rtype: None
433
434   """
435   # TODO: log and report back to the caller the error failures; we
436   # need to decide in which case we fail the RPC for this
437
438   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
439   if result.failed:
440     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
441                   " and error %s",
442                   result.cmd, result.exit_code, result.output)
443
444
445 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
446   """Change the netmask of the master IP.
447
448   @param old_netmask: the old value of the netmask
449   @param netmask: the new value of the netmask
450   @param master_ip: the master IP
451   @param master_netdev: the master network device
452
453   """
454   if old_netmask == netmask:
455     return
456
457   if not netutils.IPAddress.Own(master_ip):
458     _Fail("The master IP address is not up, not attempting to change its"
459           " netmask")
460
461   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
462                          "%s/%s" % (master_ip, netmask),
463                          "dev", master_netdev, "label",
464                          "%s:0" % master_netdev])
465   if result.failed:
466     _Fail("Could not set the new netmask on the master IP address")
467
468   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
469                          "%s/%s" % (master_ip, old_netmask),
470                          "dev", master_netdev, "label",
471                          "%s:0" % master_netdev])
472   if result.failed:
473     _Fail("Could not bring down the master IP address with the old netmask")
474
475
476 def EtcHostsModify(mode, host, ip):
477   """Modify a host entry in /etc/hosts.
478
479   @param mode: The mode to operate. Either add or remove entry
480   @param host: The host to operate on
481   @param ip: The ip associated with the entry
482
483   """
484   if mode == constants.ETC_HOSTS_ADD:
485     if not ip:
486       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
487               " present")
488     utils.AddHostToEtcHosts(host, ip)
489   elif mode == constants.ETC_HOSTS_REMOVE:
490     if ip:
491       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
492               " parameter is present")
493     utils.RemoveHostFromEtcHosts(host)
494   else:
495     RPCFail("Mode not supported")
496
497
498 def LeaveCluster(modify_ssh_setup):
499   """Cleans up and remove the current node.
500
501   This function cleans up and prepares the current node to be removed
502   from the cluster.
503
504   If processing is successful, then it raises an
505   L{errors.QuitGanetiException} which is used as a special case to
506   shutdown the node daemon.
507
508   @param modify_ssh_setup: boolean
509
510   """
511   _CleanDirectory(pathutils.DATA_DIR)
512   _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
513   JobQueuePurge()
514
515   if modify_ssh_setup:
516     try:
517       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
518
519       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
520
521       utils.RemoveFile(priv_key)
522       utils.RemoveFile(pub_key)
523     except errors.OpExecError:
524       logging.exception("Error while processing ssh files")
525
526   try:
527     utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
528     utils.RemoveFile(pathutils.RAPI_CERT_FILE)
529     utils.RemoveFile(pathutils.SPICE_CERT_FILE)
530     utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
531     utils.RemoveFile(pathutils.NODED_CERT_FILE)
532   except: # pylint: disable=W0702
533     logging.exception("Error while removing cluster secrets")
534
535   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
536   if result.failed:
537     logging.error("Command %s failed with exitcode %s and error %s",
538                   result.cmd, result.exit_code, result.output)
539
540   # Raise a custom exception (handled in ganeti-noded)
541   raise errors.QuitGanetiException(True, "Shutdown scheduled")
542
543
544 def _GetVgInfo(name, excl_stor):
545   """Retrieves information about a LVM volume group.
546
547   """
548   # TODO: GetVGInfo supports returning information for multiple VGs at once
549   vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
550   if vginfo:
551     vg_free = int(round(vginfo[0][0], 0))
552     vg_size = int(round(vginfo[0][1], 0))
553   else:
554     vg_free = None
555     vg_size = None
556
557   return {
558     "name": name,
559     "vg_free": vg_free,
560     "vg_size": vg_size,
561     }
562
563
564 def _GetHvInfo(name):
565   """Retrieves node information from a hypervisor.
566
567   The information returned depends on the hypervisor. Common items:
568
569     - vg_size is the size of the configured volume group in MiB
570     - vg_free is the free size of the volume group in MiB
571     - memory_dom0 is the memory allocated for domain0 in MiB
572     - memory_free is the currently available (free) ram in MiB
573     - memory_total is the total number of ram in MiB
574     - hv_version: the hypervisor version, if available
575
576   """
577   return hypervisor.GetHypervisor(name).GetNodeInfo()
578
579
580 def _GetNamedNodeInfo(names, fn):
581   """Calls C{fn} for all names in C{names} and returns a dictionary.
582
583   @rtype: None or dict
584
585   """
586   if names is None:
587     return None
588   else:
589     return map(fn, names)
590
591
592 def GetNodeInfo(vg_names, hv_names, excl_stor):
593   """Gives back a hash with different information about the node.
594
595   @type vg_names: list of string
596   @param vg_names: Names of the volume groups to ask for disk space information
597   @type hv_names: list of string
598   @param hv_names: Names of the hypervisors to ask for node information
599   @type excl_stor: boolean
600   @param excl_stor: Whether exclusive_storage is active
601   @rtype: tuple; (string, None/dict, None/dict)
602   @return: Tuple containing boot ID, volume group information and hypervisor
603     information
604
605   """
606   bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
607   vg_info = _GetNamedNodeInfo(vg_names, (lambda vg: _GetVgInfo(vg, excl_stor)))
608   hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
609
610   return (bootid, vg_info, hv_info)
611
612
613 def _CheckExclusivePvs(pvi_list):
614   """Check that PVs are not shared among LVs
615
616   @type pvi_list: list of L{objects.LvmPvInfo} objects
617   @param pvi_list: information about the PVs
618
619   @rtype: list of tuples (string, list of strings)
620   @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])
621
622   """
623   res = []
624   for pvi in pvi_list:
625     if len(pvi.lv_list) > 1:
626       res.append((pvi.name, pvi.lv_list))
627   return res
628
629
630 def VerifyNode(what, cluster_name):
631   """Verify the status of the local node.
632
633   Based on the input L{what} parameter, various checks are done on the
634   local node.
635
636   If the I{filelist} key is present, this list of
637   files is checksummed and the file/checksum pairs are returned.
638
639   If the I{nodelist} key is present, we check that we have
640   connectivity via ssh with the target nodes (and check the hostname
641   report).
642
643   If the I{node-net-test} key is present, we check that we have
644   connectivity to the given nodes via both primary IP and, if
645   applicable, secondary IPs.
646
647   @type what: C{dict}
648   @param what: a dictionary of things to check:
649       - filelist: list of files for which to compute checksums
650       - nodelist: list of nodes we should check ssh communication with
651       - node-net-test: list of nodes we should check node daemon port
652         connectivity with
653       - hypervisor: list with hypervisors to run the verify for
654   @rtype: dict
655   @return: a dictionary with the same keys as the input dict, and
656       values representing the result of the checks
657
658   """
659   result = {}
660   my_name = netutils.Hostname.GetSysName()
661   port = netutils.GetDaemonPort(constants.NODED)
662   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
663
664   if constants.NV_HYPERVISOR in what and vm_capable:
665     result[constants.NV_HYPERVISOR] = tmp = {}
666     for hv_name in what[constants.NV_HYPERVISOR]:
667       try:
668         val = hypervisor.GetHypervisor(hv_name).Verify()
669       except errors.HypervisorError, err:
670         val = "Error while checking hypervisor: %s" % str(err)
671       tmp[hv_name] = val
672
673   if constants.NV_HVPARAMS in what and vm_capable:
674     result[constants.NV_HVPARAMS] = tmp = []
675     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
676       try:
677         logging.info("Validating hv %s, %s", hv_name, hvparms)
678         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
679       except errors.HypervisorError, err:
680         tmp.append((source, hv_name, str(err)))
681
682   if constants.NV_FILELIST in what:
683     fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
684                                               what[constants.NV_FILELIST]))
685     result[constants.NV_FILELIST] = \
686       dict((vcluster.MakeVirtualPath(key), value)
687            for (key, value) in fingerprints.items())
688
689   if constants.NV_NODELIST in what:
690     (nodes, bynode) = what[constants.NV_NODELIST]
691
692     # Add nodes from other groups (different for each node)
693     try:
694       nodes.extend(bynode[my_name])
695     except KeyError:
696       pass
697
698     # Use a random order
699     random.shuffle(nodes)
700
701     # Try to contact all nodes
702     val = {}
703     for node in nodes:
704       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
705       if not success:
706         val[node] = message
707
708     result[constants.NV_NODELIST] = val
709
710   if constants.NV_NODENETTEST in what:
711     result[constants.NV_NODENETTEST] = tmp = {}
712     my_pip = my_sip = None
713     for name, pip, sip in what[constants.NV_NODENETTEST]:
714       if name == my_name:
715         my_pip = pip
716         my_sip = sip
717         break
718     if not my_pip:
719       tmp[my_name] = ("Can't find my own primary/secondary IP"
720                       " in the node list")
721     else:
722       for name, pip, sip in what[constants.NV_NODENETTEST]:
723         fail = []
724         if not netutils.TcpPing(pip, port, source=my_pip):
725           fail.append("primary")
726         if sip != pip:
727           if not netutils.TcpPing(sip, port, source=my_sip):
728             fail.append("secondary")
729         if fail:
730           tmp[name] = ("failure using the %s interface(s)" %
731                        " and ".join(fail))
732
733   if constants.NV_MASTERIP in what:
734     # FIXME: add checks on incoming data structures (here and in the
735     # rest of the function)
736     master_name, master_ip = what[constants.NV_MASTERIP]
737     if master_name == my_name:
738       source = constants.IP4_ADDRESS_LOCALHOST
739     else:
740       source = None
741     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
742                                                      source=source)
743
744   if constants.NV_USERSCRIPTS in what:
745     result[constants.NV_USERSCRIPTS] = \
746       [script for script in what[constants.NV_USERSCRIPTS]
747        if not utils.IsExecutable(script)]
748
749   if constants.NV_OOB_PATHS in what:
750     result[constants.NV_OOB_PATHS] = tmp = []
751     for path in what[constants.NV_OOB_PATHS]:
752       try:
753         st = os.stat(path)
754       except OSError, err:
755         tmp.append("error stating out of band helper: %s" % err)
756       else:
757         if stat.S_ISREG(st.st_mode):
758           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
759             tmp.append(None)
760           else:
761             tmp.append("out of band helper %s is not executable" % path)
762         else:
763           tmp.append("out of band helper %s is not a file" % path)
764
765   if constants.NV_LVLIST in what and vm_capable:
766     try:
767       val = GetVolumeList(utils.ListVolumeGroups().keys())
768     except RPCFail, err:
769       val = str(err)
770     result[constants.NV_LVLIST] = val
771
772   if constants.NV_INSTANCELIST in what and vm_capable:
773     # GetInstanceList can fail
774     try:
775       val = GetInstanceList(what[constants.NV_INSTANCELIST])
776     except RPCFail, err:
777       val = str(err)
778     result[constants.NV_INSTANCELIST] = val
779
780   if constants.NV_VGLIST in what and vm_capable:
781     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
782
783   if constants.NV_PVLIST in what and vm_capable:
784     check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
785     val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
786                                        filter_allocatable=False,
787                                        include_lvs=check_exclusive_pvs)
788     if check_exclusive_pvs:
789       result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
790       for pvi in val:
791         # Avoid sending useless data on the wire
792         pvi.lv_list = []
793     result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)
794
795   if constants.NV_VERSION in what:
796     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
797                                     constants.RELEASE_VERSION)
798
799   if constants.NV_HVINFO in what and vm_capable:
800     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
801     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
802
803   if constants.NV_DRBDLIST in what and vm_capable:
804     try:
805       used_minors = bdev.DRBD8.GetUsedDevs().keys()
806     except errors.BlockDeviceError, err:
807       logging.warning("Can't get used minors list", exc_info=True)
808       used_minors = str(err)
809     result[constants.NV_DRBDLIST] = used_minors
810
811   if constants.NV_DRBDHELPER in what and vm_capable:
812     status = True
813     try:
814       payload = bdev.BaseDRBD.GetUsermodeHelper()
815     except errors.BlockDeviceError, err:
816       logging.error("Can't get DRBD usermode helper: %s", str(err))
817       status = False
818       payload = str(err)
819     result[constants.NV_DRBDHELPER] = (status, payload)
820
821   if constants.NV_NODESETUP in what:
822     result[constants.NV_NODESETUP] = tmpr = []
823     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
824       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
825                   " under /sys, missing required directories /sys/block"
826                   " and /sys/class/net")
827     if (not os.path.isdir("/proc/sys") or
828         not os.path.isfile("/proc/sysrq-trigger")):
829       tmpr.append("The procfs filesystem doesn't seem to be mounted"
830                   " under /proc, missing required directory /proc/sys and"
831                   " the file /proc/sysrq-trigger")
832
833   if constants.NV_TIME in what:
834     result[constants.NV_TIME] = utils.SplitTime(time.time())
835
836   if constants.NV_OSLIST in what and vm_capable:
837     result[constants.NV_OSLIST] = DiagnoseOS()
838
839   if constants.NV_BRIDGES in what and vm_capable:
840     result[constants.NV_BRIDGES] = [bridge
841                                     for bridge in what[constants.NV_BRIDGES]
842                                     if not utils.BridgeExists(bridge)]
843
844   if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
845     result[constants.NV_FILE_STORAGE_PATHS] = \
846       bdev.ComputeWrongFileStoragePaths()
847
848   return result
849
850
851 def GetBlockDevSizes(devices):
852   """Return the size of the given block devices
853
854   @type devices: list
855   @param devices: list of block device nodes to query
856   @rtype: dict
857   @return:
858     dictionary of all block devices under /dev (key). The value is their
859     size in MiB.
860
861     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
862
863   """
864   DEV_PREFIX = "/dev/"
865   blockdevs = {}
866
867   for devpath in devices:
868     if not utils.IsBelowDir(DEV_PREFIX, devpath):
869       continue
870
871     try:
872       st = os.stat(devpath)
873     except EnvironmentError, err:
874       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
875       continue
876
877     if stat.S_ISBLK(st.st_mode):
878       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
879       if result.failed:
880         # We don't want to fail, just do not list this device as available
881         logging.warning("Cannot get size for block device %s", devpath)
882         continue
883
884       size = int(result.stdout) / (1024 * 1024)
885       blockdevs[devpath] = size
886   return blockdevs
887
888
889 def GetVolumeList(vg_names):
890   """Compute list of logical volumes and their size.
891
892   @type vg_names: list
893   @param vg_names: the volume groups whose LVs we should list, or
894       empty for all volume groups
895   @rtype: dict
896   @return:
897       dictionary of all partions (key) with value being a tuple of
898       their size (in MiB), inactive and online status::
899
900         {'xenvg/test1': ('20.06', True, True)}
901
902       in case of errors, a string is returned with the error
903       details.
904
905   """
906   lvs = {}
907   sep = "|"
908   if not vg_names:
909     vg_names = []
910   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
911                          "--separator=%s" % sep,
912                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
913   if result.failed:
914     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
915
916   for line in result.stdout.splitlines():
917     line = line.strip()
918     match = _LVSLINE_REGEX.match(line)
919     if not match:
920       logging.error("Invalid line returned from lvs output: '%s'", line)
921       continue
922     vg_name, name, size, attr = match.groups()
923     inactive = attr[4] == "-"
924     online = attr[5] == "o"
925     virtual = attr[0] == "v"
926     if virtual:
927       # we don't want to report such volumes as existing, since they
928       # don't really hold data
929       continue
930     lvs[vg_name + "/" + name] = (size, inactive, online)
931
932   return lvs
933
934
935 def ListVolumeGroups():
936   """List the volume groups and their size.
937
938   @rtype: dict
939   @return: dictionary with keys volume name and values the
940       size of the volume
941
942   """
943   return utils.ListVolumeGroups()
944
945
946 def NodeVolumes():
947   """List all volumes on this node.
948
949   @rtype: list
950   @return:
951     A list of dictionaries, each having four keys:
952       - name: the logical volume name,
953       - size: the size of the logical volume
954       - dev: the physical device on which the LV lives
955       - vg: the volume group to which it belongs
956
957     In case of errors, we return an empty list and log the
958     error.
959
960     Note that since a logical volume can live on multiple physical
961     volumes, the resulting list might include a logical volume
962     multiple times.
963
964   """
965   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
966                          "--separator=|",
967                          "--options=lv_name,lv_size,devices,vg_name"])
968   if result.failed:
969     _Fail("Failed to list logical volumes, lvs output: %s",
970           result.output)
971
972   def parse_dev(dev):
973     return dev.split("(")[0]
974
975   def handle_dev(dev):
976     return [parse_dev(x) for x in dev.split(",")]
977
978   def map_line(line):
979     line = [v.strip() for v in line]
980     return [{"name": line[0], "size": line[1],
981              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
982
983   all_devs = []
984   for line in result.stdout.splitlines():
985     if line.count("|") >= 3:
986       all_devs.extend(map_line(line.split("|")))
987     else:
988       logging.warning("Strange line in the output from lvs: '%s'", line)
989   return all_devs
990
991
992 def BridgesExist(bridges_list):
993   """Check if a list of bridges exist on the current node.
994
995   @rtype: boolean
996   @return: C{True} if all of them exist, C{False} otherwise
997
998   """
999   missing = []
1000   for bridge in bridges_list:
1001     if not utils.BridgeExists(bridge):
1002       missing.append(bridge)
1003
1004   if missing:
1005     _Fail("Missing bridges %s", utils.CommaJoin(missing))
1006
1007
1008 def GetInstanceList(hypervisor_list):
1009   """Provides a list of instances.
1010
1011   @type hypervisor_list: list
1012   @param hypervisor_list: the list of hypervisors to query information
1013
1014   @rtype: list
1015   @return: a list of all running instances on the current node
1016     - instance1.example.com
1017     - instance2.example.com
1018
1019   """
1020   results = []
1021   for hname in hypervisor_list:
1022     try:
1023       names = hypervisor.GetHypervisor(hname).ListInstances()
1024       results.extend(names)
1025     except errors.HypervisorError, err:
1026       _Fail("Error enumerating instances (hypervisor %s): %s",
1027             hname, err, exc=True)
1028
1029   return results
1030
1031
1032 def GetInstanceInfo(instance, hname):
1033   """Gives back the information about an instance as a dictionary.
1034
1035   @type instance: string
1036   @param instance: the instance name
1037   @type hname: string
1038   @param hname: the hypervisor type of the instance
1039
1040   @rtype: dict
1041   @return: dictionary with the following keys:
1042       - memory: memory size of instance (int)
1043       - state: xen state of instance (string)
1044       - time: cpu time of instance (float)
1045       - vcpus: the number of vcpus (int)
1046
1047   """
1048   output = {}
1049
1050   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1051   if iinfo is not None:
1052     output["memory"] = iinfo[2]
1053     output["vcpus"] = iinfo[3]
1054     output["state"] = iinfo[4]
1055     output["time"] = iinfo[5]
1056
1057   return output
1058
1059
1060 def GetInstanceMigratable(instance):
1061   """Gives whether an instance can be migrated.
1062
1063   @type instance: L{objects.Instance}
1064   @param instance: object representing the instance to be checked.
1065
1066   @rtype: tuple
1067   @return: tuple of (result, description) where:
1068       - result: whether the instance can be migrated or not
1069       - description: a description of the issue, if relevant
1070
1071   """
1072   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1073   iname = instance.name
1074   if iname not in hyper.ListInstances():
1075     _Fail("Instance %s is not running", iname)
1076
1077   for idx in range(len(instance.disks)):
1078     link_name = _GetBlockDevSymlinkPath(iname, idx)
1079     if not os.path.islink(link_name):
1080       logging.warning("Instance %s is missing symlink %s for disk %d",
1081                       iname, link_name, idx)
1082
1083
1084 def GetAllInstancesInfo(hypervisor_list):
1085   """Gather data about all instances.
1086
1087   This is the equivalent of L{GetInstanceInfo}, except that it
1088   computes data for all instances at once, thus being faster if one
1089   needs data about more than one instance.
1090
1091   @type hypervisor_list: list
1092   @param hypervisor_list: list of hypervisors to query for instance data
1093
1094   @rtype: dict
1095   @return: dictionary of instance: data, with data having the following keys:
1096       - memory: memory size of instance (int)
1097       - state: xen state of instance (string)
1098       - time: cpu time of instance (float)
1099       - vcpus: the number of vcpus
1100
1101   """
1102   output = {}
1103
1104   for hname in hypervisor_list:
1105     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1106     if iinfo:
1107       for name, _, memory, vcpus, state, times in iinfo:
1108         value = {
1109           "memory": memory,
1110           "vcpus": vcpus,
1111           "state": state,
1112           "time": times,
1113           }
1114         if name in output:
1115           # we only check static parameters, like memory and vcpus,
1116           # and not state and time which can change between the
1117           # invocations of the different hypervisors
1118           for key in "memory", "vcpus":
1119             if value[key] != output[name][key]:
1120               _Fail("Instance %s is running twice"
1121                     " with different parameters", name)
1122         output[name] = value
1123
1124   return output
1125
1126
1127 def _InstanceLogName(kind, os_name, instance, component):
1128   """Compute the OS log filename for a given instance and operation.
1129
1130   The instance name and os name are passed in as strings since not all
1131   operations have these as part of an instance object.
1132
1133   @type kind: string
1134   @param kind: the operation type (e.g. add, import, etc.)
1135   @type os_name: string
1136   @param os_name: the os name
1137   @type instance: string
1138   @param instance: the name of the instance being imported/added/etc.
1139   @type component: string or None
1140   @param component: the name of the component of the instance being
1141       transferred
1142
1143   """
1144   # TODO: Use tempfile.mkstemp to create unique filename
1145   if component:
1146     assert "/" not in component
1147     c_msg = "-%s" % component
1148   else:
1149     c_msg = ""
1150   base = ("%s-%s-%s%s-%s.log" %
1151           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1152   return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1153
1154
1155 def InstanceOsAdd(instance, reinstall, debug):
1156   """Add an OS to an instance.
1157
1158   @type instance: L{objects.Instance}
1159   @param instance: Instance whose OS is to be installed
1160   @type reinstall: boolean
1161   @param reinstall: whether this is an instance reinstall
1162   @type debug: integer
1163   @param debug: debug level, passed to the OS scripts
1164   @rtype: None
1165
1166   """
1167   inst_os = OSFromDisk(instance.os)
1168
1169   create_env = OSEnvironment(instance, inst_os, debug)
1170   if reinstall:
1171     create_env["INSTANCE_REINSTALL"] = "1"
1172
1173   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1174
1175   result = utils.RunCmd([inst_os.create_script], env=create_env,
1176                         cwd=inst_os.path, output=logfile, reset_env=True)
1177   if result.failed:
1178     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1179                   " output: %s", result.cmd, result.fail_reason, logfile,
1180                   result.output)
1181     lines = [utils.SafeEncode(val)
1182              for val in utils.TailFile(logfile, lines=20)]
1183     _Fail("OS create script failed (%s), last lines in the"
1184           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1185
1186
1187 def RunRenameInstance(instance, old_name, debug):
1188   """Run the OS rename script for an instance.
1189
1190   @type instance: L{objects.Instance}
1191   @param instance: Instance whose OS is to be installed
1192   @type old_name: string
1193   @param old_name: previous instance name
1194   @type debug: integer
1195   @param debug: debug level, passed to the OS scripts
1196   @rtype: boolean
1197   @return: the success of the operation
1198
1199   """
1200   inst_os = OSFromDisk(instance.os)
1201
1202   rename_env = OSEnvironment(instance, inst_os, debug)
1203   rename_env["OLD_INSTANCE_NAME"] = old_name
1204
1205   logfile = _InstanceLogName("rename", instance.os,
1206                              "%s-%s" % (old_name, instance.name), None)
1207
1208   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1209                         cwd=inst_os.path, output=logfile, reset_env=True)
1210
1211   if result.failed:
1212     logging.error("os create command '%s' returned error: %s output: %s",
1213                   result.cmd, result.fail_reason, result.output)
1214     lines = [utils.SafeEncode(val)
1215              for val in utils.TailFile(logfile, lines=20)]
1216     _Fail("OS rename script failed (%s), last lines in the"
1217           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1218
1219
1220 def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
1221   """Returns symlink path for block device.
1222
1223   """
1224   if _dir is None:
1225     _dir = pathutils.DISK_LINKS_DIR
1226
1227   return utils.PathJoin(_dir,
1228                         ("%s%s%s" %
1229                          (instance_name, constants.DISK_SEPARATOR, idx)))
1230
1231
1232 def _SymlinkBlockDev(instance_name, device_path, idx):
1233   """Set up symlinks to a instance's block device.
1234
1235   This is an auxiliary function run when an instance is start (on the primary
1236   node) or when an instance is migrated (on the target node).
1237
1238
1239   @param instance_name: the name of the target instance
1240   @param device_path: path of the physical block device, on the node
1241   @param idx: the disk index
1242   @return: absolute path to the disk's symlink
1243
1244   """
1245   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1246   try:
1247     os.symlink(device_path, link_name)
1248   except OSError, err:
1249     if err.errno == errno.EEXIST:
1250       if (not os.path.islink(link_name) or
1251           os.readlink(link_name) != device_path):
1252         os.remove(link_name)
1253         os.symlink(device_path, link_name)
1254     else:
1255       raise
1256
1257   return link_name
1258
1259
1260 def _RemoveBlockDevLinks(instance_name, disks):
1261   """Remove the block device symlinks belonging to the given instance.
1262
1263   """
1264   for idx, _ in enumerate(disks):
1265     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1266     if os.path.islink(link_name):
1267       try:
1268         os.remove(link_name)
1269       except OSError:
1270         logging.exception("Can't remove symlink '%s'", link_name)
1271
1272
1273 def _GatherAndLinkBlockDevs(instance):
1274   """Set up an instance's block device(s).
1275
1276   This is run on the primary node at instance startup. The block
1277   devices must be already assembled.
1278
1279   @type instance: L{objects.Instance}
1280   @param instance: the instance whose disks we shoul assemble
1281   @rtype: list
1282   @return: list of (disk_object, device_path)
1283
1284   """
1285   block_devices = []
1286   for idx, disk in enumerate(instance.disks):
1287     device = _RecursiveFindBD(disk)
1288     if device is None:
1289       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1290                                     str(disk))
1291     device.Open()
1292     try:
1293       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1294     except OSError, e:
1295       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1296                                     e.strerror)
1297
1298     block_devices.append((disk, link_name))
1299
1300   return block_devices
1301
1302
1303 def StartInstance(instance, startup_paused):
1304   """Start an instance.
1305
1306   @type instance: L{objects.Instance}
1307   @param instance: the instance object
1308   @type startup_paused: bool
1309   @param instance: pause instance at startup?
1310   @rtype: None
1311
1312   """
1313   running_instances = GetInstanceList([instance.hypervisor])
1314
1315   if instance.name in running_instances:
1316     logging.info("Instance %s already running, not starting", instance.name)
1317     return
1318
1319   try:
1320     block_devices = _GatherAndLinkBlockDevs(instance)
1321     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1322     hyper.StartInstance(instance, block_devices, startup_paused)
1323   except errors.BlockDeviceError, err:
1324     _Fail("Block device error: %s", err, exc=True)
1325   except errors.HypervisorError, err:
1326     _RemoveBlockDevLinks(instance.name, instance.disks)
1327     _Fail("Hypervisor error: %s", err, exc=True)
1328
1329
1330 def InstanceShutdown(instance, timeout):
1331   """Shut an instance down.
1332
1333   @note: this functions uses polling with a hardcoded timeout.
1334
1335   @type instance: L{objects.Instance}
1336   @param instance: the instance object
1337   @type timeout: integer
1338   @param timeout: maximum timeout for soft shutdown
1339   @rtype: None
1340
1341   """
1342   hv_name = instance.hypervisor
1343   hyper = hypervisor.GetHypervisor(hv_name)
1344   iname = instance.name
1345
1346   if instance.name not in hyper.ListInstances():
1347     logging.info("Instance %s not running, doing nothing", iname)
1348     return
1349
1350   class _TryShutdown:
1351     def __init__(self):
1352       self.tried_once = False
1353
1354     def __call__(self):
1355       if iname not in hyper.ListInstances():
1356         return
1357
1358       try:
1359         hyper.StopInstance(instance, retry=self.tried_once)
1360       except errors.HypervisorError, err:
1361         if iname not in hyper.ListInstances():
1362           # if the instance is no longer existing, consider this a
1363           # success and go to cleanup
1364           return
1365
1366         _Fail("Failed to stop instance %s: %s", iname, err)
1367
1368       self.tried_once = True
1369
1370       raise utils.RetryAgain()
1371
1372   try:
1373     utils.Retry(_TryShutdown(), 5, timeout)
1374   except utils.RetryTimeout:
1375     # the shutdown did not succeed
1376     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1377
1378     try:
1379       hyper.StopInstance(instance, force=True)
1380     except errors.HypervisorError, err:
1381       if iname in hyper.ListInstances():
1382         # only raise an error if the instance still exists, otherwise
1383         # the error could simply be "instance ... unknown"!
1384         _Fail("Failed to force stop instance %s: %s", iname, err)
1385
1386     time.sleep(1)
1387
1388     if iname in hyper.ListInstances():
1389       _Fail("Could not shutdown instance %s even by destroy", iname)
1390
1391   try:
1392     hyper.CleanupInstance(instance.name)
1393   except errors.HypervisorError, err:
1394     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1395
1396   _RemoveBlockDevLinks(iname, instance.disks)
1397
1398
1399 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1400   """Reboot an instance.
1401
1402   @type instance: L{objects.Instance}
1403   @param instance: the instance object to reboot
1404   @type reboot_type: str
1405   @param reboot_type: the type of reboot, one the following
1406     constants:
1407       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1408         instance OS, do not recreate the VM
1409       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1410         restart the VM (at the hypervisor level)
1411       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1412         not accepted here, since that mode is handled differently, in
1413         cmdlib, and translates into full stop and start of the
1414         instance (instead of a call_instance_reboot RPC)
1415   @type shutdown_timeout: integer
1416   @param shutdown_timeout: maximum timeout for soft shutdown
1417   @rtype: None
1418
1419   """
1420   running_instances = GetInstanceList([instance.hypervisor])
1421
1422   if instance.name not in running_instances:
1423     _Fail("Cannot reboot instance %s that is not running", instance.name)
1424
1425   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1426   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1427     try:
1428       hyper.RebootInstance(instance)
1429     except errors.HypervisorError, err:
1430       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1431   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1432     try:
1433       InstanceShutdown(instance, shutdown_timeout)
1434       return StartInstance(instance, False)
1435     except errors.HypervisorError, err:
1436       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1437   else:
1438     _Fail("Invalid reboot_type received: %s", reboot_type)
1439
1440
1441 def InstanceBalloonMemory(instance, memory):
1442   """Resize an instance's memory.
1443
1444   @type instance: L{objects.Instance}
1445   @param instance: the instance object
1446   @type memory: int
1447   @param memory: new memory amount in MB
1448   @rtype: None
1449
1450   """
1451   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1452   running = hyper.ListInstances()
1453   if instance.name not in running:
1454     logging.info("Instance %s is not running, cannot balloon", instance.name)
1455     return
1456   try:
1457     hyper.BalloonInstanceMemory(instance, memory)
1458   except errors.HypervisorError, err:
1459     _Fail("Failed to balloon instance memory: %s", err, exc=True)
1460
1461
1462 def MigrationInfo(instance):
1463   """Gather information about an instance to be migrated.
1464
1465   @type instance: L{objects.Instance}
1466   @param instance: the instance definition
1467
1468   """
1469   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1470   try:
1471     info = hyper.MigrationInfo(instance)
1472   except errors.HypervisorError, err:
1473     _Fail("Failed to fetch migration information: %s", err, exc=True)
1474   return info
1475
1476
1477 def AcceptInstance(instance, info, target):
1478   """Prepare the node to accept an instance.
1479
1480   @type instance: L{objects.Instance}
1481   @param instance: the instance definition
1482   @type info: string/data (opaque)
1483   @param info: migration information, from the source node
1484   @type target: string
1485   @param target: target host (usually ip), on this node
1486
1487   """
1488   # TODO: why is this required only for DTS_EXT_MIRROR?
1489   if instance.disk_template in constants.DTS_EXT_MIRROR:
1490     # Create the symlinks, as the disks are not active
1491     # in any way
1492     try:
1493       _GatherAndLinkBlockDevs(instance)
1494     except errors.BlockDeviceError, err:
1495       _Fail("Block device error: %s", err, exc=True)
1496
1497   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1498   try:
1499     hyper.AcceptInstance(instance, info, target)
1500   except errors.HypervisorError, err:
1501     if instance.disk_template in constants.DTS_EXT_MIRROR:
1502       _RemoveBlockDevLinks(instance.name, instance.disks)
1503     _Fail("Failed to accept instance: %s", err, exc=True)
1504
1505
1506 def FinalizeMigrationDst(instance, info, success):
1507   """Finalize any preparation to accept an instance.
1508
1509   @type instance: L{objects.Instance}
1510   @param instance: the instance definition
1511   @type info: string/data (opaque)
1512   @param info: migration information, from the source node
1513   @type success: boolean
1514   @param success: whether the migration was a success or a failure
1515
1516   """
1517   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1518   try:
1519     hyper.FinalizeMigrationDst(instance, info, success)
1520   except errors.HypervisorError, err:
1521     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1522
1523
1524 def MigrateInstance(instance, target, live):
1525   """Migrates an instance to another node.
1526
1527   @type instance: L{objects.Instance}
1528   @param instance: the instance definition
1529   @type target: string
1530   @param target: the target node name
1531   @type live: boolean
1532   @param live: whether the migration should be done live or not (the
1533       interpretation of this parameter is left to the hypervisor)
1534   @raise RPCFail: if migration fails for some reason
1535
1536   """
1537   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1538
1539   try:
1540     hyper.MigrateInstance(instance, target, live)
1541   except errors.HypervisorError, err:
1542     _Fail("Failed to migrate instance: %s", err, exc=True)
1543
1544
1545 def FinalizeMigrationSource(instance, success, live):
1546   """Finalize the instance migration on the source node.
1547
1548   @type instance: L{objects.Instance}
1549   @param instance: the instance definition of the migrated instance
1550   @type success: bool
1551   @param success: whether the migration succeeded or not
1552   @type live: bool
1553   @param live: whether the user requested a live migration or not
1554   @raise RPCFail: If the execution fails for some reason
1555
1556   """
1557   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1558
1559   try:
1560     hyper.FinalizeMigrationSource(instance, success, live)
1561   except Exception, err:  # pylint: disable=W0703
1562     _Fail("Failed to finalize the migration on the source node: %s", err,
1563           exc=True)
1564
1565
1566 def GetMigrationStatus(instance):
1567   """Get the migration status
1568
1569   @type instance: L{objects.Instance}
1570   @param instance: the instance that is being migrated
1571   @rtype: L{objects.MigrationStatus}
1572   @return: the status of the current migration (one of
1573            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1574            progress info that can be retrieved from the hypervisor
1575   @raise RPCFail: If the migration status cannot be retrieved
1576
1577   """
1578   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1579   try:
1580     return hyper.GetMigrationStatus(instance)
1581   except Exception, err:  # pylint: disable=W0703
1582     _Fail("Failed to get migration status: %s", err, exc=True)
1583
1584
1585 def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
1586   """Creates a block device for an instance.
1587
1588   @type disk: L{objects.Disk}
1589   @param disk: the object describing the disk we should create
1590   @type size: int
1591   @param size: the size of the physical underlying device, in MiB
1592   @type owner: str
1593   @param owner: the name of the instance for which disk is created,
1594       used for device cache data
1595   @type on_primary: boolean
1596   @param on_primary:  indicates if it is the primary node or not
1597   @type info: string
1598   @param info: string that will be sent to the physical device
1599       creation, used for example to set (LVM) tags on LVs
1600   @type excl_stor: boolean
1601   @param excl_stor: Whether exclusive_storage is active
1602
1603   @return: the new unique_id of the device (this can sometime be
1604       computed only after creation), or None. On secondary nodes,
1605       it's not required to return anything.
1606
1607   """
1608   # TODO: remove the obsolete "size" argument
1609   # pylint: disable=W0613
1610   clist = []
1611   if disk.children:
1612     for child in disk.children:
1613       try:
1614         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1615       except errors.BlockDeviceError, err:
1616         _Fail("Can't assemble device %s: %s", child, err)
1617       if on_primary or disk.AssembleOnSecondary():
1618         # we need the children open in case the device itself has to
1619         # be assembled
1620         try:
1621           # pylint: disable=E1103
1622           crdev.Open()
1623         except errors.BlockDeviceError, err:
1624           _Fail("Can't make child '%s' read-write: %s", child, err)
1625       clist.append(crdev)
1626
1627   try:
1628     device = bdev.Create(disk, clist, excl_stor)
1629   except errors.BlockDeviceError, err:
1630     _Fail("Can't create block device: %s", err)
1631
1632   if on_primary or disk.AssembleOnSecondary():
1633     try:
1634       device.Assemble()
1635     except errors.BlockDeviceError, err:
1636       _Fail("Can't assemble device after creation, unusual event: %s", err)
1637     if on_primary or disk.OpenOnSecondary():
1638       try:
1639         device.Open(force=True)
1640       except errors.BlockDeviceError, err:
1641         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1642     DevCacheManager.UpdateCache(device.dev_path, owner,
1643                                 on_primary, disk.iv_name)
1644
1645   device.SetInfo(info)
1646
1647   return device.unique_id
1648
1649
1650 def _WipeDevice(path, offset, size):
1651   """This function actually wipes the device.
1652
1653   @param path: The path to the device to wipe
1654   @param offset: The offset in MiB in the file
1655   @param size: The size in MiB to write
1656
1657   """
1658   # Internal sizes are always in Mebibytes; if the following "dd" command
1659   # should use a different block size the offset and size given to this
1660   # function must be adjusted accordingly before being passed to "dd".
1661   block_size = 1024 * 1024
1662
1663   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1664          "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1665          "count=%d" % size]
1666   result = utils.RunCmd(cmd)
1667
1668   if result.failed:
1669     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1670           result.fail_reason, result.output)
1671
1672
1673 def BlockdevWipe(disk, offset, size):
1674   """Wipes a block device.
1675
1676   @type disk: L{objects.Disk}
1677   @param disk: the disk object we want to wipe
1678   @type offset: int
1679   @param offset: The offset in MiB in the file
1680   @type size: int
1681   @param size: The size in MiB to write
1682
1683   """
1684   try:
1685     rdev = _RecursiveFindBD(disk)
1686   except errors.BlockDeviceError:
1687     rdev = None
1688
1689   if not rdev:
1690     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1691
1692   # Do cross verify some of the parameters
1693   if offset < 0:
1694     _Fail("Negative offset")
1695   if size < 0:
1696     _Fail("Negative size")
1697   if offset > rdev.size:
1698     _Fail("Offset is bigger than device size")
1699   if (offset + size) > rdev.size:
1700     _Fail("The provided offset and size to wipe is bigger than device size")
1701
1702   _WipeDevice(rdev.dev_path, offset, size)
1703
1704
1705 def BlockdevPauseResumeSync(disks, pause):
1706   """Pause or resume the sync of the block device.
1707
1708   @type disks: list of L{objects.Disk}
1709   @param disks: the disks object we want to pause/resume
1710   @type pause: bool
1711   @param pause: Wheater to pause or resume
1712
1713   """
1714   success = []
1715   for disk in disks:
1716     try:
1717       rdev = _RecursiveFindBD(disk)
1718     except errors.BlockDeviceError:
1719       rdev = None
1720
1721     if not rdev:
1722       success.append((False, ("Cannot change sync for device %s:"
1723                               " device not found" % disk.iv_name)))
1724       continue
1725
1726     result = rdev.PauseResumeSync(pause)
1727
1728     if result:
1729       success.append((result, None))
1730     else:
1731       if pause:
1732         msg = "Pause"
1733       else:
1734         msg = "Resume"
1735       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1736
1737   return success
1738
1739
1740 def BlockdevRemove(disk):
1741   """Remove a block device.
1742
1743   @note: This is intended to be called recursively.
1744
1745   @type disk: L{objects.Disk}
1746   @param disk: the disk object we should remove
1747   @rtype: boolean
1748   @return: the success of the operation
1749
1750   """
1751   msgs = []
1752   try:
1753     rdev = _RecursiveFindBD(disk)
1754   except errors.BlockDeviceError, err:
1755     # probably can't attach
1756     logging.info("Can't attach to device %s in remove", disk)
1757     rdev = None
1758   if rdev is not None:
1759     r_path = rdev.dev_path
1760     try:
1761       rdev.Remove()
1762     except errors.BlockDeviceError, err:
1763       msgs.append(str(err))
1764     if not msgs:
1765       DevCacheManager.RemoveCache(r_path)
1766
1767   if disk.children:
1768     for child in disk.children:
1769       try:
1770         BlockdevRemove(child)
1771       except RPCFail, err:
1772         msgs.append(str(err))
1773
1774   if msgs:
1775     _Fail("; ".join(msgs))
1776
1777
1778 def _RecursiveAssembleBD(disk, owner, as_primary):
1779   """Activate a block device for an instance.
1780
1781   This is run on the primary and secondary nodes for an instance.
1782
1783   @note: this function is called recursively.
1784
1785   @type disk: L{objects.Disk}
1786   @param disk: the disk we try to assemble
1787   @type owner: str
1788   @param owner: the name of the instance which owns the disk
1789   @type as_primary: boolean
1790   @param as_primary: if we should make the block device
1791       read/write
1792
1793   @return: the assembled device or None (in case no device
1794       was assembled)
1795   @raise errors.BlockDeviceError: in case there is an error
1796       during the activation of the children or the device
1797       itself
1798
1799   """
1800   children = []
1801   if disk.children:
1802     mcn = disk.ChildrenNeeded()
1803     if mcn == -1:
1804       mcn = 0 # max number of Nones allowed
1805     else:
1806       mcn = len(disk.children) - mcn # max number of Nones
1807     for chld_disk in disk.children:
1808       try:
1809         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1810       except errors.BlockDeviceError, err:
1811         if children.count(None) >= mcn:
1812           raise
1813         cdev = None
1814         logging.error("Error in child activation (but continuing): %s",
1815                       str(err))
1816       children.append(cdev)
1817
1818   if as_primary or disk.AssembleOnSecondary():
1819     r_dev = bdev.Assemble(disk, children)
1820     result = r_dev
1821     if as_primary or disk.OpenOnSecondary():
1822       r_dev.Open()
1823     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1824                                 as_primary, disk.iv_name)
1825
1826   else:
1827     result = True
1828   return result
1829
1830
1831 def BlockdevAssemble(disk, owner, as_primary, idx):
1832   """Activate a block device for an instance.
1833
1834   This is a wrapper over _RecursiveAssembleBD.
1835
1836   @rtype: str or boolean
1837   @return: a C{/dev/...} path for primary nodes, and
1838       C{True} for secondary nodes
1839
1840   """
1841   try:
1842     result = _RecursiveAssembleBD(disk, owner, as_primary)
1843     if isinstance(result, bdev.BlockDev):
1844       # pylint: disable=E1103
1845       result = result.dev_path
1846       if as_primary:
1847         _SymlinkBlockDev(owner, result, idx)
1848   except errors.BlockDeviceError, err:
1849     _Fail("Error while assembling disk: %s", err, exc=True)
1850   except OSError, err:
1851     _Fail("Error while symlinking disk: %s", err, exc=True)
1852
1853   return result
1854
1855
1856 def BlockdevShutdown(disk):
1857   """Shut down a block device.
1858
1859   First, if the device is assembled (Attach() is successful), then
1860   the device is shutdown. Then the children of the device are
1861   shutdown.
1862
1863   This function is called recursively. Note that we don't cache the
1864   children or such, as oppossed to assemble, shutdown of different
1865   devices doesn't require that the upper device was active.
1866
1867   @type disk: L{objects.Disk}
1868   @param disk: the description of the disk we should
1869       shutdown
1870   @rtype: None
1871
1872   """
1873   msgs = []
1874   r_dev = _RecursiveFindBD(disk)
1875   if r_dev is not None:
1876     r_path = r_dev.dev_path
1877     try:
1878       r_dev.Shutdown()
1879       DevCacheManager.RemoveCache(r_path)
1880     except errors.BlockDeviceError, err:
1881       msgs.append(str(err))
1882
1883   if disk.children:
1884     for child in disk.children:
1885       try:
1886         BlockdevShutdown(child)
1887       except RPCFail, err:
1888         msgs.append(str(err))
1889
1890   if msgs:
1891     _Fail("; ".join(msgs))
1892
1893
1894 def BlockdevAddchildren(parent_cdev, new_cdevs):
1895   """Extend a mirrored block device.
1896
1897   @type parent_cdev: L{objects.Disk}
1898   @param parent_cdev: the disk to which we should add children
1899   @type new_cdevs: list of L{objects.Disk}
1900   @param new_cdevs: the list of children which we should add
1901   @rtype: None
1902
1903   """
1904   parent_bdev = _RecursiveFindBD(parent_cdev)
1905   if parent_bdev is None:
1906     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1907   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1908   if new_bdevs.count(None) > 0:
1909     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1910   parent_bdev.AddChildren(new_bdevs)
1911
1912
1913 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1914   """Shrink a mirrored block device.
1915
1916   @type parent_cdev: L{objects.Disk}
1917   @param parent_cdev: the disk from which we should remove children
1918   @type new_cdevs: list of L{objects.Disk}
1919   @param new_cdevs: the list of children which we should remove
1920   @rtype: None
1921
1922   """
1923   parent_bdev = _RecursiveFindBD(parent_cdev)
1924   if parent_bdev is None:
1925     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1926   devs = []
1927   for disk in new_cdevs:
1928     rpath = disk.StaticDevPath()
1929     if rpath is None:
1930       bd = _RecursiveFindBD(disk)
1931       if bd is None:
1932         _Fail("Can't find device %s while removing children", disk)
1933       else:
1934         devs.append(bd.dev_path)
1935     else:
1936       if not utils.IsNormAbsPath(rpath):
1937         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1938       devs.append(rpath)
1939   parent_bdev.RemoveChildren(devs)
1940
1941
1942 def BlockdevGetmirrorstatus(disks):
1943   """Get the mirroring status of a list of devices.
1944
1945   @type disks: list of L{objects.Disk}
1946   @param disks: the list of disks which we should query
1947   @rtype: disk
1948   @return: List of L{objects.BlockDevStatus}, one for each disk
1949   @raise errors.BlockDeviceError: if any of the disks cannot be
1950       found
1951
1952   """
1953   stats = []
1954   for dsk in disks:
1955     rbd = _RecursiveFindBD(dsk)
1956     if rbd is None:
1957       _Fail("Can't find device %s", dsk)
1958
1959     stats.append(rbd.CombinedSyncStatus())
1960
1961   return stats
1962
1963
1964 def BlockdevGetmirrorstatusMulti(disks):
1965   """Get the mirroring status of a list of devices.
1966
1967   @type disks: list of L{objects.Disk}
1968   @param disks: the list of disks which we should query
1969   @rtype: disk
1970   @return: List of tuples, (bool, status), one for each disk; bool denotes
1971     success/failure, status is L{objects.BlockDevStatus} on success, string
1972     otherwise
1973
1974   """
1975   result = []
1976   for disk in disks:
1977     try:
1978       rbd = _RecursiveFindBD(disk)
1979       if rbd is None:
1980         result.append((False, "Can't find device %s" % disk))
1981         continue
1982
1983       status = rbd.CombinedSyncStatus()
1984     except errors.BlockDeviceError, err:
1985       logging.exception("Error while getting disk status")
1986       result.append((False, str(err)))
1987     else:
1988       result.append((True, status))
1989
1990   assert len(disks) == len(result)
1991
1992   return result
1993
1994
1995 def _RecursiveFindBD(disk):
1996   """Check if a device is activated.
1997
1998   If so, return information about the real device.
1999
2000   @type disk: L{objects.Disk}
2001   @param disk: the disk object we need to find
2002
2003   @return: None if the device can't be found,
2004       otherwise the device instance
2005
2006   """
2007   children = []
2008   if disk.children:
2009     for chdisk in disk.children:
2010       children.append(_RecursiveFindBD(chdisk))
2011
2012   return bdev.FindDevice(disk, children)
2013
2014
2015 def _OpenRealBD(disk):
2016   """Opens the underlying block device of a disk.
2017
2018   @type disk: L{objects.Disk}
2019   @param disk: the disk object we want to open
2020
2021   """
2022   real_disk = _RecursiveFindBD(disk)
2023   if real_disk is None:
2024     _Fail("Block device '%s' is not set up", disk)
2025
2026   real_disk.Open()
2027
2028   return real_disk
2029
2030
2031 def BlockdevFind(disk):
2032   """Check if a device is activated.
2033
2034   If it is, return information about the real device.
2035
2036   @type disk: L{objects.Disk}
2037   @param disk: the disk to find
2038   @rtype: None or objects.BlockDevStatus
2039   @return: None if the disk cannot be found, otherwise a the current
2040            information
2041
2042   """
2043   try:
2044     rbd = _RecursiveFindBD(disk)
2045   except errors.BlockDeviceError, err:
2046     _Fail("Failed to find device: %s", err, exc=True)
2047
2048   if rbd is None:
2049     return None
2050
2051   return rbd.GetSyncStatus()
2052
2053
2054 def BlockdevGetsize(disks):
2055   """Computes the size of the given disks.
2056
2057   If a disk is not found, returns None instead.
2058
2059   @type disks: list of L{objects.Disk}
2060   @param disks: the list of disk to compute the size for
2061   @rtype: list
2062   @return: list with elements None if the disk cannot be found,
2063       otherwise the size
2064
2065   """
2066   result = []
2067   for cf in disks:
2068     try:
2069       rbd = _RecursiveFindBD(cf)
2070     except errors.BlockDeviceError:
2071       result.append(None)
2072       continue
2073     if rbd is None:
2074       result.append(None)
2075     else:
2076       result.append(rbd.GetActualSize())
2077   return result
2078
2079
2080 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2081   """Export a block device to a remote node.
2082
2083   @type disk: L{objects.Disk}
2084   @param disk: the description of the disk to export
2085   @type dest_node: str
2086   @param dest_node: the destination node to export to
2087   @type dest_path: str
2088   @param dest_path: the destination path on the target node
2089   @type cluster_name: str
2090   @param cluster_name: the cluster name, needed for SSH hostalias
2091   @rtype: None
2092
2093   """
2094   real_disk = _OpenRealBD(disk)
2095
2096   # the block size on the read dd is 1MiB to match our units
2097   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2098                                "dd if=%s bs=1048576 count=%s",
2099                                real_disk.dev_path, str(disk.size))
2100
2101   # we set here a smaller block size as, due to ssh buffering, more
2102   # than 64-128k will mostly ignored; we use nocreat to fail if the
2103   # device is not already there or we pass a wrong path; we use
2104   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2105   # to not buffer too much memory; this means that at best, we flush
2106   # every 64k, which will not be very fast
2107   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2108                                 " oflag=dsync", dest_path)
2109
2110   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2111                                                    constants.SSH_LOGIN_USER,
2112                                                    destcmd)
2113
2114   # all commands have been checked, so we're safe to combine them
2115   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2116
2117   result = utils.RunCmd(["bash", "-c", command])
2118
2119   if result.failed:
2120     _Fail("Disk copy command '%s' returned error: %s"
2121           " output: %s", command, result.fail_reason, result.output)
2122
2123
2124 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2125   """Write a file to the filesystem.
2126
2127   This allows the master to overwrite(!) a file. It will only perform
2128   the operation if the file belongs to a list of configuration files.
2129
2130   @type file_name: str
2131   @param file_name: the target file name
2132   @type data: str
2133   @param data: the new contents of the file
2134   @type mode: int
2135   @param mode: the mode to give the file (can be None)
2136   @type uid: string
2137   @param uid: the owner of the file
2138   @type gid: string
2139   @param gid: the group of the file
2140   @type atime: float
2141   @param atime: the atime to set on the file (can be None)
2142   @type mtime: float
2143   @param mtime: the mtime to set on the file (can be None)
2144   @rtype: None
2145
2146   """
2147   file_name = vcluster.LocalizeVirtualPath(file_name)
2148
2149   if not os.path.isabs(file_name):
2150     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2151
2152   if file_name not in _ALLOWED_UPLOAD_FILES:
2153     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2154           file_name)
2155
2156   raw_data = _Decompress(data)
2157
2158   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2159     _Fail("Invalid username/groupname type")
2160
2161   getents = runtime.GetEnts()
2162   uid = getents.LookupUser(uid)
2163   gid = getents.LookupGroup(gid)
2164
2165   utils.SafeWriteFile(file_name, None,
2166                       data=raw_data, mode=mode, uid=uid, gid=gid,
2167                       atime=atime, mtime=mtime)
2168
2169
2170 def RunOob(oob_program, command, node, timeout):
2171   """Executes oob_program with given command on given node.
2172
2173   @param oob_program: The path to the executable oob_program
2174   @param command: The command to invoke on oob_program
2175   @param node: The node given as an argument to the program
2176   @param timeout: Timeout after which we kill the oob program
2177
2178   @return: stdout
2179   @raise RPCFail: If execution fails for some reason
2180
2181   """
2182   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2183
2184   if result.failed:
2185     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2186           result.fail_reason, result.output)
2187
2188   return result.stdout
2189
2190
2191 def _OSOndiskAPIVersion(os_dir):
2192   """Compute and return the API version of a given OS.
2193
2194   This function will try to read the API version of the OS residing in
2195   the 'os_dir' directory.
2196
2197   @type os_dir: str
2198   @param os_dir: the directory in which we should look for the OS
2199   @rtype: tuple
2200   @return: tuple (status, data) with status denoting the validity and
2201       data holding either the vaid versions or an error message
2202
2203   """
2204   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2205
2206   try:
2207     st = os.stat(api_file)
2208   except EnvironmentError, err:
2209     return False, ("Required file '%s' not found under path %s: %s" %
2210                    (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2211
2212   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2213     return False, ("File '%s' in %s is not a regular file" %
2214                    (constants.OS_API_FILE, os_dir))
2215
2216   try:
2217     api_versions = utils.ReadFile(api_file).splitlines()
2218   except EnvironmentError, err:
2219     return False, ("Error while reading the API version file at %s: %s" %
2220                    (api_file, utils.ErrnoOrStr(err)))
2221
2222   try:
2223     api_versions = [int(version.strip()) for version in api_versions]
2224   except (TypeError, ValueError), err:
2225     return False, ("API version(s) can't be converted to integer: %s" %
2226                    str(err))
2227
2228   return True, api_versions
2229
2230
2231 def DiagnoseOS(top_dirs=None):
2232   """Compute the validity for all OSes.
2233
2234   @type top_dirs: list
2235   @param top_dirs: the list of directories in which to
2236       search (if not given defaults to
2237       L{pathutils.OS_SEARCH_PATH})
2238   @rtype: list of L{objects.OS}
2239   @return: a list of tuples (name, path, status, diagnose, variants,
2240       parameters, api_version) for all (potential) OSes under all
2241       search paths, where:
2242           - name is the (potential) OS name
2243           - path is the full path to the OS
2244           - status True/False is the validity of the OS
2245           - diagnose is the error message for an invalid OS, otherwise empty
2246           - variants is a list of supported OS variants, if any
2247           - parameters is a list of (name, help) parameters, if any
2248           - api_version is a list of support OS API versions
2249
2250   """
2251   if top_dirs is None:
2252     top_dirs = pathutils.OS_SEARCH_PATH
2253
2254   result = []
2255   for dir_name in top_dirs:
2256     if os.path.isdir(dir_name):
2257       try:
2258         f_names = utils.ListVisibleFiles(dir_name)
2259       except EnvironmentError, err:
2260         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2261         break
2262       for name in f_names:
2263         os_path = utils.PathJoin(dir_name, name)
2264         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2265         if status:
2266           diagnose = ""
2267           variants = os_inst.supported_variants
2268           parameters = os_inst.supported_parameters
2269           api_versions = os_inst.api_versions
2270         else:
2271           diagnose = os_inst
2272           variants = parameters = api_versions = []
2273         result.append((name, os_path, status, diagnose, variants,
2274                        parameters, api_versions))
2275
2276   return result
2277
2278
2279 def _TryOSFromDisk(name, base_dir=None):
2280   """Create an OS instance from disk.
2281
2282   This function will return an OS instance if the given name is a
2283   valid OS name.
2284
2285   @type base_dir: string
2286   @keyword base_dir: Base directory containing OS installations.
2287                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2288   @rtype: tuple
2289   @return: success and either the OS instance if we find a valid one,
2290       or error message
2291
2292   """
2293   if base_dir is None:
2294     os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2295   else:
2296     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2297
2298   if os_dir is None:
2299     return False, "Directory for OS %s not found in search path" % name
2300
2301   status, api_versions = _OSOndiskAPIVersion(os_dir)
2302   if not status:
2303     # push the error up
2304     return status, api_versions
2305
2306   if not constants.OS_API_VERSIONS.intersection(api_versions):
2307     return False, ("API version mismatch for path '%s': found %s, want %s." %
2308                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2309
2310   # OS Files dictionary, we will populate it with the absolute path
2311   # names; if the value is True, then it is a required file, otherwise
2312   # an optional one
2313   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2314
2315   if max(api_versions) >= constants.OS_API_V15:
2316     os_files[constants.OS_VARIANTS_FILE] = False
2317
2318   if max(api_versions) >= constants.OS_API_V20:
2319     os_files[constants.OS_PARAMETERS_FILE] = True
2320   else:
2321     del os_files[constants.OS_SCRIPT_VERIFY]
2322
2323   for (filename, required) in os_files.items():
2324     os_files[filename] = utils.PathJoin(os_dir, filename)
2325
2326     try:
2327       st = os.stat(os_files[filename])
2328     except EnvironmentError, err:
2329       if err.errno == errno.ENOENT and not required:
2330         del os_files[filename]
2331         continue
2332       return False, ("File '%s' under path '%s' is missing (%s)" %
2333                      (filename, os_dir, utils.ErrnoOrStr(err)))
2334
2335     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2336       return False, ("File '%s' under path '%s' is not a regular file" %
2337                      (filename, os_dir))
2338
2339     if filename in constants.OS_SCRIPTS:
2340       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2341         return False, ("File '%s' under path '%s' is not executable" %
2342                        (filename, os_dir))
2343
2344   variants = []
2345   if constants.OS_VARIANTS_FILE in os_files:
2346     variants_file = os_files[constants.OS_VARIANTS_FILE]
2347     try:
2348       variants = \
2349         utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2350     except EnvironmentError, err:
2351       # we accept missing files, but not other errors
2352       if err.errno != errno.ENOENT:
2353         return False, ("Error while reading the OS variants file at %s: %s" %
2354                        (variants_file, utils.ErrnoOrStr(err)))
2355
2356   parameters = []
2357   if constants.OS_PARAMETERS_FILE in os_files:
2358     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2359     try:
2360       parameters = utils.ReadFile(parameters_file).splitlines()
2361     except EnvironmentError, err:
2362       return False, ("Error while reading the OS parameters file at %s: %s" %
2363                      (parameters_file, utils.ErrnoOrStr(err)))
2364     parameters = [v.split(None, 1) for v in parameters]
2365
2366   os_obj = objects.OS(name=name, path=os_dir,
2367                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2368                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2369                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2370                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2371                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2372                                                  None),
2373                       supported_variants=variants,
2374                       supported_parameters=parameters,
2375                       api_versions=api_versions)
2376   return True, os_obj
2377
2378
2379 def OSFromDisk(name, base_dir=None):
2380   """Create an OS instance from disk.
2381
2382   This function will return an OS instance if the given name is a
2383   valid OS name. Otherwise, it will raise an appropriate
2384   L{RPCFail} exception, detailing why this is not a valid OS.
2385
2386   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2387   an exception but returns true/false status data.
2388
2389   @type base_dir: string
2390   @keyword base_dir: Base directory containing OS installations.
2391                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2392   @rtype: L{objects.OS}
2393   @return: the OS instance if we find a valid one
2394   @raise RPCFail: if we don't find a valid OS
2395
2396   """
2397   name_only = objects.OS.GetName(name)
2398   status, payload = _TryOSFromDisk(name_only, base_dir)
2399
2400   if not status:
2401     _Fail(payload)
2402
2403   return payload
2404
2405
2406 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2407   """Calculate the basic environment for an os script.
2408
2409   @type os_name: str
2410   @param os_name: full operating system name (including variant)
2411   @type inst_os: L{objects.OS}
2412   @param inst_os: operating system for which the environment is being built
2413   @type os_params: dict
2414   @param os_params: the OS parameters
2415   @type debug: integer
2416   @param debug: debug level (0 or 1, for OS Api 10)
2417   @rtype: dict
2418   @return: dict of environment variables
2419   @raise errors.BlockDeviceError: if the block device
2420       cannot be found
2421
2422   """
2423   result = {}
2424   api_version = \
2425     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2426   result["OS_API_VERSION"] = "%d" % api_version
2427   result["OS_NAME"] = inst_os.name
2428   result["DEBUG_LEVEL"] = "%d" % debug
2429
2430   # OS variants
2431   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2432     variant = objects.OS.GetVariant(os_name)
2433     if not variant:
2434       variant = inst_os.supported_variants[0]
2435   else:
2436     variant = ""
2437   result["OS_VARIANT"] = variant
2438
2439   # OS params
2440   for pname, pvalue in os_params.items():
2441     result["OSP_%s" % pname.upper()] = pvalue
2442
2443   # Set a default path otherwise programs called by OS scripts (or
2444   # even hooks called from OS scripts) might break, and we don't want
2445   # to have each script require setting a PATH variable
2446   result["PATH"] = constants.HOOKS_PATH
2447
2448   return result
2449
2450
2451 def OSEnvironment(instance, inst_os, debug=0):
2452   """Calculate the environment for an os script.
2453
2454   @type instance: L{objects.Instance}
2455   @param instance: target instance for the os script run
2456   @type inst_os: L{objects.OS}
2457   @param inst_os: operating system for which the environment is being built
2458   @type debug: integer
2459   @param debug: debug level (0 or 1, for OS Api 10)
2460   @rtype: dict
2461   @return: dict of environment variables
2462   @raise errors.BlockDeviceError: if the block device
2463       cannot be found
2464
2465   """
2466   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2467
2468   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2469     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2470
2471   result["HYPERVISOR"] = instance.hypervisor
2472   result["DISK_COUNT"] = "%d" % len(instance.disks)
2473   result["NIC_COUNT"] = "%d" % len(instance.nics)
2474   result["INSTANCE_SECONDARY_NODES"] = \
2475       ("%s" % " ".join(instance.secondary_nodes))
2476
2477   # Disks
2478   for idx, disk in enumerate(instance.disks):
2479     real_disk = _OpenRealBD(disk)
2480     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2481     result["DISK_%d_ACCESS" % idx] = disk.mode
2482     if constants.HV_DISK_TYPE in instance.hvparams:
2483       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2484         instance.hvparams[constants.HV_DISK_TYPE]
2485     if disk.dev_type in constants.LDS_BLOCK:
2486       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2487     elif disk.dev_type == constants.LD_FILE:
2488       result["DISK_%d_BACKEND_TYPE" % idx] = \
2489         "file:%s" % disk.physical_id[0]
2490
2491   # NICs
2492   for idx, nic in enumerate(instance.nics):
2493     result["NIC_%d_MAC" % idx] = nic.mac
2494     if nic.ip:
2495       result["NIC_%d_IP" % idx] = nic.ip
2496     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2497     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2498       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2499     if nic.nicparams[constants.NIC_LINK]:
2500       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2501     if nic.netinfo:
2502       nobj = objects.Network.FromDict(nic.netinfo)
2503       result.update(nobj.HooksDict("NIC_%d_" % idx))
2504     if constants.HV_NIC_TYPE in instance.hvparams:
2505       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2506         instance.hvparams[constants.HV_NIC_TYPE]
2507
2508   # HV/BE params
2509   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2510     for key, value in source.items():
2511       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2512
2513   return result
2514
2515
2516 def DiagnoseExtStorage(top_dirs=None):
2517   """Compute the validity for all ExtStorage Providers.
2518
2519   @type top_dirs: list
2520   @param top_dirs: the list of directories in which to
2521       search (if not given defaults to
2522       L{pathutils.ES_SEARCH_PATH})
2523   @rtype: list of L{objects.ExtStorage}
2524   @return: a list of tuples (name, path, status, diagnose, parameters)
2525       for all (potential) ExtStorage Providers under all
2526       search paths, where:
2527           - name is the (potential) ExtStorage Provider
2528           - path is the full path to the ExtStorage Provider
2529           - status True/False is the validity of the ExtStorage Provider
2530           - diagnose is the error message for an invalid ExtStorage Provider,
2531             otherwise empty
2532           - parameters is a list of (name, help) parameters, if any
2533
2534   """
2535   if top_dirs is None:
2536     top_dirs = pathutils.ES_SEARCH_PATH
2537
2538   result = []
2539   for dir_name in top_dirs:
2540     if os.path.isdir(dir_name):
2541       try:
2542         f_names = utils.ListVisibleFiles(dir_name)
2543       except EnvironmentError, err:
2544         logging.exception("Can't list the ExtStorage directory %s: %s",
2545                           dir_name, err)
2546         break
2547       for name in f_names:
2548         es_path = utils.PathJoin(dir_name, name)
2549         status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
2550         if status:
2551           diagnose = ""
2552           parameters = es_inst.supported_parameters
2553         else:
2554           diagnose = es_inst
2555           parameters = []
2556         result.append((name, es_path, status, diagnose, parameters))
2557
2558   return result
2559
2560
2561 def BlockdevGrow(disk, amount, dryrun, backingstore):
2562   """Grow a stack of block devices.
2563
2564   This function is called recursively, with the childrens being the
2565   first ones to resize.
2566
2567   @type disk: L{objects.Disk}
2568   @param disk: the disk to be grown
2569   @type amount: integer
2570   @param amount: the amount (in mebibytes) to grow with
2571   @type dryrun: boolean
2572   @param dryrun: whether to execute the operation in simulation mode
2573       only, without actually increasing the size
2574   @param backingstore: whether to execute the operation on backing storage
2575       only, or on "logical" storage only; e.g. DRBD is logical storage,
2576       whereas LVM, file, RBD are backing storage
2577   @rtype: (status, result)
2578   @return: a tuple with the status of the operation (True/False), and
2579       the errors message if status is False
2580
2581   """
2582   r_dev = _RecursiveFindBD(disk)
2583   if r_dev is None:
2584     _Fail("Cannot find block device %s", disk)
2585
2586   try:
2587     r_dev.Grow(amount, dryrun, backingstore)
2588   except errors.BlockDeviceError, err:
2589     _Fail("Failed to grow block device: %s", err, exc=True)
2590
2591
2592 def BlockdevSnapshot(disk):
2593   """Create a snapshot copy of a block device.
2594
2595   This function is called recursively, and the snapshot is actually created
2596   just for the leaf lvm backend device.
2597
2598   @type disk: L{objects.Disk}
2599   @param disk: the disk to be snapshotted
2600   @rtype: string
2601   @return: snapshot disk ID as (vg, lv)
2602
2603   """
2604   if disk.dev_type == constants.LD_DRBD8:
2605     if not disk.children:
2606       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2607             disk.unique_id)
2608     return BlockdevSnapshot(disk.children[0])
2609   elif disk.dev_type == constants.LD_LV:
2610     r_dev = _RecursiveFindBD(disk)
2611     if r_dev is not None:
2612       # FIXME: choose a saner value for the snapshot size
2613       # let's stay on the safe side and ask for the full size, for now
2614       return r_dev.Snapshot(disk.size)
2615     else:
2616       _Fail("Cannot find block device %s", disk)
2617   else:
2618     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2619           disk.unique_id, disk.dev_type)
2620
2621
2622 def BlockdevSetInfo(disk, info):
2623   """Sets 'metadata' information on block devices.
2624
2625   This function sets 'info' metadata on block devices. Initial
2626   information is set at device creation; this function should be used
2627   for example after renames.
2628
2629   @type disk: L{objects.Disk}
2630   @param disk: the disk to be grown
2631   @type info: string
2632   @param info: new 'info' metadata
2633   @rtype: (status, result)
2634   @return: a tuple with the status of the operation (True/False), and
2635       the errors message if status is False
2636
2637   """
2638   r_dev = _RecursiveFindBD(disk)
2639   if r_dev is None:
2640     _Fail("Cannot find block device %s", disk)
2641
2642   try:
2643     r_dev.SetInfo(info)
2644   except errors.BlockDeviceError, err:
2645     _Fail("Failed to set information on block device: %s", err, exc=True)
2646
2647
2648 def FinalizeExport(instance, snap_disks):
2649   """Write out the export configuration information.
2650
2651   @type instance: L{objects.Instance}
2652   @param instance: the instance which we export, used for
2653       saving configuration
2654   @type snap_disks: list of L{objects.Disk}
2655   @param snap_disks: list of snapshot block devices, which
2656       will be used to get the actual name of the dump file
2657
2658   @rtype: None
2659
2660   """
2661   destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
2662   finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
2663
2664   config = objects.SerializableConfigParser()
2665
2666   config.add_section(constants.INISECT_EXP)
2667   config.set(constants.INISECT_EXP, "version", "0")
2668   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2669   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2670   config.set(constants.INISECT_EXP, "os", instance.os)
2671   config.set(constants.INISECT_EXP, "compression", "none")
2672
2673   config.add_section(constants.INISECT_INS)
2674   config.set(constants.INISECT_INS, "name", instance.name)
2675   config.set(constants.INISECT_INS, "maxmem", "%d" %
2676              instance.beparams[constants.BE_MAXMEM])
2677   config.set(constants.INISECT_INS, "minmem", "%d" %
2678              instance.beparams[constants.BE_MINMEM])
2679   # "memory" is deprecated, but useful for exporting to old ganeti versions
2680   config.set(constants.INISECT_INS, "memory", "%d" %
2681              instance.beparams[constants.BE_MAXMEM])
2682   config.set(constants.INISECT_INS, "vcpus", "%d" %
2683              instance.beparams[constants.BE_VCPUS])
2684   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2685   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2686   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2687
2688   nic_total = 0
2689   for nic_count, nic in enumerate(instance.nics):
2690     nic_total += 1
2691     config.set(constants.INISECT_INS, "nic%d_mac" %
2692                nic_count, "%s" % nic.mac)
2693     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2694     config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
2695                "%s" % nic.network)
2696     for param in constants.NICS_PARAMETER_TYPES:
2697       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2698                  "%s" % nic.nicparams.get(param, None))
2699   # TODO: redundant: on load can read nics until it doesn't exist
2700   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2701
2702   disk_total = 0
2703   for disk_count, disk in enumerate(snap_disks):
2704     if disk:
2705       disk_total += 1
2706       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2707                  ("%s" % disk.iv_name))
2708       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2709                  ("%s" % disk.physical_id[1]))
2710       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2711                  ("%d" % disk.size))
2712
2713   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2714
2715   # New-style hypervisor/backend parameters
2716
2717   config.add_section(constants.INISECT_HYP)
2718   for name, value in instance.hvparams.items():
2719     if name not in constants.HVC_GLOBALS:
2720       config.set(constants.INISECT_HYP, name, str(value))
2721
2722   config.add_section(constants.INISECT_BEP)
2723   for name, value in instance.beparams.items():
2724     config.set(constants.INISECT_BEP, name, str(value))
2725
2726   config.add_section(constants.INISECT_OSP)
2727   for name, value in instance.osparams.items():
2728     config.set(constants.INISECT_OSP, name, str(value))
2729
2730   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2731                   data=config.Dumps())
2732   shutil.rmtree(finaldestdir, ignore_errors=True)
2733   shutil.move(destdir, finaldestdir)
2734
2735
2736 def ExportInfo(dest):
2737   """Get export configuration information.
2738
2739   @type dest: str
2740   @param dest: directory containing the export
2741
2742   @rtype: L{objects.SerializableConfigParser}
2743   @return: a serializable config file containing the
2744       export info
2745
2746   """
2747   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2748
2749   config = objects.SerializableConfigParser()
2750   config.read(cff)
2751
2752   if (not config.has_section(constants.INISECT_EXP) or
2753       not config.has_section(constants.INISECT_INS)):
2754     _Fail("Export info file doesn't have the required fields")
2755
2756   return config.Dumps()
2757
2758
2759 def ListExports():
2760   """Return a list of exports currently available on this machine.
2761
2762   @rtype: list
2763   @return: list of the exports
2764
2765   """
2766   if os.path.isdir(pathutils.EXPORT_DIR):
2767     return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
2768   else:
2769     _Fail("No exports directory")
2770
2771
2772 def RemoveExport(export):
2773   """Remove an existing export from the node.
2774
2775   @type export: str
2776   @param export: the name of the export to remove
2777   @rtype: None
2778
2779   """
2780   target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2781
2782   try:
2783     shutil.rmtree(target)
2784   except EnvironmentError, err:
2785     _Fail("Error while removing the export: %s", err, exc=True)
2786
2787
2788 def BlockdevRename(devlist):
2789   """Rename a list of block devices.
2790
2791   @type devlist: list of tuples
2792   @param devlist: list of tuples of the form  (disk,
2793       new_logical_id, new_physical_id); disk is an
2794       L{objects.Disk} object describing the current disk,
2795       and new logical_id/physical_id is the name we
2796       rename it to
2797   @rtype: boolean
2798   @return: True if all renames succeeded, False otherwise
2799
2800   """
2801   msgs = []
2802   result = True
2803   for disk, unique_id in devlist:
2804     dev = _RecursiveFindBD(disk)
2805     if dev is None:
2806       msgs.append("Can't find device %s in rename" % str(disk))
2807       result = False
2808       continue
2809     try:
2810       old_rpath = dev.dev_path
2811       dev.Rename(unique_id)
2812       new_rpath = dev.dev_path
2813       if old_rpath != new_rpath:
2814         DevCacheManager.RemoveCache(old_rpath)
2815         # FIXME: we should add the new cache information here, like:
2816         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2817         # but we don't have the owner here - maybe parse from existing
2818         # cache? for now, we only lose lvm data when we rename, which
2819         # is less critical than DRBD or MD
2820     except errors.BlockDeviceError, err:
2821       msgs.append("Can't rename device '%s' to '%s': %s" %
2822                   (dev, unique_id, err))
2823       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2824       result = False
2825   if not result:
2826     _Fail("; ".join(msgs))
2827
2828
2829 def _TransformFileStorageDir(fs_dir):
2830   """Checks whether given file_storage_dir is valid.
2831
2832   Checks wheter the given fs_dir is within the cluster-wide default
2833   file_storage_dir or the shared_file_storage_dir, which are stored in
2834   SimpleStore. Only paths under those directories are allowed.
2835
2836   @type fs_dir: str
2837   @param fs_dir: the path to check
2838
2839   @return: the normalized path if valid, None otherwise
2840
2841   """
2842   if not (constants.ENABLE_FILE_STORAGE or
2843           constants.ENABLE_SHARED_FILE_STORAGE):
2844     _Fail("File storage disabled at configure time")
2845
2846   bdev.CheckFileStoragePath(fs_dir)
2847
2848   return os.path.normpath(fs_dir)
2849
2850
2851 def CreateFileStorageDir(file_storage_dir):
2852   """Create file storage directory.
2853
2854   @type file_storage_dir: str
2855   @param file_storage_dir: directory to create
2856
2857   @rtype: tuple
2858   @return: tuple with first element a boolean indicating wheter dir
2859       creation was successful or not
2860
2861   """
2862   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2863   if os.path.exists(file_storage_dir):
2864     if not os.path.isdir(file_storage_dir):
2865       _Fail("Specified storage dir '%s' is not a directory",
2866             file_storage_dir)
2867   else:
2868     try:
2869       os.makedirs(file_storage_dir, 0750)
2870     except OSError, err:
2871       _Fail("Cannot create file storage directory '%s': %s",
2872             file_storage_dir, err, exc=True)
2873
2874
2875 def RemoveFileStorageDir(file_storage_dir):
2876   """Remove file storage directory.
2877
2878   Remove it only if it's empty. If not log an error and return.
2879
2880   @type file_storage_dir: str
2881   @param file_storage_dir: the directory we should cleanup
2882   @rtype: tuple (success,)
2883   @return: tuple of one element, C{success}, denoting
2884       whether the operation was successful
2885
2886   """
2887   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2888   if os.path.exists(file_storage_dir):
2889     if not os.path.isdir(file_storage_dir):
2890       _Fail("Specified Storage directory '%s' is not a directory",
2891             file_storage_dir)
2892     # deletes dir only if empty, otherwise we want to fail the rpc call
2893     try:
2894       os.rmdir(file_storage_dir)
2895     except OSError, err:
2896       _Fail("Cannot remove file storage directory '%s': %s",
2897             file_storage_dir, err)
2898
2899
2900 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2901   """Rename the file storage directory.
2902
2903   @type old_file_storage_dir: str
2904   @param old_file_storage_dir: the current path
2905   @type new_file_storage_dir: str
2906   @param new_file_storage_dir: the name we should rename to
2907   @rtype: tuple (success,)
2908   @return: tuple of one element, C{success}, denoting
2909       whether the operation was successful
2910
2911   """
2912   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2913   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2914   if not os.path.exists(new_file_storage_dir):
2915     if os.path.isdir(old_file_storage_dir):
2916       try:
2917         os.rename(old_file_storage_dir, new_file_storage_dir)
2918       except OSError, err:
2919         _Fail("Cannot rename '%s' to '%s': %s",
2920               old_file_storage_dir, new_file_storage_dir, err)
2921     else:
2922       _Fail("Specified storage dir '%s' is not a directory",
2923             old_file_storage_dir)
2924   else:
2925     if os.path.exists(old_file_storage_dir):
2926       _Fail("Cannot rename '%s' to '%s': both locations exist",
2927             old_file_storage_dir, new_file_storage_dir)
2928
2929
2930 def _EnsureJobQueueFile(file_name):
2931   """Checks whether the given filename is in the queue directory.
2932
2933   @type file_name: str
2934   @param file_name: the file name we should check
2935   @rtype: None
2936   @raises RPCFail: if the file is not valid
2937
2938   """
2939   if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
2940     _Fail("Passed job queue file '%s' does not belong to"
2941           " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
2942
2943
2944 def JobQueueUpdate(file_name, content):
2945   """Updates a file in the queue directory.
2946
2947   This is just a wrapper over L{utils.io.WriteFile}, with proper
2948   checking.
2949
2950   @type file_name: str
2951   @param file_name: the job file name
2952   @type content: str
2953   @param content: the new job contents
2954   @rtype: boolean
2955   @return: the success of the operation
2956
2957   """
2958   file_name = vcluster.LocalizeVirtualPath(file_name)
2959
2960   _EnsureJobQueueFile(file_name)
2961   getents = runtime.GetEnts()
2962
2963   # Write and replace the file atomically
2964   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2965                   gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)
2966
2967
2968 def JobQueueRename(old, new):
2969   """Renames a job queue file.
2970
2971   This is just a wrapper over os.rename with proper checking.
2972
2973   @type old: str
2974   @param old: the old (actual) file name
2975   @type new: str
2976   @param new: the desired file name
2977   @rtype: tuple
2978   @return: the success of the operation and payload
2979
2980   """
2981   old = vcluster.LocalizeVirtualPath(old)
2982   new = vcluster.LocalizeVirtualPath(new)
2983
2984   _EnsureJobQueueFile(old)
2985   _EnsureJobQueueFile(new)
2986
2987   getents = runtime.GetEnts()
2988
2989   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
2990                    dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)
2991
2992
2993 def BlockdevClose(instance_name, disks):
2994   """Closes the given block devices.
2995
2996   This means they will be switched to secondary mode (in case of
2997   DRBD).
2998
2999   @param instance_name: if the argument is not empty, the symlinks
3000       of this instance will be removed
3001   @type disks: list of L{objects.Disk}
3002   @param disks: the list of disks to be closed
3003   @rtype: tuple (success, message)
3004   @return: a tuple of success and message, where success
3005       indicates the succes of the operation, and message
3006       which will contain the error details in case we
3007       failed
3008
3009   """
3010   bdevs = []
3011   for cf in disks:
3012     rd = _RecursiveFindBD(cf)
3013     if rd is None:
3014       _Fail("Can't find device %s", cf)
3015     bdevs.append(rd)
3016
3017   msg = []
3018   for rd in bdevs:
3019     try:
3020       rd.Close()
3021     except errors.BlockDeviceError, err:
3022       msg.append(str(err))
3023   if msg:
3024     _Fail("Can't make devices secondary: %s", ",".join(msg))
3025   else:
3026     if instance_name:
3027       _RemoveBlockDevLinks(instance_name, disks)
3028
3029
3030 def ValidateHVParams(hvname, hvparams):
3031   """Validates the given hypervisor parameters.
3032
3033   @type hvname: string
3034   @param hvname: the hypervisor name
3035   @type hvparams: dict
3036   @param hvparams: the hypervisor parameters to be validated
3037   @rtype: None
3038
3039   """
3040   try:
3041     hv_type = hypervisor.GetHypervisor(hvname)
3042     hv_type.ValidateParameters(hvparams)
3043   except errors.HypervisorError, err:
3044     _Fail(str(err), log=False)
3045
3046
3047 def _CheckOSPList(os_obj, parameters):
3048   """Check whether a list of parameters is supported by the OS.
3049
3050   @type os_obj: L{objects.OS}
3051   @param os_obj: OS object to check
3052   @type parameters: list
3053   @param parameters: the list of parameters to check
3054
3055   """
3056   supported = [v[0] for v in os_obj.supported_parameters]
3057   delta = frozenset(parameters).difference(supported)
3058   if delta:
3059     _Fail("The following parameters are not supported"
3060           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
3061
3062
3063 def ValidateOS(required, osname, checks, osparams):
3064   """Validate the given OS' parameters.
3065
3066   @type required: boolean
3067   @param required: whether absence of the OS should translate into
3068       failure or not
3069   @type osname: string
3070   @param osname: the OS to be validated
3071   @type checks: list
3072   @param checks: list of the checks to run (currently only 'parameters')
3073   @type osparams: dict
3074   @param osparams: dictionary with OS parameters
3075   @rtype: boolean
3076   @return: True if the validation passed, or False if the OS was not
3077       found and L{required} was false
3078
3079   """
3080   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
3081     _Fail("Unknown checks required for OS %s: %s", osname,
3082           set(checks).difference(constants.OS_VALIDATE_CALLS))
3083
3084   name_only = objects.OS.GetName(osname)
3085   status, tbv = _TryOSFromDisk(name_only, None)
3086
3087   if not status:
3088     if required:
3089       _Fail(tbv)
3090     else:
3091       return False
3092
3093   if max(tbv.api_versions) < constants.OS_API_V20:
3094     return True
3095
3096   if constants.OS_VALIDATE_PARAMETERS in checks:
3097     _CheckOSPList(tbv, osparams.keys())
3098
3099   validate_env = OSCoreEnv(osname, tbv, osparams)
3100   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
3101                         cwd=tbv.path, reset_env=True)
3102   if result.failed:
3103     logging.error("os validate command '%s' returned error: %s output: %s",
3104                   result.cmd, result.fail_reason, result.output)
3105     _Fail("OS validation script failed (%s), output: %s",
3106           result.fail_reason, result.output, log=False)
3107
3108   return True
3109
3110
3111 def DemoteFromMC():
3112   """Demotes the current node from master candidate role.
3113
3114   """
3115   # try to ensure we're not the master by mistake
3116   master, myself = ssconf.GetMasterAndMyself()
3117   if master == myself:
3118     _Fail("ssconf status shows I'm the master node, will not demote")
3119
3120   result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
3121   if not result.failed:
3122     _Fail("The master daemon is running, will not demote")
3123
3124   try:
3125     if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3126       utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3127   except EnvironmentError, err:
3128     if err.errno != errno.ENOENT:
3129       _Fail("Error while backing up cluster file: %s", err, exc=True)
3130
3131   utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3132
3133
3134 def _GetX509Filenames(cryptodir, name):
3135   """Returns the full paths for the private key and certificate.
3136
3137   """
3138   return (utils.PathJoin(cryptodir, name),
3139           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3140           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3141
3142
3143 def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3144   """Creates a new X509 certificate for SSL/TLS.
3145
3146   @type validity: int
3147   @param validity: Validity in seconds
3148   @rtype: tuple; (string, string)
3149   @return: Certificate name and public part
3150
3151   """
3152   (key_pem, cert_pem) = \
3153     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3154                                      min(validity, _MAX_SSL_CERT_VALIDITY))
3155
3156   cert_dir = tempfile.mkdtemp(dir=cryptodir,
3157                               prefix="x509-%s-" % utils.TimestampForFilename())
3158   try:
3159     name = os.path.basename(cert_dir)
3160     assert len(name) > 5
3161
3162     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3163
3164     utils.WriteFile(key_file, mode=0400, data=key_pem)
3165     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3166
3167     # Never return private key as it shouldn't leave the node
3168     return (name, cert_pem)
3169   except Exception:
3170     shutil.rmtree(cert_dir, ignore_errors=True)
3171     raise
3172
3173
3174 def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3175   """Removes a X509 certificate.
3176
3177   @type name: string
3178   @param name: Certificate name
3179
3180   """
3181   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3182
3183   utils.RemoveFile(key_file)
3184   utils.RemoveFile(cert_file)
3185
3186   try:
3187     os.rmdir(cert_dir)
3188   except EnvironmentError, err:
3189     _Fail("Cannot remove certificate directory '%s': %s",
3190           cert_dir, err)
3191
3192
3193 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3194   """Returns the command for the requested input/output.
3195
3196   @type instance: L{objects.Instance}
3197   @param instance: The instance object
3198   @param mode: Import/export mode
3199   @param ieio: Input/output type
3200   @param ieargs: Input/output arguments
3201
3202   """
3203   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3204
3205   env = None
3206   prefix = None
3207   suffix = None
3208   exp_size = None
3209
3210   if ieio == constants.IEIO_FILE:
3211     (filename, ) = ieargs
3212
3213     if not utils.IsNormAbsPath(filename):
3214       _Fail("Path '%s' is not normalized or absolute", filename)
3215
3216     real_filename = os.path.realpath(filename)
3217     directory = os.path.dirname(real_filename)
3218
3219     if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3220       _Fail("File '%s' is not under exports directory '%s': %s",
3221             filename, pathutils.EXPORT_DIR, real_filename)
3222
3223     # Create directory
3224     utils.Makedirs(directory, mode=0750)
3225
3226     quoted_filename = utils.ShellQuote(filename)
3227
3228     if mode == constants.IEM_IMPORT:
3229       suffix = "> %s" % quoted_filename
3230     elif mode == constants.IEM_EXPORT:
3231       suffix = "< %s" % quoted_filename
3232
3233       # Retrieve file size
3234       try:
3235         st = os.stat(filename)
3236       except EnvironmentError, err:
3237         logging.error("Can't stat(2) %s: %s", filename, err)
3238       else:
3239         exp_size = utils.BytesToMebibyte(st.st_size)
3240
3241   elif ieio == constants.IEIO_RAW_DISK:
3242     (disk, ) = ieargs
3243
3244     real_disk = _OpenRealBD(disk)
3245
3246     if mode == constants.IEM_IMPORT:
3247       # we set here a smaller block size as, due to transport buffering, more
3248       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3249       # is not already there or we pass a wrong path; we use notrunc to no
3250       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3251       # much memory; this means that at best, we flush every 64k, which will
3252       # not be very fast
3253       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3254                                     " bs=%s oflag=dsync"),
3255                                     real_disk.dev_path,
3256                                     str(64 * 1024))
3257
3258     elif mode == constants.IEM_EXPORT:
3259       # the block size on the read dd is 1MiB to match our units
3260       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3261                                    real_disk.dev_path,
3262                                    str(1024 * 1024), # 1 MB
3263                                    str(disk.size))
3264       exp_size = disk.size
3265
3266   elif ieio == constants.IEIO_SCRIPT:
3267     (disk, disk_index, ) = ieargs
3268
3269     assert isinstance(disk_index, (int, long))
3270
3271     real_disk = _OpenRealBD(disk)
3272
3273     inst_os = OSFromDisk(instance.os)
3274     env = OSEnvironment(instance, inst_os)
3275
3276     if mode == constants.IEM_IMPORT:
3277       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3278       env["IMPORT_INDEX"] = str(disk_index)
3279       script = inst_os.import_script
3280
3281     elif mode == constants.IEM_EXPORT:
3282       env["EXPORT_DEVICE"] = real_disk.dev_path
3283       env["EXPORT_INDEX"] = str(disk_index)
3284       script = inst_os.export_script
3285
3286     # TODO: Pass special environment only to script
3287     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3288
3289     if mode == constants.IEM_IMPORT:
3290       suffix = "| %s" % script_cmd
3291
3292     elif mode == constants.IEM_EXPORT:
3293       prefix = "%s |" % script_cmd
3294
3295     # Let script predict size
3296     exp_size = constants.IE_CUSTOM_SIZE
3297
3298   else:
3299     _Fail("Invalid %s I/O mode %r", mode, ieio)
3300
3301   return (env, prefix, suffix, exp_size)
3302
3303
3304 def _CreateImportExportStatusDir(prefix):
3305   """Creates status directory for import/export.
3306
3307   """
3308   return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3309                           prefix=("%s-%s-" %
3310                                   (prefix, utils.TimestampForFilename())))
3311
3312
3313 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3314                             ieio, ieioargs):
3315   """Starts an import or export daemon.
3316
3317   @param mode: Import/output mode
3318   @type opts: L{objects.ImportExportOptions}
3319   @param opts: Daemon options
3320   @type host: string
3321   @param host: Remote host for export (None for import)
3322   @type port: int
3323   @param port: Remote port for export (None for import)
3324   @type instance: L{objects.Instance}
3325   @param instance: Instance object
3326   @type component: string
3327   @param component: which part of the instance is transferred now,
3328       e.g. 'disk/0'
3329   @param ieio: Input/output type
3330   @param ieioargs: Input/output arguments
3331
3332   """
3333   if mode == constants.IEM_IMPORT:
3334     prefix = "import"
3335
3336     if not (host is None and port is None):
3337       _Fail("Can not specify host or port on import")
3338
3339   elif mode == constants.IEM_EXPORT:
3340     prefix = "export"
3341
3342     if host is None or port is None:
3343       _Fail("Host and port must be specified for an export")
3344
3345   else:
3346     _Fail("Invalid mode %r", mode)
3347
3348   if (opts.key_name is None) ^ (opts.ca_pem is None):
3349     _Fail("Cluster certificate can only be used for both key and CA")
3350
3351   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3352     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3353
3354   if opts.key_name is None:
3355     # Use server.pem
3356     key_path = pathutils.NODED_CERT_FILE
3357     cert_path = pathutils.NODED_CERT_FILE
3358     assert opts.ca_pem is None
3359   else:
3360     (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3361                                                  opts.key_name)
3362     assert opts.ca_pem is not None
3363
3364   for i in [key_path, cert_path]:
3365     if not os.path.exists(i):
3366       _Fail("File '%s' does not exist" % i)
3367
3368   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3369   try:
3370     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3371     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3372     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3373
3374     if opts.ca_pem is None:
3375       # Use server.pem
3376       ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3377     else:
3378       ca = opts.ca_pem
3379
3380     # Write CA file
3381     utils.WriteFile(ca_file, data=ca, mode=0400)
3382
3383     cmd = [
3384       pathutils.IMPORT_EXPORT_DAEMON,
3385       status_file, mode,
3386       "--key=%s" % key_path,
3387       "--cert=%s" % cert_path,
3388       "--ca=%s" % ca_file,
3389       ]
3390
3391     if host:
3392       cmd.append("--host=%s" % host)
3393
3394     if port:
3395       cmd.append("--port=%s" % port)
3396
3397     if opts.ipv6:
3398       cmd.append("--ipv6")
3399     else:
3400       cmd.append("--ipv4")
3401
3402     if opts.compress:
3403       cmd.append("--compress=%s" % opts.compress)
3404
3405     if opts.magic:
3406       cmd.append("--magic=%s" % opts.magic)
3407
3408     if exp_size is not None:
3409       cmd.append("--expected-size=%s" % exp_size)
3410
3411     if cmd_prefix:
3412       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3413
3414     if cmd_suffix:
3415       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3416
3417     if mode == constants.IEM_EXPORT:
3418       # Retry connection a few times when connecting to remote peer
3419       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3420       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3421     elif opts.connect_timeout is not None:
3422       assert mode == constants.IEM_IMPORT
3423       # Overall timeout for establishing connection while listening
3424       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3425
3426     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3427
3428     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3429     # support for receiving a file descriptor for output
3430     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3431                       output=logfile)
3432
3433     # The import/export name is simply the status directory name
3434     return os.path.basename(status_dir)
3435
3436   except Exception:
3437     shutil.rmtree(status_dir, ignore_errors=True)
3438     raise
3439
3440
3441 def GetImportExportStatus(names):
3442   """Returns import/export daemon status.
3443
3444   @type names: sequence
3445   @param names: List of names
3446   @rtype: List of dicts
3447   @return: Returns a list of the state of each named import/export or None if a
3448            status couldn't be read
3449
3450   """
3451   result = []
3452
3453   for name in names:
3454     status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3455                                  _IES_STATUS_FILE)
3456
3457     try:
3458       data = utils.ReadFile(status_file)
3459     except EnvironmentError, err:
3460       if err.errno != errno.ENOENT:
3461         raise
3462       data = None
3463
3464     if not data:
3465       result.append(None)
3466       continue
3467
3468     result.append(serializer.LoadJson(data))
3469
3470   return result
3471
3472
3473 def AbortImportExport(name):
3474   """Sends SIGTERM to a running import/export daemon.
3475
3476   """
3477   logging.info("Abort import/export %s", name)
3478
3479   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3480   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3481
3482   if pid:
3483     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3484                  name, pid)
3485     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3486
3487
3488 def CleanupImportExport(name):
3489   """Cleanup after an import or export.
3490
3491   If the import/export daemon is still running it's killed. Afterwards the
3492   whole status directory is removed.
3493
3494   """
3495   logging.info("Finalizing import/export %s", name)
3496
3497   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3498
3499   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3500
3501   if pid:
3502     logging.info("Import/export %s is still running with PID %s",
3503                  name, pid)
3504     utils.KillProcess(pid, waitpid=False)
3505
3506   shutil.rmtree(status_dir, ignore_errors=True)
3507
3508
3509 def _FindDisks(nodes_ip, disks):
3510   """Sets the physical ID on disks and returns the block devices.
3511
3512   """
3513   # set the correct physical ID
3514   my_name = netutils.Hostname.GetSysName()
3515   for cf in disks:
3516     cf.SetPhysicalID(my_name, nodes_ip)
3517
3518   bdevs = []
3519
3520   for cf in disks:
3521     rd = _RecursiveFindBD(cf)
3522     if rd is None:
3523       _Fail("Can't find device %s", cf)
3524     bdevs.append(rd)
3525   return bdevs
3526
3527
3528 def DrbdDisconnectNet(nodes_ip, disks):
3529   """Disconnects the network on a list of drbd devices.
3530
3531   """
3532   bdevs = _FindDisks(nodes_ip, disks)
3533
3534   # disconnect disks
3535   for rd in bdevs:
3536     try:
3537       rd.DisconnectNet()
3538     except errors.BlockDeviceError, err:
3539       _Fail("Can't change network configuration to standalone mode: %s",
3540             err, exc=True)
3541
3542
3543 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3544   """Attaches the network on a list of drbd devices.
3545
3546   """
3547   bdevs = _FindDisks(nodes_ip, disks)
3548
3549   if multimaster:
3550     for idx, rd in enumerate(bdevs):
3551       try:
3552         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3553       except EnvironmentError, err:
3554         _Fail("Can't create symlink: %s", err)
3555   # reconnect disks, switch to new master configuration and if
3556   # needed primary mode
3557   for rd in bdevs:
3558     try:
3559       rd.AttachNet(multimaster)
3560     except errors.BlockDeviceError, err:
3561       _Fail("Can't change network configuration: %s", err)
3562
3563   # wait until the disks are connected; we need to retry the re-attach
3564   # if the device becomes standalone, as this might happen if the one
3565   # node disconnects and reconnects in a different mode before the
3566   # other node reconnects; in this case, one or both of the nodes will
3567   # decide it has wrong configuration and switch to standalone
3568
3569   def _Attach():
3570     all_connected = True
3571
3572     for rd in bdevs:
3573       stats = rd.GetProcStatus()
3574
3575       all_connected = (all_connected and
3576                        (stats.is_connected or stats.is_in_resync))
3577
3578       if stats.is_standalone:
3579         # peer had different config info and this node became
3580         # standalone, even though this should not happen with the
3581         # new staged way of changing disk configs
3582         try:
3583           rd.AttachNet(multimaster)
3584         except errors.BlockDeviceError, err:
3585           _Fail("Can't change network configuration: %s", err)
3586
3587     if not all_connected:
3588       raise utils.RetryAgain()
3589
3590   try:
3591     # Start with a delay of 100 miliseconds and go up to 5 seconds
3592     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3593   except utils.RetryTimeout:
3594     _Fail("Timeout in disk reconnecting")
3595
3596   if multimaster:
3597     # change to primary mode
3598     for rd in bdevs:
3599       try:
3600         rd.Open()
3601       except errors.BlockDeviceError, err:
3602         _Fail("Can't change to primary mode: %s", err)
3603
3604
3605 def DrbdWaitSync(nodes_ip, disks):
3606   """Wait until DRBDs have synchronized.
3607
3608   """
3609   def _helper(rd):
3610     stats = rd.GetProcStatus()
3611     if not (stats.is_connected or stats.is_in_resync):
3612       raise utils.RetryAgain()
3613     return stats
3614
3615   bdevs = _FindDisks(nodes_ip, disks)
3616
3617   min_resync = 100
3618   alldone = True
3619   for rd in bdevs:
3620     try:
3621       # poll each second for 15 seconds
3622       stats = utils.Retry(_helper, 1, 15, args=[rd])
3623     except utils.RetryTimeout:
3624       stats = rd.GetProcStatus()
3625       # last check
3626       if not (stats.is_connected or stats.is_in_resync):
3627         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3628     alldone = alldone and (not stats.is_in_resync)
3629     if stats.sync_percent is not None:
3630       min_resync = min(min_resync, stats.sync_percent)
3631
3632   return (alldone, min_resync)
3633
3634
3635 def GetDrbdUsermodeHelper():
3636   """Returns DRBD usermode helper currently configured.
3637
3638   """
3639   try:
3640     return bdev.BaseDRBD.GetUsermodeHelper()
3641   except errors.BlockDeviceError, err:
3642     _Fail(str(err))
3643
3644
3645 def PowercycleNode(hypervisor_type):
3646   """Hard-powercycle the node.
3647
3648   Because we need to return first, and schedule the powercycle in the
3649   background, we won't be able to report failures nicely.
3650
3651   """
3652   hyper = hypervisor.GetHypervisor(hypervisor_type)
3653   try:
3654     pid = os.fork()
3655   except OSError:
3656     # if we can't fork, we'll pretend that we're in the child process
3657     pid = 0
3658   if pid > 0:
3659     return "Reboot scheduled in 5 seconds"
3660   # ensure the child is running on ram
3661   try:
3662     utils.Mlockall()
3663   except Exception: # pylint: disable=W0703
3664     pass
3665   time.sleep(5)
3666   hyper.PowercycleNode()
3667
3668
3669 def _VerifyRestrictedCmdName(cmd):
3670   """Verifies a restricted command name.
3671
3672   @type cmd: string
3673   @param cmd: Command name
3674   @rtype: tuple; (boolean, string or None)
3675   @return: The tuple's first element is the status; if C{False}, the second
3676     element is an error message string, otherwise it's C{None}
3677
3678   """
3679   if not cmd.strip():
3680     return (False, "Missing command name")
3681
3682   if os.path.basename(cmd) != cmd:
3683     return (False, "Invalid command name")
3684
3685   if not constants.EXT_PLUGIN_MASK.match(cmd):
3686     return (False, "Command name contains forbidden characters")
3687
3688   return (True, None)
3689
3690
3691 def _CommonRestrictedCmdCheck(path, owner):
3692   """Common checks for restricted command file system directories and files.
3693
3694   @type path: string
3695   @param path: Path to check
3696   @param owner: C{None} or tuple containing UID and GID
3697   @rtype: tuple; (boolean, string or C{os.stat} result)
3698   @return: The tuple's first element is the status; if C{False}, the second
3699     element is an error message string, otherwise it's the result of C{os.stat}
3700
3701   """
3702   if owner is None:
3703     # Default to root as owner
3704     owner = (0, 0)
3705
3706   try:
3707     st = os.stat(path)
3708   except EnvironmentError, err:
3709     return (False, "Can't stat(2) '%s': %s" % (path, err))
3710
3711   if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
3712     return (False, "Permissions on '%s' are too permissive" % path)
3713
3714   if (st.st_uid, st.st_gid) != owner:
3715     (owner_uid, owner_gid) = owner
3716     return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
3717
3718   return (True, st)
3719
3720
3721 def _VerifyRestrictedCmdDirectory(path, _owner=None):
3722   """Verifies restricted command directory.
3723
3724   @type path: string
3725   @param path: Path to check
3726   @rtype: tuple; (boolean, string or None)
3727   @return: The tuple's first element is the status; if C{False}, the second
3728     element is an error message string, otherwise it's C{None}
3729
3730   """
3731   (status, value) = _CommonRestrictedCmdCheck(path, _owner)
3732
3733   if not status:
3734     return (False, value)
3735
3736   if not stat.S_ISDIR(value.st_mode):
3737     return (False, "Path '%s' is not a directory" % path)
3738
3739   return (True, None)
3740
3741
3742 def _VerifyRestrictedCmd(path, cmd, _owner=None):
3743   """Verifies a whole restricted command and returns its executable filename.
3744
3745   @type path: string
3746   @param path: Directory containing restricted commands
3747   @type cmd: string
3748   @param cmd: Command name
3749   @rtype: tuple; (boolean, string)
3750   @return: The tuple's first element is the status; if C{False}, the second
3751     element is an error message string, otherwise the second element is the
3752     absolute path to the executable
3753
3754   """
3755   executable = utils.PathJoin(path, cmd)
3756
3757   (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)
3758
3759   if not status:
3760     return (False, msg)
3761
3762   if not utils.IsExecutable(executable):
3763     return (False, "access(2) thinks '%s' can't be executed" % executable)
3764
3765   return (True, executable)
3766
3767
3768 def _PrepareRestrictedCmd(path, cmd,
3769                           _verify_dir=_VerifyRestrictedCmdDirectory,
3770                           _verify_name=_VerifyRestrictedCmdName,
3771                           _verify_cmd=_VerifyRestrictedCmd):
3772   """Performs a number of tests on a restricted command.
3773
3774   @type path: string
3775   @param path: Directory containing restricted commands
3776   @type cmd: string
3777   @param cmd: Command name
3778   @return: Same as L{_VerifyRestrictedCmd}
3779
3780   """
3781   # Verify the directory first
3782   (status, msg) = _verify_dir(path)
3783   if status:
3784     # Check command if everything was alright
3785     (status, msg) = _verify_name(cmd)
3786
3787   if not status:
3788     return (False, msg)
3789
3790   # Check actual executable
3791   return _verify_cmd(path, cmd)
3792
3793
3794 def RunRestrictedCmd(cmd,
3795                      _lock_timeout=_RCMD_LOCK_TIMEOUT,
3796                      _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
3797                      _path=pathutils.RESTRICTED_COMMANDS_DIR,
3798                      _sleep_fn=time.sleep,
3799                      _prepare_fn=_PrepareRestrictedCmd,
3800                      _runcmd_fn=utils.RunCmd,
3801                      _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
3802   """Executes a restricted command after performing strict tests.
3803
3804   @type cmd: string
3805   @param cmd: Command name
3806   @rtype: string
3807   @return: Command output
3808   @raise RPCFail: In case of an error
3809
3810   """
3811   logging.info("Preparing to run restricted command '%s'", cmd)
3812
3813   if not _enabled:
3814     _Fail("Restricted commands disabled at configure time")
3815
3816   lock = None
3817   try:
3818     cmdresult = None
3819     try:
3820       lock = utils.FileLock.Open(_lock_file)
3821       lock.Exclusive(blocking=True, timeout=_lock_timeout)
3822
3823       (status, value) = _prepare_fn(_path, cmd)
3824
3825       if status:
3826         cmdresult = _runcmd_fn([value], env={}, reset_env=True,
3827                                postfork_fn=lambda _: lock.Unlock())
3828       else:
3829         logging.error(value)
3830     except Exception: # pylint: disable=W0703
3831       # Keep original error in log
3832       logging.exception("Caught exception")
3833
3834     if cmdresult is None:
3835       logging.info("Sleeping for %0.1f seconds before returning",
3836                    _RCMD_INVALID_DELAY)
3837       _sleep_fn(_RCMD_INVALID_DELAY)
3838
3839       # Do not include original error message in returned error
3840       _Fail("Executing command '%s' failed" % cmd)
3841     elif cmdresult.failed or cmdresult.fail_reason:
3842       _Fail("Restricted command '%s' failed: %s; output: %s",
3843             cmd, cmdresult.fail_reason, cmdresult.output)
3844     else:
3845       return cmdresult.output
3846   finally:
3847     if lock is not None:
3848       # Release lock at last
3849       lock.Close()
3850       lock = None
3851
3852
3853 def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
3854   """Creates or removes the watcher pause file.
3855
3856   @type until: None or number
3857   @param until: Unix timestamp saying until when the watcher shouldn't run
3858
3859   """
3860   if until is None:
3861     logging.info("Received request to no longer pause watcher")
3862     utils.RemoveFile(_filename)
3863   else:
3864     logging.info("Received request to pause watcher until %s", until)
3865
3866     if not ht.TNumber(until):
3867       _Fail("Duration must be numeric")
3868
3869     utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)
3870
3871
3872 class HooksRunner(object):
3873   """Hook runner.
3874
3875   This class is instantiated on the node side (ganeti-noded) and not
3876   on the master side.
3877
3878   """
3879   def __init__(self, hooks_base_dir=None):
3880     """Constructor for hooks runner.
3881
3882     @type hooks_base_dir: str or None
3883     @param hooks_base_dir: if not None, this overrides the
3884         L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3885
3886     """
3887     if hooks_base_dir is None:
3888       hooks_base_dir = pathutils.HOOKS_BASE_DIR
3889     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3890     # constant
3891     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3892
3893   def RunLocalHooks(self, node_list, hpath, phase, env):
3894     """Check that the hooks will be run only locally and then run them.
3895
3896     """
3897     assert len(node_list) == 1
3898     node = node_list[0]
3899     _, myself = ssconf.GetMasterAndMyself()
3900     assert node == myself
3901
3902     results = self.RunHooks(hpath, phase, env)
3903
3904     # Return values in the form expected by HooksMaster
3905     return {node: (None, False, results)}
3906
3907   def RunHooks(self, hpath, phase, env):
3908     """Run the scripts in the hooks directory.
3909
3910     @type hpath: str
3911     @param hpath: the path to the hooks directory which
3912         holds the scripts
3913     @type phase: str
3914     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3915         L{constants.HOOKS_PHASE_POST}
3916     @type env: dict
3917     @param env: dictionary with the environment for the hook
3918     @rtype: list
3919     @return: list of 3-element tuples:
3920       - script path
3921       - script result, either L{constants.HKR_SUCCESS} or
3922         L{constants.HKR_FAIL}
3923       - output of the script
3924
3925     @raise errors.ProgrammerError: for invalid input
3926         parameters
3927
3928     """
3929     if phase == constants.HOOKS_PHASE_PRE:
3930       suffix = "pre"
3931     elif phase == constants.HOOKS_PHASE_POST:
3932       suffix = "post"
3933     else:
3934       _Fail("Unknown hooks phase '%s'", phase)
3935
3936     subdir = "%s-%s.d" % (hpath, suffix)
3937     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3938
3939     results = []
3940
3941     if not os.path.isdir(dir_name):
3942       # for non-existing/non-dirs, we simply exit instead of logging a
3943       # warning at every operation
3944       return results
3945
3946     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3947
3948     for (relname, relstatus, runresult) in runparts_results:
3949       if relstatus == constants.RUNPARTS_SKIP:
3950         rrval = constants.HKR_SKIP
3951         output = ""
3952       elif relstatus == constants.RUNPARTS_ERR:
3953         rrval = constants.HKR_FAIL
3954         output = "Hook script execution error: %s" % runresult
3955       elif relstatus == constants.RUNPARTS_RUN:
3956         if runresult.failed:
3957           rrval = constants.HKR_FAIL
3958         else:
3959           rrval = constants.HKR_SUCCESS
3960         output = utils.SafeEncode(runresult.output.strip())
3961       results.append(("%s/%s" % (subdir, relname), rrval, output))
3962
3963     return results
3964
3965
3966 class IAllocatorRunner(object):
3967   """IAllocator runner.
3968
3969   This class is instantiated on the node side (ganeti-noded) and not on
3970   the master side.
3971
3972   """
3973   @staticmethod
3974   def Run(name, idata):
3975     """Run an iallocator script.
3976
3977     @type name: str
3978     @param name: the iallocator script name
3979     @type idata: str
3980     @param idata: the allocator input data
3981
3982     @rtype: tuple
3983     @return: two element tuple of:
3984        - status
3985        - either error message or stdout of allocator (for success)
3986
3987     """
3988     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3989                                   os.path.isfile)
3990     if alloc_script is None:
3991       _Fail("iallocator module '%s' not found in the search path", name)
3992
3993     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3994     try:
3995       os.write(fd, idata)
3996       os.close(fd)
3997       result = utils.RunCmd([alloc_script, fin_name])
3998       if result.failed:
3999         _Fail("iallocator module '%s' failed: %s, output '%s'",
4000               name, result.fail_reason, result.output)
4001     finally:
4002       os.unlink(fin_name)
4003
4004     return result.stdout
4005
4006
4007 class DevCacheManager(object):
4008   """Simple class for managing a cache of block device information.
4009
4010   """
4011   _DEV_PREFIX = "/dev/"
4012   _ROOT_DIR = pathutils.BDEV_CACHE_DIR
4013
4014   @classmethod
4015   def _ConvertPath(cls, dev_path):
4016     """Converts a /dev/name path to the cache file name.
4017
4018     This replaces slashes with underscores and strips the /dev
4019     prefix. It then returns the full path to the cache file.
4020
4021     @type dev_path: str
4022     @param dev_path: the C{/dev/} path name
4023     @rtype: str
4024     @return: the converted path name
4025
4026     """
4027     if dev_path.startswith(cls._DEV_PREFIX):
4028       dev_path = dev_path[len(cls._DEV_PREFIX):]
4029     dev_path = dev_path.replace("/", "_")
4030     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
4031     return fpath
4032
4033   @classmethod
4034   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
4035     """Updates the cache information for a given device.
4036
4037     @type dev_path: str
4038     @param dev_path: the pathname of the device
4039     @type owner: str
4040     @param owner: the owner (instance name) of the device
4041     @type on_primary: bool
4042     @param on_primary: whether this is the primary
4043         node nor not
4044     @type iv_name: str
4045     @param iv_name: the instance-visible name of the
4046         device, as in objects.Disk.iv_name
4047
4048     @rtype: None
4049
4050     """
4051     if dev_path is None:
4052       logging.error("DevCacheManager.UpdateCache got a None dev_path")
4053       return
4054     fpath = cls._ConvertPath(dev_path)
4055     if on_primary:
4056       state = "primary"
4057     else:
4058       state = "secondary"
4059     if iv_name is None:
4060       iv_name = "not_visible"
4061     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
4062     try:
4063       utils.WriteFile(fpath, data=fdata)
4064     except EnvironmentError, err:
4065       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
4066
4067   @classmethod
4068   def RemoveCache(cls, dev_path):
4069     """Remove data for a dev_path.
4070
4071     This is just a wrapper over L{utils.io.RemoveFile} with a converted
4072     path name and logging.
4073
4074     @type dev_path: str
4075     @param dev_path: the pathname of the device
4076
4077     @rtype: None
4078
4079     """
4080     if dev_path is None:
4081       logging.error("DevCacheManager.RemoveCache got a None dev_path")
4082       return
4083     fpath = cls._ConvertPath(dev_path)
4084     try:
4085       utils.RemoveFile(fpath)
4086     except EnvironmentError, err:
4087       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)