hv_kvm: Error messages, punctuation, other style fixes
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
67 from ganeti import ht
68
69
70 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
71 _ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
72   pathutils.DATA_DIR,
73   pathutils.JOB_QUEUE_ARCHIVE_DIR,
74   pathutils.QUEUE_DIR,
75   pathutils.CRYPTO_KEYS_DIR,
76   ])
77 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
78 _X509_KEY_FILE = "key"
79 _X509_CERT_FILE = "cert"
80 _IES_STATUS_FILE = "status"
81 _IES_PID_FILE = "pid"
82 _IES_CA_FILE = "ca"
83
84 #: Valid LVS output line regex
85 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
86
87 # Actions for the master setup script
88 _MASTER_START = "start"
89 _MASTER_STOP = "stop"
90
91 #: Maximum file permissions for remote command directory and executables
92 _RCMD_MAX_MODE = (stat.S_IRWXU |
93                   stat.S_IRGRP | stat.S_IXGRP |
94                   stat.S_IROTH | stat.S_IXOTH)
95
96 #: Delay before returning an error for remote commands
97 _RCMD_INVALID_DELAY = 10
98
99 #: How long to wait to acquire lock for remote commands (shorter than
100 #: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
101 #: command requests arrive
102 _RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
103
104
105 class RPCFail(Exception):
106   """Class denoting RPC failure.
107
108   Its argument is the error message.
109
110   """
111
112
113 def _Fail(msg, *args, **kwargs):
114   """Log an error and the raise an RPCFail exception.
115
116   This exception is then handled specially in the ganeti daemon and
117   turned into a 'failed' return type. As such, this function is a
118   useful shortcut for logging the error and returning it to the master
119   daemon.
120
121   @type msg: string
122   @param msg: the text of the exception
123   @raise RPCFail
124
125   """
126   if args:
127     msg = msg % args
128   if "log" not in kwargs or kwargs["log"]: # if we should log this error
129     if "exc" in kwargs and kwargs["exc"]:
130       logging.exception(msg)
131     else:
132       logging.error(msg)
133   raise RPCFail(msg)
134
135
136 def _GetConfig():
137   """Simple wrapper to return a SimpleStore.
138
139   @rtype: L{ssconf.SimpleStore}
140   @return: a SimpleStore instance
141
142   """
143   return ssconf.SimpleStore()
144
145
146 def _GetSshRunner(cluster_name):
147   """Simple wrapper to return an SshRunner.
148
149   @type cluster_name: str
150   @param cluster_name: the cluster name, which is needed
151       by the SshRunner constructor
152   @rtype: L{ssh.SshRunner}
153   @return: an SshRunner instance
154
155   """
156   return ssh.SshRunner(cluster_name)
157
158
159 def _Decompress(data):
160   """Unpacks data compressed by the RPC client.
161
162   @type data: list or tuple
163   @param data: Data sent by RPC client
164   @rtype: str
165   @return: Decompressed data
166
167   """
168   assert isinstance(data, (list, tuple))
169   assert len(data) == 2
170   (encoding, content) = data
171   if encoding == constants.RPC_ENCODING_NONE:
172     return content
173   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
174     return zlib.decompress(base64.b64decode(content))
175   else:
176     raise AssertionError("Unknown data encoding")
177
178
179 def _CleanDirectory(path, exclude=None):
180   """Removes all regular files in a directory.
181
182   @type path: str
183   @param path: the directory to clean
184   @type exclude: list
185   @param exclude: list of files to be excluded, defaults
186       to the empty list
187
188   """
189   if path not in _ALLOWED_CLEAN_DIRS:
190     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
191           path)
192
193   if not os.path.isdir(path):
194     return
195   if exclude is None:
196     exclude = []
197   else:
198     # Normalize excluded paths
199     exclude = [os.path.normpath(i) for i in exclude]
200
201   for rel_name in utils.ListVisibleFiles(path):
202     full_name = utils.PathJoin(path, rel_name)
203     if full_name in exclude:
204       continue
205     if os.path.isfile(full_name) and not os.path.islink(full_name):
206       utils.RemoveFile(full_name)
207
208
209 def _BuildUploadFileList():
210   """Build the list of allowed upload files.
211
212   This is abstracted so that it's built only once at module import time.
213
214   """
215   allowed_files = set([
216     pathutils.CLUSTER_CONF_FILE,
217     pathutils.ETC_HOSTS,
218     pathutils.SSH_KNOWN_HOSTS_FILE,
219     pathutils.VNC_PASSWORD_FILE,
220     pathutils.RAPI_CERT_FILE,
221     pathutils.SPICE_CERT_FILE,
222     pathutils.SPICE_CACERT_FILE,
223     pathutils.RAPI_USERS_FILE,
224     pathutils.CONFD_HMAC_KEY,
225     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
226     ])
227
228   for hv_name in constants.HYPER_TYPES:
229     hv_class = hypervisor.GetHypervisorClass(hv_name)
230     allowed_files.update(hv_class.GetAncillaryFiles()[0])
231
232   assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
233     "Allowed file storage paths should never be uploaded via RPC"
234
235   return frozenset(allowed_files)
236
237
238 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
239
240
241 def JobQueuePurge():
242   """Removes job queue files and archived jobs.
243
244   @rtype: tuple
245   @return: True, None
246
247   """
248   _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
249   _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
250
251
252 def GetMasterInfo():
253   """Returns master information.
254
255   This is an utility function to compute master information, either
256   for consumption here or from the node daemon.
257
258   @rtype: tuple
259   @return: master_netdev, master_ip, master_name, primary_ip_family,
260     master_netmask
261   @raise RPCFail: in case of errors
262
263   """
264   try:
265     cfg = _GetConfig()
266     master_netdev = cfg.GetMasterNetdev()
267     master_ip = cfg.GetMasterIP()
268     master_netmask = cfg.GetMasterNetmask()
269     master_node = cfg.GetMasterNode()
270     primary_ip_family = cfg.GetPrimaryIPFamily()
271   except errors.ConfigurationError, err:
272     _Fail("Cluster configuration incomplete: %s", err, exc=True)
273   return (master_netdev, master_ip, master_node, primary_ip_family,
274           master_netmask)
275
276
277 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
278   """Decorator that runs hooks before and after the decorated function.
279
280   @type hook_opcode: string
281   @param hook_opcode: opcode of the hook
282   @type hooks_path: string
283   @param hooks_path: path of the hooks
284   @type env_builder_fn: function
285   @param env_builder_fn: function that returns a dictionary containing the
286     environment variables for the hooks. Will get all the parameters of the
287     decorated function.
288   @raise RPCFail: in case of pre-hook failure
289
290   """
291   def decorator(fn):
292     def wrapper(*args, **kwargs):
293       _, myself = ssconf.GetMasterAndMyself()
294       nodes = ([myself], [myself])  # these hooks run locally
295
296       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
297
298       cfg = _GetConfig()
299       hr = HooksRunner()
300       hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
301                             None, env_fn, logging.warning, cfg.GetClusterName(),
302                             cfg.GetMasterNode())
303
304       hm.RunPhase(constants.HOOKS_PHASE_PRE)
305       result = fn(*args, **kwargs)
306       hm.RunPhase(constants.HOOKS_PHASE_POST)
307
308       return result
309     return wrapper
310   return decorator
311
312
313 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
314   """Builds environment variables for master IP hooks.
315
316   @type master_params: L{objects.MasterNetworkParameters}
317   @param master_params: network parameters of the master
318   @type use_external_mip_script: boolean
319   @param use_external_mip_script: whether to use an external master IP
320     address setup script (unused, but necessary per the implementation of the
321     _RunLocalHooks decorator)
322
323   """
324   # pylint: disable=W0613
325   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
326   env = {
327     "MASTER_NETDEV": master_params.netdev,
328     "MASTER_IP": master_params.ip,
329     "MASTER_NETMASK": str(master_params.netmask),
330     "CLUSTER_IP_VERSION": str(ver),
331   }
332
333   return env
334
335
336 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
337   """Execute the master IP address setup script.
338
339   @type master_params: L{objects.MasterNetworkParameters}
340   @param master_params: network parameters of the master
341   @type action: string
342   @param action: action to pass to the script. Must be one of
343     L{backend._MASTER_START} or L{backend._MASTER_STOP}
344   @type use_external_mip_script: boolean
345   @param use_external_mip_script: whether to use an external master IP
346     address setup script
347   @raise backend.RPCFail: if there are errors during the execution of the
348     script
349
350   """
351   env = _BuildMasterIpEnv(master_params)
352
353   if use_external_mip_script:
354     setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
355   else:
356     setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
357
358   result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
359
360   if result.failed:
361     _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
362           (action, result.exit_code, result.output), log=True)
363
364
365 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
366                _BuildMasterIpEnv)
367 def ActivateMasterIp(master_params, use_external_mip_script):
368   """Activate the IP address of the master daemon.
369
370   @type master_params: L{objects.MasterNetworkParameters}
371   @param master_params: network parameters of the master
372   @type use_external_mip_script: boolean
373   @param use_external_mip_script: whether to use an external master IP
374     address setup script
375   @raise RPCFail: in case of errors during the IP startup
376
377   """
378   _RunMasterSetupScript(master_params, _MASTER_START,
379                         use_external_mip_script)
380
381
382 def StartMasterDaemons(no_voting):
383   """Activate local node as master node.
384
385   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
386
387   @type no_voting: boolean
388   @param no_voting: whether to start ganeti-masterd without a node vote
389       but still non-interactively
390   @rtype: None
391
392   """
393
394   if no_voting:
395     masterd_args = "--no-voting --yes-do-it"
396   else:
397     masterd_args = ""
398
399   env = {
400     "EXTRA_MASTERD_ARGS": masterd_args,
401     }
402
403   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
404   if result.failed:
405     msg = "Can't start Ganeti master: %s" % result.output
406     logging.error(msg)
407     _Fail(msg)
408
409
410 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
411                _BuildMasterIpEnv)
412 def DeactivateMasterIp(master_params, use_external_mip_script):
413   """Deactivate the master IP on this node.
414
415   @type master_params: L{objects.MasterNetworkParameters}
416   @param master_params: network parameters of the master
417   @type use_external_mip_script: boolean
418   @param use_external_mip_script: whether to use an external master IP
419     address setup script
420   @raise RPCFail: in case of errors during the IP turndown
421
422   """
423   _RunMasterSetupScript(master_params, _MASTER_STOP,
424                         use_external_mip_script)
425
426
427 def StopMasterDaemons():
428   """Stop the master daemons on this node.
429
430   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
431
432   @rtype: None
433
434   """
435   # TODO: log and report back to the caller the error failures; we
436   # need to decide in which case we fail the RPC for this
437
438   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
439   if result.failed:
440     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
441                   " and error %s",
442                   result.cmd, result.exit_code, result.output)
443
444
445 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
446   """Change the netmask of the master IP.
447
448   @param old_netmask: the old value of the netmask
449   @param netmask: the new value of the netmask
450   @param master_ip: the master IP
451   @param master_netdev: the master network device
452
453   """
454   if old_netmask == netmask:
455     return
456
457   if not netutils.IPAddress.Own(master_ip):
458     _Fail("The master IP address is not up, not attempting to change its"
459           " netmask")
460
461   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
462                          "%s/%s" % (master_ip, netmask),
463                          "dev", master_netdev, "label",
464                          "%s:0" % master_netdev])
465   if result.failed:
466     _Fail("Could not set the new netmask on the master IP address")
467
468   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
469                          "%s/%s" % (master_ip, old_netmask),
470                          "dev", master_netdev, "label",
471                          "%s:0" % master_netdev])
472   if result.failed:
473     _Fail("Could not bring down the master IP address with the old netmask")
474
475
476 def EtcHostsModify(mode, host, ip):
477   """Modify a host entry in /etc/hosts.
478
479   @param mode: The mode to operate. Either add or remove entry
480   @param host: The host to operate on
481   @param ip: The ip associated with the entry
482
483   """
484   if mode == constants.ETC_HOSTS_ADD:
485     if not ip:
486       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
487               " present")
488     utils.AddHostToEtcHosts(host, ip)
489   elif mode == constants.ETC_HOSTS_REMOVE:
490     if ip:
491       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
492               " parameter is present")
493     utils.RemoveHostFromEtcHosts(host)
494   else:
495     RPCFail("Mode not supported")
496
497
498 def LeaveCluster(modify_ssh_setup):
499   """Cleans up and remove the current node.
500
501   This function cleans up and prepares the current node to be removed
502   from the cluster.
503
504   If processing is successful, then it raises an
505   L{errors.QuitGanetiException} which is used as a special case to
506   shutdown the node daemon.
507
508   @param modify_ssh_setup: boolean
509
510   """
511   _CleanDirectory(pathutils.DATA_DIR)
512   _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
513   JobQueuePurge()
514
515   if modify_ssh_setup:
516     try:
517       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
518
519       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
520
521       utils.RemoveFile(priv_key)
522       utils.RemoveFile(pub_key)
523     except errors.OpExecError:
524       logging.exception("Error while processing ssh files")
525
526   try:
527     utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
528     utils.RemoveFile(pathutils.RAPI_CERT_FILE)
529     utils.RemoveFile(pathutils.SPICE_CERT_FILE)
530     utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
531     utils.RemoveFile(pathutils.NODED_CERT_FILE)
532   except: # pylint: disable=W0702
533     logging.exception("Error while removing cluster secrets")
534
535   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
536   if result.failed:
537     logging.error("Command %s failed with exitcode %s and error %s",
538                   result.cmd, result.exit_code, result.output)
539
540   # Raise a custom exception (handled in ganeti-noded)
541   raise errors.QuitGanetiException(True, "Shutdown scheduled")
542
543
544 def _GetVgInfo(name, excl_stor):
545   """Retrieves information about a LVM volume group.
546
547   """
548   # TODO: GetVGInfo supports returning information for multiple VGs at once
549   vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
550   if vginfo:
551     vg_free = int(round(vginfo[0][0], 0))
552     vg_size = int(round(vginfo[0][1], 0))
553   else:
554     vg_free = None
555     vg_size = None
556
557   return {
558     "name": name,
559     "vg_free": vg_free,
560     "vg_size": vg_size,
561     }
562
563
564 def _GetHvInfo(name):
565   """Retrieves node information from a hypervisor.
566
567   The information returned depends on the hypervisor. Common items:
568
569     - vg_size is the size of the configured volume group in MiB
570     - vg_free is the free size of the volume group in MiB
571     - memory_dom0 is the memory allocated for domain0 in MiB
572     - memory_free is the currently available (free) ram in MiB
573     - memory_total is the total number of ram in MiB
574     - hv_version: the hypervisor version, if available
575
576   """
577   return hypervisor.GetHypervisor(name).GetNodeInfo()
578
579
580 def _GetNamedNodeInfo(names, fn):
581   """Calls C{fn} for all names in C{names} and returns a dictionary.
582
583   @rtype: None or dict
584
585   """
586   if names is None:
587     return None
588   else:
589     return map(fn, names)
590
591
592 def GetNodeInfo(vg_names, hv_names, excl_stor):
593   """Gives back a hash with different information about the node.
594
595   @type vg_names: list of string
596   @param vg_names: Names of the volume groups to ask for disk space information
597   @type hv_names: list of string
598   @param hv_names: Names of the hypervisors to ask for node information
599   @type excl_stor: boolean
600   @param excl_stor: Whether exclusive_storage is active
601   @rtype: tuple; (string, None/dict, None/dict)
602   @return: Tuple containing boot ID, volume group information and hypervisor
603     information
604
605   """
606   bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
607   vg_info = _GetNamedNodeInfo(vg_names, (lambda vg: _GetVgInfo(vg, excl_stor)))
608   hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
609
610   return (bootid, vg_info, hv_info)
611
612
613 def _CheckExclusivePvs(pvi_list):
614   """Check that PVs are not shared among LVs
615
616   @type pvi_list: list of L{objects.LvmPvInfo} objects
617   @param pvi_list: information about the PVs
618
619   @rtype: list of tuples (string, list of strings)
620   @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])
621
622   """
623   res = []
624   for pvi in pvi_list:
625     if len(pvi.lv_list) > 1:
626       res.append((pvi.name, pvi.lv_list))
627   return res
628
629
630 def VerifyNode(what, cluster_name):
631   """Verify the status of the local node.
632
633   Based on the input L{what} parameter, various checks are done on the
634   local node.
635
636   If the I{filelist} key is present, this list of
637   files is checksummed and the file/checksum pairs are returned.
638
639   If the I{nodelist} key is present, we check that we have
640   connectivity via ssh with the target nodes (and check the hostname
641   report).
642
643   If the I{node-net-test} key is present, we check that we have
644   connectivity to the given nodes via both primary IP and, if
645   applicable, secondary IPs.
646
647   @type what: C{dict}
648   @param what: a dictionary of things to check:
649       - filelist: list of files for which to compute checksums
650       - nodelist: list of nodes we should check ssh communication with
651       - node-net-test: list of nodes we should check node daemon port
652         connectivity with
653       - hypervisor: list with hypervisors to run the verify for
654   @rtype: dict
655   @return: a dictionary with the same keys as the input dict, and
656       values representing the result of the checks
657
658   """
659   result = {}
660   my_name = netutils.Hostname.GetSysName()
661   port = netutils.GetDaemonPort(constants.NODED)
662   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
663
664   if constants.NV_HYPERVISOR in what and vm_capable:
665     result[constants.NV_HYPERVISOR] = tmp = {}
666     for hv_name in what[constants.NV_HYPERVISOR]:
667       try:
668         val = hypervisor.GetHypervisor(hv_name).Verify()
669       except errors.HypervisorError, err:
670         val = "Error while checking hypervisor: %s" % str(err)
671       tmp[hv_name] = val
672
673   if constants.NV_HVPARAMS in what and vm_capable:
674     result[constants.NV_HVPARAMS] = tmp = []
675     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
676       try:
677         logging.info("Validating hv %s, %s", hv_name, hvparms)
678         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
679       except errors.HypervisorError, err:
680         tmp.append((source, hv_name, str(err)))
681
682   if constants.NV_FILELIST in what:
683     fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
684                                               what[constants.NV_FILELIST]))
685     result[constants.NV_FILELIST] = \
686       dict((vcluster.MakeVirtualPath(key), value)
687            for (key, value) in fingerprints.items())
688
689   if constants.NV_NODELIST in what:
690     (nodes, bynode) = what[constants.NV_NODELIST]
691
692     # Add nodes from other groups (different for each node)
693     try:
694       nodes.extend(bynode[my_name])
695     except KeyError:
696       pass
697
698     # Use a random order
699     random.shuffle(nodes)
700
701     # Try to contact all nodes
702     val = {}
703     for node in nodes:
704       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
705       if not success:
706         val[node] = message
707
708     result[constants.NV_NODELIST] = val
709
710   if constants.NV_NODENETTEST in what:
711     result[constants.NV_NODENETTEST] = tmp = {}
712     my_pip = my_sip = None
713     for name, pip, sip in what[constants.NV_NODENETTEST]:
714       if name == my_name:
715         my_pip = pip
716         my_sip = sip
717         break
718     if not my_pip:
719       tmp[my_name] = ("Can't find my own primary/secondary IP"
720                       " in the node list")
721     else:
722       for name, pip, sip in what[constants.NV_NODENETTEST]:
723         fail = []
724         if not netutils.TcpPing(pip, port, source=my_pip):
725           fail.append("primary")
726         if sip != pip:
727           if not netutils.TcpPing(sip, port, source=my_sip):
728             fail.append("secondary")
729         if fail:
730           tmp[name] = ("failure using the %s interface(s)" %
731                        " and ".join(fail))
732
733   if constants.NV_MASTERIP in what:
734     # FIXME: add checks on incoming data structures (here and in the
735     # rest of the function)
736     master_name, master_ip = what[constants.NV_MASTERIP]
737     if master_name == my_name:
738       source = constants.IP4_ADDRESS_LOCALHOST
739     else:
740       source = None
741     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
742                                                      source=source)
743
744   if constants.NV_USERSCRIPTS in what:
745     result[constants.NV_USERSCRIPTS] = \
746       [script for script in what[constants.NV_USERSCRIPTS]
747        if not utils.IsExecutable(script)]
748
749   if constants.NV_OOB_PATHS in what:
750     result[constants.NV_OOB_PATHS] = tmp = []
751     for path in what[constants.NV_OOB_PATHS]:
752       try:
753         st = os.stat(path)
754       except OSError, err:
755         tmp.append("error stating out of band helper: %s" % err)
756       else:
757         if stat.S_ISREG(st.st_mode):
758           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
759             tmp.append(None)
760           else:
761             tmp.append("out of band helper %s is not executable" % path)
762         else:
763           tmp.append("out of band helper %s is not a file" % path)
764
765   if constants.NV_LVLIST in what and vm_capable:
766     try:
767       val = GetVolumeList(utils.ListVolumeGroups().keys())
768     except RPCFail, err:
769       val = str(err)
770     result[constants.NV_LVLIST] = val
771
772   if constants.NV_INSTANCELIST in what and vm_capable:
773     # GetInstanceList can fail
774     try:
775       val = GetInstanceList(what[constants.NV_INSTANCELIST])
776     except RPCFail, err:
777       val = str(err)
778     result[constants.NV_INSTANCELIST] = val
779
780   if constants.NV_VGLIST in what and vm_capable:
781     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
782
783   if constants.NV_PVLIST in what and vm_capable:
784     check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
785     val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
786                                        filter_allocatable=False,
787                                        include_lvs=check_exclusive_pvs)
788     if check_exclusive_pvs:
789       result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
790       for pvi in val:
791         # Avoid sending useless data on the wire
792         pvi.lv_list = []
793     result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)
794
795   if constants.NV_VERSION in what:
796     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
797                                     constants.RELEASE_VERSION)
798
799   if constants.NV_HVINFO in what and vm_capable:
800     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
801     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
802
803   if constants.NV_DRBDLIST in what and vm_capable:
804     try:
805       used_minors = bdev.DRBD8.GetUsedDevs().keys()
806     except errors.BlockDeviceError, err:
807       logging.warning("Can't get used minors list", exc_info=True)
808       used_minors = str(err)
809     result[constants.NV_DRBDLIST] = used_minors
810
811   if constants.NV_DRBDHELPER in what and vm_capable:
812     status = True
813     try:
814       payload = bdev.BaseDRBD.GetUsermodeHelper()
815     except errors.BlockDeviceError, err:
816       logging.error("Can't get DRBD usermode helper: %s", str(err))
817       status = False
818       payload = str(err)
819     result[constants.NV_DRBDHELPER] = (status, payload)
820
821   if constants.NV_NODESETUP in what:
822     result[constants.NV_NODESETUP] = tmpr = []
823     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
824       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
825                   " under /sys, missing required directories /sys/block"
826                   " and /sys/class/net")
827     if (not os.path.isdir("/proc/sys") or
828         not os.path.isfile("/proc/sysrq-trigger")):
829       tmpr.append("The procfs filesystem doesn't seem to be mounted"
830                   " under /proc, missing required directory /proc/sys and"
831                   " the file /proc/sysrq-trigger")
832
833   if constants.NV_TIME in what:
834     result[constants.NV_TIME] = utils.SplitTime(time.time())
835
836   if constants.NV_OSLIST in what and vm_capable:
837     result[constants.NV_OSLIST] = DiagnoseOS()
838
839   if constants.NV_BRIDGES in what and vm_capable:
840     result[constants.NV_BRIDGES] = [bridge
841                                     for bridge in what[constants.NV_BRIDGES]
842                                     if not utils.BridgeExists(bridge)]
843
844   if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
845     result[constants.NV_FILE_STORAGE_PATHS] = \
846       bdev.ComputeWrongFileStoragePaths()
847
848   return result
849
850
851 def GetBlockDevSizes(devices):
852   """Return the size of the given block devices
853
854   @type devices: list
855   @param devices: list of block device nodes to query
856   @rtype: dict
857   @return:
858     dictionary of all block devices under /dev (key). The value is their
859     size in MiB.
860
861     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
862
863   """
864   DEV_PREFIX = "/dev/"
865   blockdevs = {}
866
867   for devpath in devices:
868     if not utils.IsBelowDir(DEV_PREFIX, devpath):
869       continue
870
871     try:
872       st = os.stat(devpath)
873     except EnvironmentError, err:
874       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
875       continue
876
877     if stat.S_ISBLK(st.st_mode):
878       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
879       if result.failed:
880         # We don't want to fail, just do not list this device as available
881         logging.warning("Cannot get size for block device %s", devpath)
882         continue
883
884       size = int(result.stdout) / (1024 * 1024)
885       blockdevs[devpath] = size
886   return blockdevs
887
888
889 def GetVolumeList(vg_names):
890   """Compute list of logical volumes and their size.
891
892   @type vg_names: list
893   @param vg_names: the volume groups whose LVs we should list, or
894       empty for all volume groups
895   @rtype: dict
896   @return:
897       dictionary of all partions (key) with value being a tuple of
898       their size (in MiB), inactive and online status::
899
900         {'xenvg/test1': ('20.06', True, True)}
901
902       in case of errors, a string is returned with the error
903       details.
904
905   """
906   lvs = {}
907   sep = "|"
908   if not vg_names:
909     vg_names = []
910   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
911                          "--separator=%s" % sep,
912                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
913   if result.failed:
914     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
915
916   for line in result.stdout.splitlines():
917     line = line.strip()
918     match = _LVSLINE_REGEX.match(line)
919     if not match:
920       logging.error("Invalid line returned from lvs output: '%s'", line)
921       continue
922     vg_name, name, size, attr = match.groups()
923     inactive = attr[4] == "-"
924     online = attr[5] == "o"
925     virtual = attr[0] == "v"
926     if virtual:
927       # we don't want to report such volumes as existing, since they
928       # don't really hold data
929       continue
930     lvs[vg_name + "/" + name] = (size, inactive, online)
931
932   return lvs
933
934
935 def ListVolumeGroups():
936   """List the volume groups and their size.
937
938   @rtype: dict
939   @return: dictionary with keys volume name and values the
940       size of the volume
941
942   """
943   return utils.ListVolumeGroups()
944
945
946 def NodeVolumes():
947   """List all volumes on this node.
948
949   @rtype: list
950   @return:
951     A list of dictionaries, each having four keys:
952       - name: the logical volume name,
953       - size: the size of the logical volume
954       - dev: the physical device on which the LV lives
955       - vg: the volume group to which it belongs
956
957     In case of errors, we return an empty list and log the
958     error.
959
960     Note that since a logical volume can live on multiple physical
961     volumes, the resulting list might include a logical volume
962     multiple times.
963
964   """
965   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
966                          "--separator=|",
967                          "--options=lv_name,lv_size,devices,vg_name"])
968   if result.failed:
969     _Fail("Failed to list logical volumes, lvs output: %s",
970           result.output)
971
972   def parse_dev(dev):
973     return dev.split("(")[0]
974
975   def handle_dev(dev):
976     return [parse_dev(x) for x in dev.split(",")]
977
978   def map_line(line):
979     line = [v.strip() for v in line]
980     return [{"name": line[0], "size": line[1],
981              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
982
983   all_devs = []
984   for line in result.stdout.splitlines():
985     if line.count("|") >= 3:
986       all_devs.extend(map_line(line.split("|")))
987     else:
988       logging.warning("Strange line in the output from lvs: '%s'", line)
989   return all_devs
990
991
992 def BridgesExist(bridges_list):
993   """Check if a list of bridges exist on the current node.
994
995   @rtype: boolean
996   @return: C{True} if all of them exist, C{False} otherwise
997
998   """
999   missing = []
1000   for bridge in bridges_list:
1001     if not utils.BridgeExists(bridge):
1002       missing.append(bridge)
1003
1004   if missing:
1005     _Fail("Missing bridges %s", utils.CommaJoin(missing))
1006
1007
1008 def GetInstanceList(hypervisor_list):
1009   """Provides a list of instances.
1010
1011   @type hypervisor_list: list
1012   @param hypervisor_list: the list of hypervisors to query information
1013
1014   @rtype: list
1015   @return: a list of all running instances on the current node
1016     - instance1.example.com
1017     - instance2.example.com
1018
1019   """
1020   results = []
1021   for hname in hypervisor_list:
1022     try:
1023       names = hypervisor.GetHypervisor(hname).ListInstances()
1024       results.extend(names)
1025     except errors.HypervisorError, err:
1026       _Fail("Error enumerating instances (hypervisor %s): %s",
1027             hname, err, exc=True)
1028
1029   return results
1030
1031
1032 def GetInstanceInfo(instance, hname):
1033   """Gives back the information about an instance as a dictionary.
1034
1035   @type instance: string
1036   @param instance: the instance name
1037   @type hname: string
1038   @param hname: the hypervisor type of the instance
1039
1040   @rtype: dict
1041   @return: dictionary with the following keys:
1042       - memory: memory size of instance (int)
1043       - state: xen state of instance (string)
1044       - time: cpu time of instance (float)
1045       - vcpus: the number of vcpus (int)
1046
1047   """
1048   output = {}
1049
1050   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1051   if iinfo is not None:
1052     output["memory"] = iinfo[2]
1053     output["vcpus"] = iinfo[3]
1054     output["state"] = iinfo[4]
1055     output["time"] = iinfo[5]
1056
1057   return output
1058
1059
1060 def GetInstanceMigratable(instance):
1061   """Gives whether an instance can be migrated.
1062
1063   @type instance: L{objects.Instance}
1064   @param instance: object representing the instance to be checked.
1065
1066   @rtype: tuple
1067   @return: tuple of (result, description) where:
1068       - result: whether the instance can be migrated or not
1069       - description: a description of the issue, if relevant
1070
1071   """
1072   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1073   iname = instance.name
1074   if iname not in hyper.ListInstances():
1075     _Fail("Instance %s is not running", iname)
1076
1077   for idx in range(len(instance.disks)):
1078     link_name = _GetBlockDevSymlinkPath(iname, idx)
1079     if not os.path.islink(link_name):
1080       logging.warning("Instance %s is missing symlink %s for disk %d",
1081                       iname, link_name, idx)
1082
1083
1084 def GetAllInstancesInfo(hypervisor_list):
1085   """Gather data about all instances.
1086
1087   This is the equivalent of L{GetInstanceInfo}, except that it
1088   computes data for all instances at once, thus being faster if one
1089   needs data about more than one instance.
1090
1091   @type hypervisor_list: list
1092   @param hypervisor_list: list of hypervisors to query for instance data
1093
1094   @rtype: dict
1095   @return: dictionary of instance: data, with data having the following keys:
1096       - memory: memory size of instance (int)
1097       - state: xen state of instance (string)
1098       - time: cpu time of instance (float)
1099       - vcpus: the number of vcpus
1100
1101   """
1102   output = {}
1103
1104   for hname in hypervisor_list:
1105     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1106     if iinfo:
1107       for name, _, memory, vcpus, state, times in iinfo:
1108         value = {
1109           "memory": memory,
1110           "vcpus": vcpus,
1111           "state": state,
1112           "time": times,
1113           }
1114         if name in output:
1115           # we only check static parameters, like memory and vcpus,
1116           # and not state and time which can change between the
1117           # invocations of the different hypervisors
1118           for key in "memory", "vcpus":
1119             if value[key] != output[name][key]:
1120               _Fail("Instance %s is running twice"
1121                     " with different parameters", name)
1122         output[name] = value
1123
1124   return output
1125
1126
1127 def _InstanceLogName(kind, os_name, instance, component):
1128   """Compute the OS log filename for a given instance and operation.
1129
1130   The instance name and os name are passed in as strings since not all
1131   operations have these as part of an instance object.
1132
1133   @type kind: string
1134   @param kind: the operation type (e.g. add, import, etc.)
1135   @type os_name: string
1136   @param os_name: the os name
1137   @type instance: string
1138   @param instance: the name of the instance being imported/added/etc.
1139   @type component: string or None
1140   @param component: the name of the component of the instance being
1141       transferred
1142
1143   """
1144   # TODO: Use tempfile.mkstemp to create unique filename
1145   if component:
1146     assert "/" not in component
1147     c_msg = "-%s" % component
1148   else:
1149     c_msg = ""
1150   base = ("%s-%s-%s%s-%s.log" %
1151           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1152   return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1153
1154
1155 def InstanceOsAdd(instance, reinstall, debug):
1156   """Add an OS to an instance.
1157
1158   @type instance: L{objects.Instance}
1159   @param instance: Instance whose OS is to be installed
1160   @type reinstall: boolean
1161   @param reinstall: whether this is an instance reinstall
1162   @type debug: integer
1163   @param debug: debug level, passed to the OS scripts
1164   @rtype: None
1165
1166   """
1167   inst_os = OSFromDisk(instance.os)
1168
1169   create_env = OSEnvironment(instance, inst_os, debug)
1170   if reinstall:
1171     create_env["INSTANCE_REINSTALL"] = "1"
1172
1173   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1174
1175   result = utils.RunCmd([inst_os.create_script], env=create_env,
1176                         cwd=inst_os.path, output=logfile, reset_env=True)
1177   if result.failed:
1178     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1179                   " output: %s", result.cmd, result.fail_reason, logfile,
1180                   result.output)
1181     lines = [utils.SafeEncode(val)
1182              for val in utils.TailFile(logfile, lines=20)]
1183     _Fail("OS create script failed (%s), last lines in the"
1184           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1185
1186
1187 def RunRenameInstance(instance, old_name, debug):
1188   """Run the OS rename script for an instance.
1189
1190   @type instance: L{objects.Instance}
1191   @param instance: Instance whose OS is to be installed
1192   @type old_name: string
1193   @param old_name: previous instance name
1194   @type debug: integer
1195   @param debug: debug level, passed to the OS scripts
1196   @rtype: boolean
1197   @return: the success of the operation
1198
1199   """
1200   inst_os = OSFromDisk(instance.os)
1201
1202   rename_env = OSEnvironment(instance, inst_os, debug)
1203   rename_env["OLD_INSTANCE_NAME"] = old_name
1204
1205   logfile = _InstanceLogName("rename", instance.os,
1206                              "%s-%s" % (old_name, instance.name), None)
1207
1208   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1209                         cwd=inst_os.path, output=logfile, reset_env=True)
1210
1211   if result.failed:
1212     logging.error("os create command '%s' returned error: %s output: %s",
1213                   result.cmd, result.fail_reason, result.output)
1214     lines = [utils.SafeEncode(val)
1215              for val in utils.TailFile(logfile, lines=20)]
1216     _Fail("OS rename script failed (%s), last lines in the"
1217           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1218
1219
1220 def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
1221   """Returns symlink path for block device.
1222
1223   """
1224   if _dir is None:
1225     _dir = pathutils.DISK_LINKS_DIR
1226
1227   return utils.PathJoin(_dir,
1228                         ("%s%s%s" %
1229                          (instance_name, constants.DISK_SEPARATOR, idx)))
1230
1231
1232 def _SymlinkBlockDev(instance_name, device_path, idx):
1233   """Set up symlinks to a instance's block device.
1234
1235   This is an auxiliary function run when an instance is start (on the primary
1236   node) or when an instance is migrated (on the target node).
1237
1238
1239   @param instance_name: the name of the target instance
1240   @param device_path: path of the physical block device, on the node
1241   @param idx: the disk index
1242   @return: absolute path to the disk's symlink
1243
1244   """
1245   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1246   try:
1247     os.symlink(device_path, link_name)
1248   except OSError, err:
1249     if err.errno == errno.EEXIST:
1250       if (not os.path.islink(link_name) or
1251           os.readlink(link_name) != device_path):
1252         os.remove(link_name)
1253         os.symlink(device_path, link_name)
1254     else:
1255       raise
1256
1257   return link_name
1258
1259
1260 def _RemoveBlockDevLinks(instance_name, disks):
1261   """Remove the block device symlinks belonging to the given instance.
1262
1263   """
1264   for idx, _ in enumerate(disks):
1265     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1266     if os.path.islink(link_name):
1267       try:
1268         os.remove(link_name)
1269       except OSError:
1270         logging.exception("Can't remove symlink '%s'", link_name)
1271
1272
1273 def _GatherAndLinkBlockDevs(instance):
1274   """Set up an instance's block device(s).
1275
1276   This is run on the primary node at instance startup. The block
1277   devices must be already assembled.
1278
1279   @type instance: L{objects.Instance}
1280   @param instance: the instance whose disks we shoul assemble
1281   @rtype: list
1282   @return: list of (disk_object, device_path)
1283
1284   """
1285   block_devices = []
1286   for idx, disk in enumerate(instance.disks):
1287     device = _RecursiveFindBD(disk)
1288     if device is None:
1289       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1290                                     str(disk))
1291     device.Open()
1292     try:
1293       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1294     except OSError, e:
1295       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1296                                     e.strerror)
1297
1298     block_devices.append((disk, link_name))
1299
1300   return block_devices
1301
1302
1303 def StartInstance(instance, startup_paused):
1304   """Start an instance.
1305
1306   @type instance: L{objects.Instance}
1307   @param instance: the instance object
1308   @type startup_paused: bool
1309   @param instance: pause instance at startup?
1310   @rtype: None
1311
1312   """
1313   running_instances = GetInstanceList([instance.hypervisor])
1314
1315   if instance.name in running_instances:
1316     logging.info("Instance %s already running, not starting", instance.name)
1317     return
1318
1319   try:
1320     block_devices = _GatherAndLinkBlockDevs(instance)
1321     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1322     hyper.StartInstance(instance, block_devices, startup_paused)
1323   except errors.BlockDeviceError, err:
1324     _Fail("Block device error: %s", err, exc=True)
1325   except errors.HypervisorError, err:
1326     _RemoveBlockDevLinks(instance.name, instance.disks)
1327     _Fail("Hypervisor error: %s", err, exc=True)
1328
1329
1330 def InstanceShutdown(instance, timeout):
1331   """Shut an instance down.
1332
1333   @note: this functions uses polling with a hardcoded timeout.
1334
1335   @type instance: L{objects.Instance}
1336   @param instance: the instance object
1337   @type timeout: integer
1338   @param timeout: maximum timeout for soft shutdown
1339   @rtype: None
1340
1341   """
1342   hv_name = instance.hypervisor
1343   hyper = hypervisor.GetHypervisor(hv_name)
1344   iname = instance.name
1345
1346   if instance.name not in hyper.ListInstances():
1347     logging.info("Instance %s not running, doing nothing", iname)
1348     return
1349
1350   class _TryShutdown:
1351     def __init__(self):
1352       self.tried_once = False
1353
1354     def __call__(self):
1355       if iname not in hyper.ListInstances():
1356         return
1357
1358       try:
1359         hyper.StopInstance(instance, retry=self.tried_once)
1360       except errors.HypervisorError, err:
1361         if iname not in hyper.ListInstances():
1362           # if the instance is no longer existing, consider this a
1363           # success and go to cleanup
1364           return
1365
1366         _Fail("Failed to stop instance %s: %s", iname, err)
1367
1368       self.tried_once = True
1369
1370       raise utils.RetryAgain()
1371
1372   try:
1373     utils.Retry(_TryShutdown(), 5, timeout)
1374   except utils.RetryTimeout:
1375     # the shutdown did not succeed
1376     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1377
1378     try:
1379       hyper.StopInstance(instance, force=True)
1380     except errors.HypervisorError, err:
1381       if iname in hyper.ListInstances():
1382         # only raise an error if the instance still exists, otherwise
1383         # the error could simply be "instance ... unknown"!
1384         _Fail("Failed to force stop instance %s: %s", iname, err)
1385
1386     time.sleep(1)
1387
1388     if iname in hyper.ListInstances():
1389       _Fail("Could not shutdown instance %s even by destroy", iname)
1390
1391   try:
1392     hyper.CleanupInstance(instance.name)
1393   except errors.HypervisorError, err:
1394     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1395
1396   _RemoveBlockDevLinks(iname, instance.disks)
1397
1398
1399 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1400   """Reboot an instance.
1401
1402   @type instance: L{objects.Instance}
1403   @param instance: the instance object to reboot
1404   @type reboot_type: str
1405   @param reboot_type: the type of reboot, one the following
1406     constants:
1407       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1408         instance OS, do not recreate the VM
1409       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1410         restart the VM (at the hypervisor level)
1411       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1412         not accepted here, since that mode is handled differently, in
1413         cmdlib, and translates into full stop and start of the
1414         instance (instead of a call_instance_reboot RPC)
1415   @type shutdown_timeout: integer
1416   @param shutdown_timeout: maximum timeout for soft shutdown
1417   @rtype: None
1418
1419   """
1420   running_instances = GetInstanceList([instance.hypervisor])
1421
1422   if instance.name not in running_instances:
1423     _Fail("Cannot reboot instance %s that is not running", instance.name)
1424
1425   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1426   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1427     try:
1428       hyper.RebootInstance(instance)
1429     except errors.HypervisorError, err:
1430       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1431   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1432     try:
1433       InstanceShutdown(instance, shutdown_timeout)
1434       return StartInstance(instance, False)
1435     except errors.HypervisorError, err:
1436       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1437   else:
1438     _Fail("Invalid reboot_type received: %s", reboot_type)
1439
1440
1441 def InstanceBalloonMemory(instance, memory):
1442   """Resize an instance's memory.
1443
1444   @type instance: L{objects.Instance}
1445   @param instance: the instance object
1446   @type memory: int
1447   @param memory: new memory amount in MB
1448   @rtype: None
1449
1450   """
1451   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1452   running = hyper.ListInstances()
1453   if instance.name not in running:
1454     logging.info("Instance %s is not running, cannot balloon", instance.name)
1455     return
1456   try:
1457     hyper.BalloonInstanceMemory(instance, memory)
1458   except errors.HypervisorError, err:
1459     _Fail("Failed to balloon instance memory: %s", err, exc=True)
1460
1461
1462 def MigrationInfo(instance):
1463   """Gather information about an instance to be migrated.
1464
1465   @type instance: L{objects.Instance}
1466   @param instance: the instance definition
1467
1468   """
1469   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1470   try:
1471     info = hyper.MigrationInfo(instance)
1472   except errors.HypervisorError, err:
1473     _Fail("Failed to fetch migration information: %s", err, exc=True)
1474   return info
1475
1476
1477 def AcceptInstance(instance, info, target):
1478   """Prepare the node to accept an instance.
1479
1480   @type instance: L{objects.Instance}
1481   @param instance: the instance definition
1482   @type info: string/data (opaque)
1483   @param info: migration information, from the source node
1484   @type target: string
1485   @param target: target host (usually ip), on this node
1486
1487   """
1488   # TODO: why is this required only for DTS_EXT_MIRROR?
1489   if instance.disk_template in constants.DTS_EXT_MIRROR:
1490     # Create the symlinks, as the disks are not active
1491     # in any way
1492     try:
1493       _GatherAndLinkBlockDevs(instance)
1494     except errors.BlockDeviceError, err:
1495       _Fail("Block device error: %s", err, exc=True)
1496
1497   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1498   try:
1499     hyper.AcceptInstance(instance, info, target)
1500   except errors.HypervisorError, err:
1501     if instance.disk_template in constants.DTS_EXT_MIRROR:
1502       _RemoveBlockDevLinks(instance.name, instance.disks)
1503     _Fail("Failed to accept instance: %s", err, exc=True)
1504
1505
1506 def FinalizeMigrationDst(instance, info, success):
1507   """Finalize any preparation to accept an instance.
1508
1509   @type instance: L{objects.Instance}
1510   @param instance: the instance definition
1511   @type info: string/data (opaque)
1512   @param info: migration information, from the source node
1513   @type success: boolean
1514   @param success: whether the migration was a success or a failure
1515
1516   """
1517   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1518   try:
1519     hyper.FinalizeMigrationDst(instance, info, success)
1520   except errors.HypervisorError, err:
1521     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1522
1523
1524 def MigrateInstance(instance, target, live):
1525   """Migrates an instance to another node.
1526
1527   @type instance: L{objects.Instance}
1528   @param instance: the instance definition
1529   @type target: string
1530   @param target: the target node name
1531   @type live: boolean
1532   @param live: whether the migration should be done live or not (the
1533       interpretation of this parameter is left to the hypervisor)
1534   @raise RPCFail: if migration fails for some reason
1535
1536   """
1537   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1538
1539   try:
1540     hyper.MigrateInstance(instance, target, live)
1541   except errors.HypervisorError, err:
1542     _Fail("Failed to migrate instance: %s", err, exc=True)
1543
1544
1545 def FinalizeMigrationSource(instance, success, live):
1546   """Finalize the instance migration on the source node.
1547
1548   @type instance: L{objects.Instance}
1549   @param instance: the instance definition of the migrated instance
1550   @type success: bool
1551   @param success: whether the migration succeeded or not
1552   @type live: bool
1553   @param live: whether the user requested a live migration or not
1554   @raise RPCFail: If the execution fails for some reason
1555
1556   """
1557   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1558
1559   try:
1560     hyper.FinalizeMigrationSource(instance, success, live)
1561   except Exception, err:  # pylint: disable=W0703
1562     _Fail("Failed to finalize the migration on the source node: %s", err,
1563           exc=True)
1564
1565
1566 def GetMigrationStatus(instance):
1567   """Get the migration status
1568
1569   @type instance: L{objects.Instance}
1570   @param instance: the instance that is being migrated
1571   @rtype: L{objects.MigrationStatus}
1572   @return: the status of the current migration (one of
1573            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1574            progress info that can be retrieved from the hypervisor
1575   @raise RPCFail: If the migration status cannot be retrieved
1576
1577   """
1578   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1579   try:
1580     return hyper.GetMigrationStatus(instance)
1581   except Exception, err:  # pylint: disable=W0703
1582     _Fail("Failed to get migration status: %s", err, exc=True)
1583
1584
1585 def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
1586   """Creates a block device for an instance.
1587
1588   @type disk: L{objects.Disk}
1589   @param disk: the object describing the disk we should create
1590   @type size: int
1591   @param size: the size of the physical underlying device, in MiB
1592   @type owner: str
1593   @param owner: the name of the instance for which disk is created,
1594       used for device cache data
1595   @type on_primary: boolean
1596   @param on_primary:  indicates if it is the primary node or not
1597   @type info: string
1598   @param info: string that will be sent to the physical device
1599       creation, used for example to set (LVM) tags on LVs
1600   @type excl_stor: boolean
1601   @param excl_stor: Whether exclusive_storage is active
1602
1603   @return: the new unique_id of the device (this can sometime be
1604       computed only after creation), or None. On secondary nodes,
1605       it's not required to return anything.
1606
1607   """
1608   # TODO: remove the obsolete "size" argument
1609   # pylint: disable=W0613
1610   clist = []
1611   if disk.children:
1612     for child in disk.children:
1613       try:
1614         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1615       except errors.BlockDeviceError, err:
1616         _Fail("Can't assemble device %s: %s", child, err)
1617       if on_primary or disk.AssembleOnSecondary():
1618         # we need the children open in case the device itself has to
1619         # be assembled
1620         try:
1621           # pylint: disable=E1103
1622           crdev.Open()
1623         except errors.BlockDeviceError, err:
1624           _Fail("Can't make child '%s' read-write: %s", child, err)
1625       clist.append(crdev)
1626
1627   try:
1628     device = bdev.Create(disk, clist, excl_stor)
1629   except errors.BlockDeviceError, err:
1630     _Fail("Can't create block device: %s", err)
1631
1632   if on_primary or disk.AssembleOnSecondary():
1633     try:
1634       device.Assemble()
1635     except errors.BlockDeviceError, err:
1636       _Fail("Can't assemble device after creation, unusual event: %s", err)
1637     if on_primary or disk.OpenOnSecondary():
1638       try:
1639         device.Open(force=True)
1640       except errors.BlockDeviceError, err:
1641         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1642     DevCacheManager.UpdateCache(device.dev_path, owner,
1643                                 on_primary, disk.iv_name)
1644
1645   device.SetInfo(info)
1646
1647   return device.unique_id
1648
1649
1650 def _WipeDevice(path, offset, size):
1651   """This function actually wipes the device.
1652
1653   @param path: The path to the device to wipe
1654   @param offset: The offset in MiB in the file
1655   @param size: The size in MiB to write
1656
1657   """
1658   # Internal sizes are always in Mebibytes; if the following "dd" command
1659   # should use a different block size the offset and size given to this
1660   # function must be adjusted accordingly before being passed to "dd".
1661   block_size = 1024 * 1024
1662
1663   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1664          "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1665          "count=%d" % size]
1666   result = utils.RunCmd(cmd)
1667
1668   if result.failed:
1669     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1670           result.fail_reason, result.output)
1671
1672
1673 def BlockdevWipe(disk, offset, size):
1674   """Wipes a block device.
1675
1676   @type disk: L{objects.Disk}
1677   @param disk: the disk object we want to wipe
1678   @type offset: int
1679   @param offset: The offset in MiB in the file
1680   @type size: int
1681   @param size: The size in MiB to write
1682
1683   """
1684   try:
1685     rdev = _RecursiveFindBD(disk)
1686   except errors.BlockDeviceError:
1687     rdev = None
1688
1689   if not rdev:
1690     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1691
1692   # Do cross verify some of the parameters
1693   if offset < 0:
1694     _Fail("Negative offset")
1695   if size < 0:
1696     _Fail("Negative size")
1697   if offset > rdev.size:
1698     _Fail("Offset is bigger than device size")
1699   if (offset + size) > rdev.size:
1700     _Fail("The provided offset and size to wipe is bigger than device size")
1701
1702   _WipeDevice(rdev.dev_path, offset, size)
1703
1704
1705 def BlockdevPauseResumeSync(disks, pause):
1706   """Pause or resume the sync of the block device.
1707
1708   @type disks: list of L{objects.Disk}
1709   @param disks: the disks object we want to pause/resume
1710   @type pause: bool
1711   @param pause: Wheater to pause or resume
1712
1713   """
1714   success = []
1715   for disk in disks:
1716     try:
1717       rdev = _RecursiveFindBD(disk)
1718     except errors.BlockDeviceError:
1719       rdev = None
1720
1721     if not rdev:
1722       success.append((False, ("Cannot change sync for device %s:"
1723                               " device not found" % disk.iv_name)))
1724       continue
1725
1726     result = rdev.PauseResumeSync(pause)
1727
1728     if result:
1729       success.append((result, None))
1730     else:
1731       if pause:
1732         msg = "Pause"
1733       else:
1734         msg = "Resume"
1735       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1736
1737   return success
1738
1739
1740 def BlockdevRemove(disk):
1741   """Remove a block device.
1742
1743   @note: This is intended to be called recursively.
1744
1745   @type disk: L{objects.Disk}
1746   @param disk: the disk object we should remove
1747   @rtype: boolean
1748   @return: the success of the operation
1749
1750   """
1751   msgs = []
1752   try:
1753     rdev = _RecursiveFindBD(disk)
1754   except errors.BlockDeviceError, err:
1755     # probably can't attach
1756     logging.info("Can't attach to device %s in remove", disk)
1757     rdev = None
1758   if rdev is not None:
1759     r_path = rdev.dev_path
1760     try:
1761       rdev.Remove()
1762     except errors.BlockDeviceError, err:
1763       msgs.append(str(err))
1764     if not msgs:
1765       DevCacheManager.RemoveCache(r_path)
1766
1767   if disk.children:
1768     for child in disk.children:
1769       try:
1770         BlockdevRemove(child)
1771       except RPCFail, err:
1772         msgs.append(str(err))
1773
1774   if msgs:
1775     _Fail("; ".join(msgs))
1776
1777
1778 def _RecursiveAssembleBD(disk, owner, as_primary):
1779   """Activate a block device for an instance.
1780
1781   This is run on the primary and secondary nodes for an instance.
1782
1783   @note: this function is called recursively.
1784
1785   @type disk: L{objects.Disk}
1786   @param disk: the disk we try to assemble
1787   @type owner: str
1788   @param owner: the name of the instance which owns the disk
1789   @type as_primary: boolean
1790   @param as_primary: if we should make the block device
1791       read/write
1792
1793   @return: the assembled device or None (in case no device
1794       was assembled)
1795   @raise errors.BlockDeviceError: in case there is an error
1796       during the activation of the children or the device
1797       itself
1798
1799   """
1800   children = []
1801   if disk.children:
1802     mcn = disk.ChildrenNeeded()
1803     if mcn == -1:
1804       mcn = 0 # max number of Nones allowed
1805     else:
1806       mcn = len(disk.children) - mcn # max number of Nones
1807     for chld_disk in disk.children:
1808       try:
1809         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1810       except errors.BlockDeviceError, err:
1811         if children.count(None) >= mcn:
1812           raise
1813         cdev = None
1814         logging.error("Error in child activation (but continuing): %s",
1815                       str(err))
1816       children.append(cdev)
1817
1818   if as_primary or disk.AssembleOnSecondary():
1819     r_dev = bdev.Assemble(disk, children)
1820     result = r_dev
1821     if as_primary or disk.OpenOnSecondary():
1822       r_dev.Open()
1823     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1824                                 as_primary, disk.iv_name)
1825
1826   else:
1827     result = True
1828   return result
1829
1830
1831 def BlockdevAssemble(disk, owner, as_primary, idx):
1832   """Activate a block device for an instance.
1833
1834   This is a wrapper over _RecursiveAssembleBD.
1835
1836   @rtype: str or boolean
1837   @return: a C{/dev/...} path for primary nodes, and
1838       C{True} for secondary nodes
1839
1840   """
1841   try:
1842     result = _RecursiveAssembleBD(disk, owner, as_primary)
1843     if isinstance(result, bdev.BlockDev):
1844       # pylint: disable=E1103
1845       result = result.dev_path
1846       if as_primary:
1847         _SymlinkBlockDev(owner, result, idx)
1848   except errors.BlockDeviceError, err:
1849     _Fail("Error while assembling disk: %s", err, exc=True)
1850   except OSError, err:
1851     _Fail("Error while symlinking disk: %s", err, exc=True)
1852
1853   return result
1854
1855
1856 def BlockdevShutdown(disk):
1857   """Shut down a block device.
1858
1859   First, if the device is assembled (Attach() is successful), then
1860   the device is shutdown. Then the children of the device are
1861   shutdown.
1862
1863   This function is called recursively. Note that we don't cache the
1864   children or such, as oppossed to assemble, shutdown of different
1865   devices doesn't require that the upper device was active.
1866
1867   @type disk: L{objects.Disk}
1868   @param disk: the description of the disk we should
1869       shutdown
1870   @rtype: None
1871
1872   """
1873   msgs = []
1874   r_dev = _RecursiveFindBD(disk)
1875   if r_dev is not None:
1876     r_path = r_dev.dev_path
1877     try:
1878       r_dev.Shutdown()
1879       DevCacheManager.RemoveCache(r_path)
1880     except errors.BlockDeviceError, err:
1881       msgs.append(str(err))
1882
1883   if disk.children:
1884     for child in disk.children:
1885       try:
1886         BlockdevShutdown(child)
1887       except RPCFail, err:
1888         msgs.append(str(err))
1889
1890   if msgs:
1891     _Fail("; ".join(msgs))
1892
1893
1894 def BlockdevAddchildren(parent_cdev, new_cdevs):
1895   """Extend a mirrored block device.
1896
1897   @type parent_cdev: L{objects.Disk}
1898   @param parent_cdev: the disk to which we should add children
1899   @type new_cdevs: list of L{objects.Disk}
1900   @param new_cdevs: the list of children which we should add
1901   @rtype: None
1902
1903   """
1904   parent_bdev = _RecursiveFindBD(parent_cdev)
1905   if parent_bdev is None:
1906     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1907   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1908   if new_bdevs.count(None) > 0:
1909     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1910   parent_bdev.AddChildren(new_bdevs)
1911
1912
1913 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1914   """Shrink a mirrored block device.
1915
1916   @type parent_cdev: L{objects.Disk}
1917   @param parent_cdev: the disk from which we should remove children
1918   @type new_cdevs: list of L{objects.Disk}
1919   @param new_cdevs: the list of children which we should remove
1920   @rtype: None
1921
1922   """
1923   parent_bdev = _RecursiveFindBD(parent_cdev)
1924   if parent_bdev is None:
1925     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1926   devs = []
1927   for disk in new_cdevs:
1928     rpath = disk.StaticDevPath()
1929     if rpath is None:
1930       bd = _RecursiveFindBD(disk)
1931       if bd is None:
1932         _Fail("Can't find device %s while removing children", disk)
1933       else:
1934         devs.append(bd.dev_path)
1935     else:
1936       if not utils.IsNormAbsPath(rpath):
1937         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1938       devs.append(rpath)
1939   parent_bdev.RemoveChildren(devs)
1940
1941
1942 def BlockdevGetmirrorstatus(disks):
1943   """Get the mirroring status of a list of devices.
1944
1945   @type disks: list of L{objects.Disk}
1946   @param disks: the list of disks which we should query
1947   @rtype: disk
1948   @return: List of L{objects.BlockDevStatus}, one for each disk
1949   @raise errors.BlockDeviceError: if any of the disks cannot be
1950       found
1951
1952   """
1953   stats = []
1954   for dsk in disks:
1955     rbd = _RecursiveFindBD(dsk)
1956     if rbd is None:
1957       _Fail("Can't find device %s", dsk)
1958
1959     stats.append(rbd.CombinedSyncStatus())
1960
1961   return stats
1962
1963
1964 def BlockdevGetmirrorstatusMulti(disks):
1965   """Get the mirroring status of a list of devices.
1966
1967   @type disks: list of L{objects.Disk}
1968   @param disks: the list of disks which we should query
1969   @rtype: disk
1970   @return: List of tuples, (bool, status), one for each disk; bool denotes
1971     success/failure, status is L{objects.BlockDevStatus} on success, string
1972     otherwise
1973
1974   """
1975   result = []
1976   for disk in disks:
1977     try:
1978       rbd = _RecursiveFindBD(disk)
1979       if rbd is None:
1980         result.append((False, "Can't find device %s" % disk))
1981         continue
1982
1983       status = rbd.CombinedSyncStatus()
1984     except errors.BlockDeviceError, err:
1985       logging.exception("Error while getting disk status")
1986       result.append((False, str(err)))
1987     else:
1988       result.append((True, status))
1989
1990   assert len(disks) == len(result)
1991
1992   return result
1993
1994
1995 def _RecursiveFindBD(disk):
1996   """Check if a device is activated.
1997
1998   If so, return information about the real device.
1999
2000   @type disk: L{objects.Disk}
2001   @param disk: the disk object we need to find
2002
2003   @return: None if the device can't be found,
2004       otherwise the device instance
2005
2006   """
2007   children = []
2008   if disk.children:
2009     for chdisk in disk.children:
2010       children.append(_RecursiveFindBD(chdisk))
2011
2012   return bdev.FindDevice(disk, children)
2013
2014
2015 def _OpenRealBD(disk):
2016   """Opens the underlying block device of a disk.
2017
2018   @type disk: L{objects.Disk}
2019   @param disk: the disk object we want to open
2020
2021   """
2022   real_disk = _RecursiveFindBD(disk)
2023   if real_disk is None:
2024     _Fail("Block device '%s' is not set up", disk)
2025
2026   real_disk.Open()
2027
2028   return real_disk
2029
2030
2031 def BlockdevFind(disk):
2032   """Check if a device is activated.
2033
2034   If it is, return information about the real device.
2035
2036   @type disk: L{objects.Disk}
2037   @param disk: the disk to find
2038   @rtype: None or objects.BlockDevStatus
2039   @return: None if the disk cannot be found, otherwise a the current
2040            information
2041
2042   """
2043   try:
2044     rbd = _RecursiveFindBD(disk)
2045   except errors.BlockDeviceError, err:
2046     _Fail("Failed to find device: %s", err, exc=True)
2047
2048   if rbd is None:
2049     return None
2050
2051   return rbd.GetSyncStatus()
2052
2053
2054 def BlockdevGetsize(disks):
2055   """Computes the size of the given disks.
2056
2057   If a disk is not found, returns None instead.
2058
2059   @type disks: list of L{objects.Disk}
2060   @param disks: the list of disk to compute the size for
2061   @rtype: list
2062   @return: list with elements None if the disk cannot be found,
2063       otherwise the size
2064
2065   """
2066   result = []
2067   for cf in disks:
2068     try:
2069       rbd = _RecursiveFindBD(cf)
2070     except errors.BlockDeviceError:
2071       result.append(None)
2072       continue
2073     if rbd is None:
2074       result.append(None)
2075     else:
2076       result.append(rbd.GetActualSize())
2077   return result
2078
2079
2080 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2081   """Export a block device to a remote node.
2082
2083   @type disk: L{objects.Disk}
2084   @param disk: the description of the disk to export
2085   @type dest_node: str
2086   @param dest_node: the destination node to export to
2087   @type dest_path: str
2088   @param dest_path: the destination path on the target node
2089   @type cluster_name: str
2090   @param cluster_name: the cluster name, needed for SSH hostalias
2091   @rtype: None
2092
2093   """
2094   real_disk = _OpenRealBD(disk)
2095
2096   # the block size on the read dd is 1MiB to match our units
2097   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2098                                "dd if=%s bs=1048576 count=%s",
2099                                real_disk.dev_path, str(disk.size))
2100
2101   # we set here a smaller block size as, due to ssh buffering, more
2102   # than 64-128k will mostly ignored; we use nocreat to fail if the
2103   # device is not already there or we pass a wrong path; we use
2104   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2105   # to not buffer too much memory; this means that at best, we flush
2106   # every 64k, which will not be very fast
2107   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2108                                 " oflag=dsync", dest_path)
2109
2110   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2111                                                    constants.SSH_LOGIN_USER,
2112                                                    destcmd)
2113
2114   # all commands have been checked, so we're safe to combine them
2115   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2116
2117   result = utils.RunCmd(["bash", "-c", command])
2118
2119   if result.failed:
2120     _Fail("Disk copy command '%s' returned error: %s"
2121           " output: %s", command, result.fail_reason, result.output)
2122
2123
2124 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2125   """Write a file to the filesystem.
2126
2127   This allows the master to overwrite(!) a file. It will only perform
2128   the operation if the file belongs to a list of configuration files.
2129
2130   @type file_name: str
2131   @param file_name: the target file name
2132   @type data: str
2133   @param data: the new contents of the file
2134   @type mode: int
2135   @param mode: the mode to give the file (can be None)
2136   @type uid: string
2137   @param uid: the owner of the file
2138   @type gid: string
2139   @param gid: the group of the file
2140   @type atime: float
2141   @param atime: the atime to set on the file (can be None)
2142   @type mtime: float
2143   @param mtime: the mtime to set on the file (can be None)
2144   @rtype: None
2145
2146   """
2147   file_name = vcluster.LocalizeVirtualPath(file_name)
2148
2149   if not os.path.isabs(file_name):
2150     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2151
2152   if file_name not in _ALLOWED_UPLOAD_FILES:
2153     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2154           file_name)
2155
2156   raw_data = _Decompress(data)
2157
2158   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2159     _Fail("Invalid username/groupname type")
2160
2161   getents = runtime.GetEnts()
2162   uid = getents.LookupUser(uid)
2163   gid = getents.LookupGroup(gid)
2164
2165   utils.SafeWriteFile(file_name, None,
2166                       data=raw_data, mode=mode, uid=uid, gid=gid,
2167                       atime=atime, mtime=mtime)
2168
2169
2170 def RunOob(oob_program, command, node, timeout):
2171   """Executes oob_program with given command on given node.
2172
2173   @param oob_program: The path to the executable oob_program
2174   @param command: The command to invoke on oob_program
2175   @param node: The node given as an argument to the program
2176   @param timeout: Timeout after which we kill the oob program
2177
2178   @return: stdout
2179   @raise RPCFail: If execution fails for some reason
2180
2181   """
2182   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2183
2184   if result.failed:
2185     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2186           result.fail_reason, result.output)
2187
2188   return result.stdout
2189
2190
2191 def _OSOndiskAPIVersion(os_dir):
2192   """Compute and return the API version of a given OS.
2193
2194   This function will try to read the API version of the OS residing in
2195   the 'os_dir' directory.
2196
2197   @type os_dir: str
2198   @param os_dir: the directory in which we should look for the OS
2199   @rtype: tuple
2200   @return: tuple (status, data) with status denoting the validity and
2201       data holding either the vaid versions or an error message
2202
2203   """
2204   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2205
2206   try:
2207     st = os.stat(api_file)
2208   except EnvironmentError, err:
2209     return False, ("Required file '%s' not found under path %s: %s" %
2210                    (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2211
2212   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2213     return False, ("File '%s' in %s is not a regular file" %
2214                    (constants.OS_API_FILE, os_dir))
2215
2216   try:
2217     api_versions = utils.ReadFile(api_file).splitlines()
2218   except EnvironmentError, err:
2219     return False, ("Error while reading the API version file at %s: %s" %
2220                    (api_file, utils.ErrnoOrStr(err)))
2221
2222   try:
2223     api_versions = [int(version.strip()) for version in api_versions]
2224   except (TypeError, ValueError), err:
2225     return False, ("API version(s) can't be converted to integer: %s" %
2226                    str(err))
2227
2228   return True, api_versions
2229
2230
2231 def DiagnoseOS(top_dirs=None):
2232   """Compute the validity for all OSes.
2233
2234   @type top_dirs: list
2235   @param top_dirs: the list of directories in which to
2236       search (if not given defaults to
2237       L{pathutils.OS_SEARCH_PATH})
2238   @rtype: list of L{objects.OS}
2239   @return: a list of tuples (name, path, status, diagnose, variants,
2240       parameters, api_version) for all (potential) OSes under all
2241       search paths, where:
2242           - name is the (potential) OS name
2243           - path is the full path to the OS
2244           - status True/False is the validity of the OS
2245           - diagnose is the error message for an invalid OS, otherwise empty
2246           - variants is a list of supported OS variants, if any
2247           - parameters is a list of (name, help) parameters, if any
2248           - api_version is a list of support OS API versions
2249
2250   """
2251   if top_dirs is None:
2252     top_dirs = pathutils.OS_SEARCH_PATH
2253
2254   result = []
2255   for dir_name in top_dirs:
2256     if os.path.isdir(dir_name):
2257       try:
2258         f_names = utils.ListVisibleFiles(dir_name)
2259       except EnvironmentError, err:
2260         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2261         break
2262       for name in f_names:
2263         os_path = utils.PathJoin(dir_name, name)
2264         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2265         if status:
2266           diagnose = ""
2267           variants = os_inst.supported_variants
2268           parameters = os_inst.supported_parameters
2269           api_versions = os_inst.api_versions
2270         else:
2271           diagnose = os_inst
2272           variants = parameters = api_versions = []
2273         result.append((name, os_path, status, diagnose, variants,
2274                        parameters, api_versions))
2275
2276   return result
2277
2278
2279 def _TryOSFromDisk(name, base_dir=None):
2280   """Create an OS instance from disk.
2281
2282   This function will return an OS instance if the given name is a
2283   valid OS name.
2284
2285   @type base_dir: string
2286   @keyword base_dir: Base directory containing OS installations.
2287                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2288   @rtype: tuple
2289   @return: success and either the OS instance if we find a valid one,
2290       or error message
2291
2292   """
2293   if base_dir is None:
2294     os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2295   else:
2296     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2297
2298   if os_dir is None:
2299     return False, "Directory for OS %s not found in search path" % name
2300
2301   status, api_versions = _OSOndiskAPIVersion(os_dir)
2302   if not status:
2303     # push the error up
2304     return status, api_versions
2305
2306   if not constants.OS_API_VERSIONS.intersection(api_versions):
2307     return False, ("API version mismatch for path '%s': found %s, want %s." %
2308                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2309
2310   # OS Files dictionary, we will populate it with the absolute path
2311   # names; if the value is True, then it is a required file, otherwise
2312   # an optional one
2313   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2314
2315   if max(api_versions) >= constants.OS_API_V15:
2316     os_files[constants.OS_VARIANTS_FILE] = False
2317
2318   if max(api_versions) >= constants.OS_API_V20:
2319     os_files[constants.OS_PARAMETERS_FILE] = True
2320   else:
2321     del os_files[constants.OS_SCRIPT_VERIFY]
2322
2323   for (filename, required) in os_files.items():
2324     os_files[filename] = utils.PathJoin(os_dir, filename)
2325
2326     try:
2327       st = os.stat(os_files[filename])
2328     except EnvironmentError, err:
2329       if err.errno == errno.ENOENT and not required:
2330         del os_files[filename]
2331         continue
2332       return False, ("File '%s' under path '%s' is missing (%s)" %
2333                      (filename, os_dir, utils.ErrnoOrStr(err)))
2334
2335     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2336       return False, ("File '%s' under path '%s' is not a regular file" %
2337                      (filename, os_dir))
2338
2339     if filename in constants.OS_SCRIPTS:
2340       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2341         return False, ("File '%s' under path '%s' is not executable" %
2342                        (filename, os_dir))
2343
2344   variants = []
2345   if constants.OS_VARIANTS_FILE in os_files:
2346     variants_file = os_files[constants.OS_VARIANTS_FILE]
2347     try:
2348       variants = \
2349         utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2350     except EnvironmentError, err:
2351       # we accept missing files, but not other errors
2352       if err.errno != errno.ENOENT:
2353         return False, ("Error while reading the OS variants file at %s: %s" %
2354                        (variants_file, utils.ErrnoOrStr(err)))
2355
2356   parameters = []
2357   if constants.OS_PARAMETERS_FILE in os_files:
2358     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2359     try:
2360       parameters = utils.ReadFile(parameters_file).splitlines()
2361     except EnvironmentError, err:
2362       return False, ("Error while reading the OS parameters file at %s: %s" %
2363                      (parameters_file, utils.ErrnoOrStr(err)))
2364     parameters = [v.split(None, 1) for v in parameters]
2365
2366   os_obj = objects.OS(name=name, path=os_dir,
2367                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2368                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2369                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2370                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2371                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2372                                                  None),
2373                       supported_variants=variants,
2374                       supported_parameters=parameters,
2375                       api_versions=api_versions)
2376   return True, os_obj
2377
2378
2379 def OSFromDisk(name, base_dir=None):
2380   """Create an OS instance from disk.
2381
2382   This function will return an OS instance if the given name is a
2383   valid OS name. Otherwise, it will raise an appropriate
2384   L{RPCFail} exception, detailing why this is not a valid OS.
2385
2386   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2387   an exception but returns true/false status data.
2388
2389   @type base_dir: string
2390   @keyword base_dir: Base directory containing OS installations.
2391                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2392   @rtype: L{objects.OS}
2393   @return: the OS instance if we find a valid one
2394   @raise RPCFail: if we don't find a valid OS
2395
2396   """
2397   name_only = objects.OS.GetName(name)
2398   status, payload = _TryOSFromDisk(name_only, base_dir)
2399
2400   if not status:
2401     _Fail(payload)
2402
2403   return payload
2404
2405
2406 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2407   """Calculate the basic environment for an os script.
2408
2409   @type os_name: str
2410   @param os_name: full operating system name (including variant)
2411   @type inst_os: L{objects.OS}
2412   @param inst_os: operating system for which the environment is being built
2413   @type os_params: dict
2414   @param os_params: the OS parameters
2415   @type debug: integer
2416   @param debug: debug level (0 or 1, for OS Api 10)
2417   @rtype: dict
2418   @return: dict of environment variables
2419   @raise errors.BlockDeviceError: if the block device
2420       cannot be found
2421
2422   """
2423   result = {}
2424   api_version = \
2425     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2426   result["OS_API_VERSION"] = "%d" % api_version
2427   result["OS_NAME"] = inst_os.name
2428   result["DEBUG_LEVEL"] = "%d" % debug
2429
2430   # OS variants
2431   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2432     variant = objects.OS.GetVariant(os_name)
2433     if not variant:
2434       variant = inst_os.supported_variants[0]
2435   else:
2436     variant = ""
2437   result["OS_VARIANT"] = variant
2438
2439   # OS params
2440   for pname, pvalue in os_params.items():
2441     result["OSP_%s" % pname.upper()] = pvalue
2442
2443   # Set a default path otherwise programs called by OS scripts (or
2444   # even hooks called from OS scripts) might break, and we don't want
2445   # to have each script require setting a PATH variable
2446   result["PATH"] = constants.HOOKS_PATH
2447
2448   return result
2449
2450
2451 def OSEnvironment(instance, inst_os, debug=0):
2452   """Calculate the environment for an os script.
2453
2454   @type instance: L{objects.Instance}
2455   @param instance: target instance for the os script run
2456   @type inst_os: L{objects.OS}
2457   @param inst_os: operating system for which the environment is being built
2458   @type debug: integer
2459   @param debug: debug level (0 or 1, for OS Api 10)
2460   @rtype: dict
2461   @return: dict of environment variables
2462   @raise errors.BlockDeviceError: if the block device
2463       cannot be found
2464
2465   """
2466   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2467
2468   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2469     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2470
2471   result["HYPERVISOR"] = instance.hypervisor
2472   result["DISK_COUNT"] = "%d" % len(instance.disks)
2473   result["NIC_COUNT"] = "%d" % len(instance.nics)
2474   result["INSTANCE_SECONDARY_NODES"] = \
2475       ("%s" % " ".join(instance.secondary_nodes))
2476
2477   # Disks
2478   for idx, disk in enumerate(instance.disks):
2479     real_disk = _OpenRealBD(disk)
2480     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2481     result["DISK_%d_ACCESS" % idx] = disk.mode
2482     if constants.HV_DISK_TYPE in instance.hvparams:
2483       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2484         instance.hvparams[constants.HV_DISK_TYPE]
2485     if disk.dev_type in constants.LDS_BLOCK:
2486       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2487     elif disk.dev_type == constants.LD_FILE:
2488       result["DISK_%d_BACKEND_TYPE" % idx] = \
2489         "file:%s" % disk.physical_id[0]
2490
2491   # NICs
2492   for idx, nic in enumerate(instance.nics):
2493     result["NIC_%d_MAC" % idx] = nic.mac
2494     if nic.ip:
2495       result["NIC_%d_IP" % idx] = nic.ip
2496     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2497     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2498       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2499     if nic.nicparams[constants.NIC_LINK]:
2500       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2501     if nic.network:
2502       result["NIC_%d_NETWORK" % idx] = nic.network
2503     if constants.HV_NIC_TYPE in instance.hvparams:
2504       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2505         instance.hvparams[constants.HV_NIC_TYPE]
2506
2507   # HV/BE params
2508   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2509     for key, value in source.items():
2510       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2511
2512   return result
2513
2514
2515 def DiagnoseExtStorage(top_dirs=None):
2516   """Compute the validity for all ExtStorage Providers.
2517
2518   @type top_dirs: list
2519   @param top_dirs: the list of directories in which to
2520       search (if not given defaults to
2521       L{pathutils.ES_SEARCH_PATH})
2522   @rtype: list of L{objects.ExtStorage}
2523   @return: a list of tuples (name, path, status, diagnose, parameters)
2524       for all (potential) ExtStorage Providers under all
2525       search paths, where:
2526           - name is the (potential) ExtStorage Provider
2527           - path is the full path to the ExtStorage Provider
2528           - status True/False is the validity of the ExtStorage Provider
2529           - diagnose is the error message for an invalid ExtStorage Provider,
2530             otherwise empty
2531           - parameters is a list of (name, help) parameters, if any
2532
2533   """
2534   if top_dirs is None:
2535     top_dirs = pathutils.ES_SEARCH_PATH
2536
2537   result = []
2538   for dir_name in top_dirs:
2539     if os.path.isdir(dir_name):
2540       try:
2541         f_names = utils.ListVisibleFiles(dir_name)
2542       except EnvironmentError, err:
2543         logging.exception("Can't list the ExtStorage directory %s: %s",
2544                           dir_name, err)
2545         break
2546       for name in f_names:
2547         es_path = utils.PathJoin(dir_name, name)
2548         status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
2549         if status:
2550           diagnose = ""
2551           parameters = es_inst.supported_parameters
2552         else:
2553           diagnose = es_inst
2554           parameters = []
2555         result.append((name, es_path, status, diagnose, parameters))
2556
2557   return result
2558
2559
2560 def BlockdevGrow(disk, amount, dryrun, backingstore):
2561   """Grow a stack of block devices.
2562
2563   This function is called recursively, with the childrens being the
2564   first ones to resize.
2565
2566   @type disk: L{objects.Disk}
2567   @param disk: the disk to be grown
2568   @type amount: integer
2569   @param amount: the amount (in mebibytes) to grow with
2570   @type dryrun: boolean
2571   @param dryrun: whether to execute the operation in simulation mode
2572       only, without actually increasing the size
2573   @param backingstore: whether to execute the operation on backing storage
2574       only, or on "logical" storage only; e.g. DRBD is logical storage,
2575       whereas LVM, file, RBD are backing storage
2576   @rtype: (status, result)
2577   @return: a tuple with the status of the operation (True/False), and
2578       the errors message if status is False
2579
2580   """
2581   r_dev = _RecursiveFindBD(disk)
2582   if r_dev is None:
2583     _Fail("Cannot find block device %s", disk)
2584
2585   try:
2586     r_dev.Grow(amount, dryrun, backingstore)
2587   except errors.BlockDeviceError, err:
2588     _Fail("Failed to grow block device: %s", err, exc=True)
2589
2590
2591 def BlockdevSnapshot(disk):
2592   """Create a snapshot copy of a block device.
2593
2594   This function is called recursively, and the snapshot is actually created
2595   just for the leaf lvm backend device.
2596
2597   @type disk: L{objects.Disk}
2598   @param disk: the disk to be snapshotted
2599   @rtype: string
2600   @return: snapshot disk ID as (vg, lv)
2601
2602   """
2603   if disk.dev_type == constants.LD_DRBD8:
2604     if not disk.children:
2605       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2606             disk.unique_id)
2607     return BlockdevSnapshot(disk.children[0])
2608   elif disk.dev_type == constants.LD_LV:
2609     r_dev = _RecursiveFindBD(disk)
2610     if r_dev is not None:
2611       # FIXME: choose a saner value for the snapshot size
2612       # let's stay on the safe side and ask for the full size, for now
2613       return r_dev.Snapshot(disk.size)
2614     else:
2615       _Fail("Cannot find block device %s", disk)
2616   else:
2617     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2618           disk.unique_id, disk.dev_type)
2619
2620
2621 def BlockdevSetInfo(disk, info):
2622   """Sets 'metadata' information on block devices.
2623
2624   This function sets 'info' metadata on block devices. Initial
2625   information is set at device creation; this function should be used
2626   for example after renames.
2627
2628   @type disk: L{objects.Disk}
2629   @param disk: the disk to be grown
2630   @type info: string
2631   @param info: new 'info' metadata
2632   @rtype: (status, result)
2633   @return: a tuple with the status of the operation (True/False), and
2634       the errors message if status is False
2635
2636   """
2637   r_dev = _RecursiveFindBD(disk)
2638   if r_dev is None:
2639     _Fail("Cannot find block device %s", disk)
2640
2641   try:
2642     r_dev.SetInfo(info)
2643   except errors.BlockDeviceError, err:
2644     _Fail("Failed to set information on block device: %s", err, exc=True)
2645
2646
2647 def FinalizeExport(instance, snap_disks):
2648   """Write out the export configuration information.
2649
2650   @type instance: L{objects.Instance}
2651   @param instance: the instance which we export, used for
2652       saving configuration
2653   @type snap_disks: list of L{objects.Disk}
2654   @param snap_disks: list of snapshot block devices, which
2655       will be used to get the actual name of the dump file
2656
2657   @rtype: None
2658
2659   """
2660   destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
2661   finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
2662
2663   config = objects.SerializableConfigParser()
2664
2665   config.add_section(constants.INISECT_EXP)
2666   config.set(constants.INISECT_EXP, "version", "0")
2667   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2668   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2669   config.set(constants.INISECT_EXP, "os", instance.os)
2670   config.set(constants.INISECT_EXP, "compression", "none")
2671
2672   config.add_section(constants.INISECT_INS)
2673   config.set(constants.INISECT_INS, "name", instance.name)
2674   config.set(constants.INISECT_INS, "maxmem", "%d" %
2675              instance.beparams[constants.BE_MAXMEM])
2676   config.set(constants.INISECT_INS, "minmem", "%d" %
2677              instance.beparams[constants.BE_MINMEM])
2678   # "memory" is deprecated, but useful for exporting to old ganeti versions
2679   config.set(constants.INISECT_INS, "memory", "%d" %
2680              instance.beparams[constants.BE_MAXMEM])
2681   config.set(constants.INISECT_INS, "vcpus", "%d" %
2682              instance.beparams[constants.BE_VCPUS])
2683   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2684   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2685   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2686
2687   nic_total = 0
2688   for nic_count, nic in enumerate(instance.nics):
2689     nic_total += 1
2690     config.set(constants.INISECT_INS, "nic%d_mac" %
2691                nic_count, "%s" % nic.mac)
2692     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2693     config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
2694                "%s" % nic.network)
2695     for param in constants.NICS_PARAMETER_TYPES:
2696       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2697                  "%s" % nic.nicparams.get(param, None))
2698   # TODO: redundant: on load can read nics until it doesn't exist
2699   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2700
2701   disk_total = 0
2702   for disk_count, disk in enumerate(snap_disks):
2703     if disk:
2704       disk_total += 1
2705       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2706                  ("%s" % disk.iv_name))
2707       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2708                  ("%s" % disk.physical_id[1]))
2709       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2710                  ("%d" % disk.size))
2711
2712   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2713
2714   # New-style hypervisor/backend parameters
2715
2716   config.add_section(constants.INISECT_HYP)
2717   for name, value in instance.hvparams.items():
2718     if name not in constants.HVC_GLOBALS:
2719       config.set(constants.INISECT_HYP, name, str(value))
2720
2721   config.add_section(constants.INISECT_BEP)
2722   for name, value in instance.beparams.items():
2723     config.set(constants.INISECT_BEP, name, str(value))
2724
2725   config.add_section(constants.INISECT_OSP)
2726   for name, value in instance.osparams.items():
2727     config.set(constants.INISECT_OSP, name, str(value))
2728
2729   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2730                   data=config.Dumps())
2731   shutil.rmtree(finaldestdir, ignore_errors=True)
2732   shutil.move(destdir, finaldestdir)
2733
2734
2735 def ExportInfo(dest):
2736   """Get export configuration information.
2737
2738   @type dest: str
2739   @param dest: directory containing the export
2740
2741   @rtype: L{objects.SerializableConfigParser}
2742   @return: a serializable config file containing the
2743       export info
2744
2745   """
2746   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2747
2748   config = objects.SerializableConfigParser()
2749   config.read(cff)
2750
2751   if (not config.has_section(constants.INISECT_EXP) or
2752       not config.has_section(constants.INISECT_INS)):
2753     _Fail("Export info file doesn't have the required fields")
2754
2755   return config.Dumps()
2756
2757
2758 def ListExports():
2759   """Return a list of exports currently available on this machine.
2760
2761   @rtype: list
2762   @return: list of the exports
2763
2764   """
2765   if os.path.isdir(pathutils.EXPORT_DIR):
2766     return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
2767   else:
2768     _Fail("No exports directory")
2769
2770
2771 def RemoveExport(export):
2772   """Remove an existing export from the node.
2773
2774   @type export: str
2775   @param export: the name of the export to remove
2776   @rtype: None
2777
2778   """
2779   target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2780
2781   try:
2782     shutil.rmtree(target)
2783   except EnvironmentError, err:
2784     _Fail("Error while removing the export: %s", err, exc=True)
2785
2786
2787 def BlockdevRename(devlist):
2788   """Rename a list of block devices.
2789
2790   @type devlist: list of tuples
2791   @param devlist: list of tuples of the form  (disk,
2792       new_logical_id, new_physical_id); disk is an
2793       L{objects.Disk} object describing the current disk,
2794       and new logical_id/physical_id is the name we
2795       rename it to
2796   @rtype: boolean
2797   @return: True if all renames succeeded, False otherwise
2798
2799   """
2800   msgs = []
2801   result = True
2802   for disk, unique_id in devlist:
2803     dev = _RecursiveFindBD(disk)
2804     if dev is None:
2805       msgs.append("Can't find device %s in rename" % str(disk))
2806       result = False
2807       continue
2808     try:
2809       old_rpath = dev.dev_path
2810       dev.Rename(unique_id)
2811       new_rpath = dev.dev_path
2812       if old_rpath != new_rpath:
2813         DevCacheManager.RemoveCache(old_rpath)
2814         # FIXME: we should add the new cache information here, like:
2815         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2816         # but we don't have the owner here - maybe parse from existing
2817         # cache? for now, we only lose lvm data when we rename, which
2818         # is less critical than DRBD or MD
2819     except errors.BlockDeviceError, err:
2820       msgs.append("Can't rename device '%s' to '%s': %s" %
2821                   (dev, unique_id, err))
2822       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2823       result = False
2824   if not result:
2825     _Fail("; ".join(msgs))
2826
2827
2828 def _TransformFileStorageDir(fs_dir):
2829   """Checks whether given file_storage_dir is valid.
2830
2831   Checks wheter the given fs_dir is within the cluster-wide default
2832   file_storage_dir or the shared_file_storage_dir, which are stored in
2833   SimpleStore. Only paths under those directories are allowed.
2834
2835   @type fs_dir: str
2836   @param fs_dir: the path to check
2837
2838   @return: the normalized path if valid, None otherwise
2839
2840   """
2841   if not (constants.ENABLE_FILE_STORAGE or
2842           constants.ENABLE_SHARED_FILE_STORAGE):
2843     _Fail("File storage disabled at configure time")
2844
2845   bdev.CheckFileStoragePath(fs_dir)
2846
2847   return os.path.normpath(fs_dir)
2848
2849
2850 def CreateFileStorageDir(file_storage_dir):
2851   """Create file storage directory.
2852
2853   @type file_storage_dir: str
2854   @param file_storage_dir: directory to create
2855
2856   @rtype: tuple
2857   @return: tuple with first element a boolean indicating wheter dir
2858       creation was successful or not
2859
2860   """
2861   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2862   if os.path.exists(file_storage_dir):
2863     if not os.path.isdir(file_storage_dir):
2864       _Fail("Specified storage dir '%s' is not a directory",
2865             file_storage_dir)
2866   else:
2867     try:
2868       os.makedirs(file_storage_dir, 0750)
2869     except OSError, err:
2870       _Fail("Cannot create file storage directory '%s': %s",
2871             file_storage_dir, err, exc=True)
2872
2873
2874 def RemoveFileStorageDir(file_storage_dir):
2875   """Remove file storage directory.
2876
2877   Remove it only if it's empty. If not log an error and return.
2878
2879   @type file_storage_dir: str
2880   @param file_storage_dir: the directory we should cleanup
2881   @rtype: tuple (success,)
2882   @return: tuple of one element, C{success}, denoting
2883       whether the operation was successful
2884
2885   """
2886   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2887   if os.path.exists(file_storage_dir):
2888     if not os.path.isdir(file_storage_dir):
2889       _Fail("Specified Storage directory '%s' is not a directory",
2890             file_storage_dir)
2891     # deletes dir only if empty, otherwise we want to fail the rpc call
2892     try:
2893       os.rmdir(file_storage_dir)
2894     except OSError, err:
2895       _Fail("Cannot remove file storage directory '%s': %s",
2896             file_storage_dir, err)
2897
2898
2899 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2900   """Rename the file storage directory.
2901
2902   @type old_file_storage_dir: str
2903   @param old_file_storage_dir: the current path
2904   @type new_file_storage_dir: str
2905   @param new_file_storage_dir: the name we should rename to
2906   @rtype: tuple (success,)
2907   @return: tuple of one element, C{success}, denoting
2908       whether the operation was successful
2909
2910   """
2911   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2912   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2913   if not os.path.exists(new_file_storage_dir):
2914     if os.path.isdir(old_file_storage_dir):
2915       try:
2916         os.rename(old_file_storage_dir, new_file_storage_dir)
2917       except OSError, err:
2918         _Fail("Cannot rename '%s' to '%s': %s",
2919               old_file_storage_dir, new_file_storage_dir, err)
2920     else:
2921       _Fail("Specified storage dir '%s' is not a directory",
2922             old_file_storage_dir)
2923   else:
2924     if os.path.exists(old_file_storage_dir):
2925       _Fail("Cannot rename '%s' to '%s': both locations exist",
2926             old_file_storage_dir, new_file_storage_dir)
2927
2928
2929 def _EnsureJobQueueFile(file_name):
2930   """Checks whether the given filename is in the queue directory.
2931
2932   @type file_name: str
2933   @param file_name: the file name we should check
2934   @rtype: None
2935   @raises RPCFail: if the file is not valid
2936
2937   """
2938   if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
2939     _Fail("Passed job queue file '%s' does not belong to"
2940           " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
2941
2942
2943 def JobQueueUpdate(file_name, content):
2944   """Updates a file in the queue directory.
2945
2946   This is just a wrapper over L{utils.io.WriteFile}, with proper
2947   checking.
2948
2949   @type file_name: str
2950   @param file_name: the job file name
2951   @type content: str
2952   @param content: the new job contents
2953   @rtype: boolean
2954   @return: the success of the operation
2955
2956   """
2957   file_name = vcluster.LocalizeVirtualPath(file_name)
2958
2959   _EnsureJobQueueFile(file_name)
2960   getents = runtime.GetEnts()
2961
2962   # Write and replace the file atomically
2963   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2964                   gid=getents.masterd_gid)
2965
2966
2967 def JobQueueRename(old, new):
2968   """Renames a job queue file.
2969
2970   This is just a wrapper over os.rename with proper checking.
2971
2972   @type old: str
2973   @param old: the old (actual) file name
2974   @type new: str
2975   @param new: the desired file name
2976   @rtype: tuple
2977   @return: the success of the operation and payload
2978
2979   """
2980   old = vcluster.LocalizeVirtualPath(old)
2981   new = vcluster.LocalizeVirtualPath(new)
2982
2983   _EnsureJobQueueFile(old)
2984   _EnsureJobQueueFile(new)
2985
2986   getents = runtime.GetEnts()
2987
2988   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2989                    dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2990
2991
2992 def BlockdevClose(instance_name, disks):
2993   """Closes the given block devices.
2994
2995   This means they will be switched to secondary mode (in case of
2996   DRBD).
2997
2998   @param instance_name: if the argument is not empty, the symlinks
2999       of this instance will be removed
3000   @type disks: list of L{objects.Disk}
3001   @param disks: the list of disks to be closed
3002   @rtype: tuple (success, message)
3003   @return: a tuple of success and message, where success
3004       indicates the succes of the operation, and message
3005       which will contain the error details in case we
3006       failed
3007
3008   """
3009   bdevs = []
3010   for cf in disks:
3011     rd = _RecursiveFindBD(cf)
3012     if rd is None:
3013       _Fail("Can't find device %s", cf)
3014     bdevs.append(rd)
3015
3016   msg = []
3017   for rd in bdevs:
3018     try:
3019       rd.Close()
3020     except errors.BlockDeviceError, err:
3021       msg.append(str(err))
3022   if msg:
3023     _Fail("Can't make devices secondary: %s", ",".join(msg))
3024   else:
3025     if instance_name:
3026       _RemoveBlockDevLinks(instance_name, disks)
3027
3028
3029 def ValidateHVParams(hvname, hvparams):
3030   """Validates the given hypervisor parameters.
3031
3032   @type hvname: string
3033   @param hvname: the hypervisor name
3034   @type hvparams: dict
3035   @param hvparams: the hypervisor parameters to be validated
3036   @rtype: None
3037
3038   """
3039   try:
3040     hv_type = hypervisor.GetHypervisor(hvname)
3041     hv_type.ValidateParameters(hvparams)
3042   except errors.HypervisorError, err:
3043     _Fail(str(err), log=False)
3044
3045
3046 def _CheckOSPList(os_obj, parameters):
3047   """Check whether a list of parameters is supported by the OS.
3048
3049   @type os_obj: L{objects.OS}
3050   @param os_obj: OS object to check
3051   @type parameters: list
3052   @param parameters: the list of parameters to check
3053
3054   """
3055   supported = [v[0] for v in os_obj.supported_parameters]
3056   delta = frozenset(parameters).difference(supported)
3057   if delta:
3058     _Fail("The following parameters are not supported"
3059           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
3060
3061
3062 def ValidateOS(required, osname, checks, osparams):
3063   """Validate the given OS' parameters.
3064
3065   @type required: boolean
3066   @param required: whether absence of the OS should translate into
3067       failure or not
3068   @type osname: string
3069   @param osname: the OS to be validated
3070   @type checks: list
3071   @param checks: list of the checks to run (currently only 'parameters')
3072   @type osparams: dict
3073   @param osparams: dictionary with OS parameters
3074   @rtype: boolean
3075   @return: True if the validation passed, or False if the OS was not
3076       found and L{required} was false
3077
3078   """
3079   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
3080     _Fail("Unknown checks required for OS %s: %s", osname,
3081           set(checks).difference(constants.OS_VALIDATE_CALLS))
3082
3083   name_only = objects.OS.GetName(osname)
3084   status, tbv = _TryOSFromDisk(name_only, None)
3085
3086   if not status:
3087     if required:
3088       _Fail(tbv)
3089     else:
3090       return False
3091
3092   if max(tbv.api_versions) < constants.OS_API_V20:
3093     return True
3094
3095   if constants.OS_VALIDATE_PARAMETERS in checks:
3096     _CheckOSPList(tbv, osparams.keys())
3097
3098   validate_env = OSCoreEnv(osname, tbv, osparams)
3099   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
3100                         cwd=tbv.path, reset_env=True)
3101   if result.failed:
3102     logging.error("os validate command '%s' returned error: %s output: %s",
3103                   result.cmd, result.fail_reason, result.output)
3104     _Fail("OS validation script failed (%s), output: %s",
3105           result.fail_reason, result.output, log=False)
3106
3107   return True
3108
3109
3110 def DemoteFromMC():
3111   """Demotes the current node from master candidate role.
3112
3113   """
3114   # try to ensure we're not the master by mistake
3115   master, myself = ssconf.GetMasterAndMyself()
3116   if master == myself:
3117     _Fail("ssconf status shows I'm the master node, will not demote")
3118
3119   result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
3120   if not result.failed:
3121     _Fail("The master daemon is running, will not demote")
3122
3123   try:
3124     if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3125       utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3126   except EnvironmentError, err:
3127     if err.errno != errno.ENOENT:
3128       _Fail("Error while backing up cluster file: %s", err, exc=True)
3129
3130   utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3131
3132
3133 def _GetX509Filenames(cryptodir, name):
3134   """Returns the full paths for the private key and certificate.
3135
3136   """
3137   return (utils.PathJoin(cryptodir, name),
3138           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3139           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3140
3141
3142 def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3143   """Creates a new X509 certificate for SSL/TLS.
3144
3145   @type validity: int
3146   @param validity: Validity in seconds
3147   @rtype: tuple; (string, string)
3148   @return: Certificate name and public part
3149
3150   """
3151   (key_pem, cert_pem) = \
3152     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3153                                      min(validity, _MAX_SSL_CERT_VALIDITY))
3154
3155   cert_dir = tempfile.mkdtemp(dir=cryptodir,
3156                               prefix="x509-%s-" % utils.TimestampForFilename())
3157   try:
3158     name = os.path.basename(cert_dir)
3159     assert len(name) > 5
3160
3161     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3162
3163     utils.WriteFile(key_file, mode=0400, data=key_pem)
3164     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3165
3166     # Never return private key as it shouldn't leave the node
3167     return (name, cert_pem)
3168   except Exception:
3169     shutil.rmtree(cert_dir, ignore_errors=True)
3170     raise
3171
3172
3173 def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3174   """Removes a X509 certificate.
3175
3176   @type name: string
3177   @param name: Certificate name
3178
3179   """
3180   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3181
3182   utils.RemoveFile(key_file)
3183   utils.RemoveFile(cert_file)
3184
3185   try:
3186     os.rmdir(cert_dir)
3187   except EnvironmentError, err:
3188     _Fail("Cannot remove certificate directory '%s': %s",
3189           cert_dir, err)
3190
3191
3192 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3193   """Returns the command for the requested input/output.
3194
3195   @type instance: L{objects.Instance}
3196   @param instance: The instance object
3197   @param mode: Import/export mode
3198   @param ieio: Input/output type
3199   @param ieargs: Input/output arguments
3200
3201   """
3202   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3203
3204   env = None
3205   prefix = None
3206   suffix = None
3207   exp_size = None
3208
3209   if ieio == constants.IEIO_FILE:
3210     (filename, ) = ieargs
3211
3212     if not utils.IsNormAbsPath(filename):
3213       _Fail("Path '%s' is not normalized or absolute", filename)
3214
3215     real_filename = os.path.realpath(filename)
3216     directory = os.path.dirname(real_filename)
3217
3218     if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3219       _Fail("File '%s' is not under exports directory '%s': %s",
3220             filename, pathutils.EXPORT_DIR, real_filename)
3221
3222     # Create directory
3223     utils.Makedirs(directory, mode=0750)
3224
3225     quoted_filename = utils.ShellQuote(filename)
3226
3227     if mode == constants.IEM_IMPORT:
3228       suffix = "> %s" % quoted_filename
3229     elif mode == constants.IEM_EXPORT:
3230       suffix = "< %s" % quoted_filename
3231
3232       # Retrieve file size
3233       try:
3234         st = os.stat(filename)
3235       except EnvironmentError, err:
3236         logging.error("Can't stat(2) %s: %s", filename, err)
3237       else:
3238         exp_size = utils.BytesToMebibyte(st.st_size)
3239
3240   elif ieio == constants.IEIO_RAW_DISK:
3241     (disk, ) = ieargs
3242
3243     real_disk = _OpenRealBD(disk)
3244
3245     if mode == constants.IEM_IMPORT:
3246       # we set here a smaller block size as, due to transport buffering, more
3247       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3248       # is not already there or we pass a wrong path; we use notrunc to no
3249       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3250       # much memory; this means that at best, we flush every 64k, which will
3251       # not be very fast
3252       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3253                                     " bs=%s oflag=dsync"),
3254                                     real_disk.dev_path,
3255                                     str(64 * 1024))
3256
3257     elif mode == constants.IEM_EXPORT:
3258       # the block size on the read dd is 1MiB to match our units
3259       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3260                                    real_disk.dev_path,
3261                                    str(1024 * 1024), # 1 MB
3262                                    str(disk.size))
3263       exp_size = disk.size
3264
3265   elif ieio == constants.IEIO_SCRIPT:
3266     (disk, disk_index, ) = ieargs
3267
3268     assert isinstance(disk_index, (int, long))
3269
3270     real_disk = _OpenRealBD(disk)
3271
3272     inst_os = OSFromDisk(instance.os)
3273     env = OSEnvironment(instance, inst_os)
3274
3275     if mode == constants.IEM_IMPORT:
3276       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3277       env["IMPORT_INDEX"] = str(disk_index)
3278       script = inst_os.import_script
3279
3280     elif mode == constants.IEM_EXPORT:
3281       env["EXPORT_DEVICE"] = real_disk.dev_path
3282       env["EXPORT_INDEX"] = str(disk_index)
3283       script = inst_os.export_script
3284
3285     # TODO: Pass special environment only to script
3286     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3287
3288     if mode == constants.IEM_IMPORT:
3289       suffix = "| %s" % script_cmd
3290
3291     elif mode == constants.IEM_EXPORT:
3292       prefix = "%s |" % script_cmd
3293
3294     # Let script predict size
3295     exp_size = constants.IE_CUSTOM_SIZE
3296
3297   else:
3298     _Fail("Invalid %s I/O mode %r", mode, ieio)
3299
3300   return (env, prefix, suffix, exp_size)
3301
3302
3303 def _CreateImportExportStatusDir(prefix):
3304   """Creates status directory for import/export.
3305
3306   """
3307   return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3308                           prefix=("%s-%s-" %
3309                                   (prefix, utils.TimestampForFilename())))
3310
3311
3312 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3313                             ieio, ieioargs):
3314   """Starts an import or export daemon.
3315
3316   @param mode: Import/output mode
3317   @type opts: L{objects.ImportExportOptions}
3318   @param opts: Daemon options
3319   @type host: string
3320   @param host: Remote host for export (None for import)
3321   @type port: int
3322   @param port: Remote port for export (None for import)
3323   @type instance: L{objects.Instance}
3324   @param instance: Instance object
3325   @type component: string
3326   @param component: which part of the instance is transferred now,
3327       e.g. 'disk/0'
3328   @param ieio: Input/output type
3329   @param ieioargs: Input/output arguments
3330
3331   """
3332   if mode == constants.IEM_IMPORT:
3333     prefix = "import"
3334
3335     if not (host is None and port is None):
3336       _Fail("Can not specify host or port on import")
3337
3338   elif mode == constants.IEM_EXPORT:
3339     prefix = "export"
3340
3341     if host is None or port is None:
3342       _Fail("Host and port must be specified for an export")
3343
3344   else:
3345     _Fail("Invalid mode %r", mode)
3346
3347   if (opts.key_name is None) ^ (opts.ca_pem is None):
3348     _Fail("Cluster certificate can only be used for both key and CA")
3349
3350   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3351     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3352
3353   if opts.key_name is None:
3354     # Use server.pem
3355     key_path = pathutils.NODED_CERT_FILE
3356     cert_path = pathutils.NODED_CERT_FILE
3357     assert opts.ca_pem is None
3358   else:
3359     (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3360                                                  opts.key_name)
3361     assert opts.ca_pem is not None
3362
3363   for i in [key_path, cert_path]:
3364     if not os.path.exists(i):
3365       _Fail("File '%s' does not exist" % i)
3366
3367   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3368   try:
3369     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3370     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3371     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3372
3373     if opts.ca_pem is None:
3374       # Use server.pem
3375       ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3376     else:
3377       ca = opts.ca_pem
3378
3379     # Write CA file
3380     utils.WriteFile(ca_file, data=ca, mode=0400)
3381
3382     cmd = [
3383       pathutils.IMPORT_EXPORT_DAEMON,
3384       status_file, mode,
3385       "--key=%s" % key_path,
3386       "--cert=%s" % cert_path,
3387       "--ca=%s" % ca_file,
3388       ]
3389
3390     if host:
3391       cmd.append("--host=%s" % host)
3392
3393     if port:
3394       cmd.append("--port=%s" % port)
3395
3396     if opts.ipv6:
3397       cmd.append("--ipv6")
3398     else:
3399       cmd.append("--ipv4")
3400
3401     if opts.compress:
3402       cmd.append("--compress=%s" % opts.compress)
3403
3404     if opts.magic:
3405       cmd.append("--magic=%s" % opts.magic)
3406
3407     if exp_size is not None:
3408       cmd.append("--expected-size=%s" % exp_size)
3409
3410     if cmd_prefix:
3411       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3412
3413     if cmd_suffix:
3414       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3415
3416     if mode == constants.IEM_EXPORT:
3417       # Retry connection a few times when connecting to remote peer
3418       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3419       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3420     elif opts.connect_timeout is not None:
3421       assert mode == constants.IEM_IMPORT
3422       # Overall timeout for establishing connection while listening
3423       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3424
3425     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3426
3427     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3428     # support for receiving a file descriptor for output
3429     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3430                       output=logfile)
3431
3432     # The import/export name is simply the status directory name
3433     return os.path.basename(status_dir)
3434
3435   except Exception:
3436     shutil.rmtree(status_dir, ignore_errors=True)
3437     raise
3438
3439
3440 def GetImportExportStatus(names):
3441   """Returns import/export daemon status.
3442
3443   @type names: sequence
3444   @param names: List of names
3445   @rtype: List of dicts
3446   @return: Returns a list of the state of each named import/export or None if a
3447            status couldn't be read
3448
3449   """
3450   result = []
3451
3452   for name in names:
3453     status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3454                                  _IES_STATUS_FILE)
3455
3456     try:
3457       data = utils.ReadFile(status_file)
3458     except EnvironmentError, err:
3459       if err.errno != errno.ENOENT:
3460         raise
3461       data = None
3462
3463     if not data:
3464       result.append(None)
3465       continue
3466
3467     result.append(serializer.LoadJson(data))
3468
3469   return result
3470
3471
3472 def AbortImportExport(name):
3473   """Sends SIGTERM to a running import/export daemon.
3474
3475   """
3476   logging.info("Abort import/export %s", name)
3477
3478   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3479   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3480
3481   if pid:
3482     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3483                  name, pid)
3484     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3485
3486
3487 def CleanupImportExport(name):
3488   """Cleanup after an import or export.
3489
3490   If the import/export daemon is still running it's killed. Afterwards the
3491   whole status directory is removed.
3492
3493   """
3494   logging.info("Finalizing import/export %s", name)
3495
3496   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3497
3498   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3499
3500   if pid:
3501     logging.info("Import/export %s is still running with PID %s",
3502                  name, pid)
3503     utils.KillProcess(pid, waitpid=False)
3504
3505   shutil.rmtree(status_dir, ignore_errors=True)
3506
3507
3508 def _FindDisks(nodes_ip, disks):
3509   """Sets the physical ID on disks and returns the block devices.
3510
3511   """
3512   # set the correct physical ID
3513   my_name = netutils.Hostname.GetSysName()
3514   for cf in disks:
3515     cf.SetPhysicalID(my_name, nodes_ip)
3516
3517   bdevs = []
3518
3519   for cf in disks:
3520     rd = _RecursiveFindBD(cf)
3521     if rd is None:
3522       _Fail("Can't find device %s", cf)
3523     bdevs.append(rd)
3524   return bdevs
3525
3526
3527 def DrbdDisconnectNet(nodes_ip, disks):
3528   """Disconnects the network on a list of drbd devices.
3529
3530   """
3531   bdevs = _FindDisks(nodes_ip, disks)
3532
3533   # disconnect disks
3534   for rd in bdevs:
3535     try:
3536       rd.DisconnectNet()
3537     except errors.BlockDeviceError, err:
3538       _Fail("Can't change network configuration to standalone mode: %s",
3539             err, exc=True)
3540
3541
3542 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3543   """Attaches the network on a list of drbd devices.
3544
3545   """
3546   bdevs = _FindDisks(nodes_ip, disks)
3547
3548   if multimaster:
3549     for idx, rd in enumerate(bdevs):
3550       try:
3551         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3552       except EnvironmentError, err:
3553         _Fail("Can't create symlink: %s", err)
3554   # reconnect disks, switch to new master configuration and if
3555   # needed primary mode
3556   for rd in bdevs:
3557     try:
3558       rd.AttachNet(multimaster)
3559     except errors.BlockDeviceError, err:
3560       _Fail("Can't change network configuration: %s", err)
3561
3562   # wait until the disks are connected; we need to retry the re-attach
3563   # if the device becomes standalone, as this might happen if the one
3564   # node disconnects and reconnects in a different mode before the
3565   # other node reconnects; in this case, one or both of the nodes will
3566   # decide it has wrong configuration and switch to standalone
3567
3568   def _Attach():
3569     all_connected = True
3570
3571     for rd in bdevs:
3572       stats = rd.GetProcStatus()
3573
3574       all_connected = (all_connected and
3575                        (stats.is_connected or stats.is_in_resync))
3576
3577       if stats.is_standalone:
3578         # peer had different config info and this node became
3579         # standalone, even though this should not happen with the
3580         # new staged way of changing disk configs
3581         try:
3582           rd.AttachNet(multimaster)
3583         except errors.BlockDeviceError, err:
3584           _Fail("Can't change network configuration: %s", err)
3585
3586     if not all_connected:
3587       raise utils.RetryAgain()
3588
3589   try:
3590     # Start with a delay of 100 miliseconds and go up to 5 seconds
3591     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3592   except utils.RetryTimeout:
3593     _Fail("Timeout in disk reconnecting")
3594
3595   if multimaster:
3596     # change to primary mode
3597     for rd in bdevs:
3598       try:
3599         rd.Open()
3600       except errors.BlockDeviceError, err:
3601         _Fail("Can't change to primary mode: %s", err)
3602
3603
3604 def DrbdWaitSync(nodes_ip, disks):
3605   """Wait until DRBDs have synchronized.
3606
3607   """
3608   def _helper(rd):
3609     stats = rd.GetProcStatus()
3610     if not (stats.is_connected or stats.is_in_resync):
3611       raise utils.RetryAgain()
3612     return stats
3613
3614   bdevs = _FindDisks(nodes_ip, disks)
3615
3616   min_resync = 100
3617   alldone = True
3618   for rd in bdevs:
3619     try:
3620       # poll each second for 15 seconds
3621       stats = utils.Retry(_helper, 1, 15, args=[rd])
3622     except utils.RetryTimeout:
3623       stats = rd.GetProcStatus()
3624       # last check
3625       if not (stats.is_connected or stats.is_in_resync):
3626         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3627     alldone = alldone and (not stats.is_in_resync)
3628     if stats.sync_percent is not None:
3629       min_resync = min(min_resync, stats.sync_percent)
3630
3631   return (alldone, min_resync)
3632
3633
3634 def GetDrbdUsermodeHelper():
3635   """Returns DRBD usermode helper currently configured.
3636
3637   """
3638   try:
3639     return bdev.BaseDRBD.GetUsermodeHelper()
3640   except errors.BlockDeviceError, err:
3641     _Fail(str(err))
3642
3643
3644 def PowercycleNode(hypervisor_type):
3645   """Hard-powercycle the node.
3646
3647   Because we need to return first, and schedule the powercycle in the
3648   background, we won't be able to report failures nicely.
3649
3650   """
3651   hyper = hypervisor.GetHypervisor(hypervisor_type)
3652   try:
3653     pid = os.fork()
3654   except OSError:
3655     # if we can't fork, we'll pretend that we're in the child process
3656     pid = 0
3657   if pid > 0:
3658     return "Reboot scheduled in 5 seconds"
3659   # ensure the child is running on ram
3660   try:
3661     utils.Mlockall()
3662   except Exception: # pylint: disable=W0703
3663     pass
3664   time.sleep(5)
3665   hyper.PowercycleNode()
3666
3667
3668 def _VerifyRestrictedCmdName(cmd):
3669   """Verifies a remote command name.
3670
3671   @type cmd: string
3672   @param cmd: Command name
3673   @rtype: tuple; (boolean, string or None)
3674   @return: The tuple's first element is the status; if C{False}, the second
3675     element is an error message string, otherwise it's C{None}
3676
3677   """
3678   if not cmd.strip():
3679     return (False, "Missing command name")
3680
3681   if os.path.basename(cmd) != cmd:
3682     return (False, "Invalid command name")
3683
3684   if not constants.EXT_PLUGIN_MASK.match(cmd):
3685     return (False, "Command name contains forbidden characters")
3686
3687   return (True, None)
3688
3689
3690 def _CommonRestrictedCmdCheck(path, owner):
3691   """Common checks for remote command file system directories and files.
3692
3693   @type path: string
3694   @param path: Path to check
3695   @param owner: C{None} or tuple containing UID and GID
3696   @rtype: tuple; (boolean, string or C{os.stat} result)
3697   @return: The tuple's first element is the status; if C{False}, the second
3698     element is an error message string, otherwise it's the result of C{os.stat}
3699
3700   """
3701   if owner is None:
3702     # Default to root as owner
3703     owner = (0, 0)
3704
3705   try:
3706     st = os.stat(path)
3707   except EnvironmentError, err:
3708     return (False, "Can't stat(2) '%s': %s" % (path, err))
3709
3710   if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
3711     return (False, "Permissions on '%s' are too permissive" % path)
3712
3713   if (st.st_uid, st.st_gid) != owner:
3714     (owner_uid, owner_gid) = owner
3715     return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
3716
3717   return (True, st)
3718
3719
3720 def _VerifyRestrictedCmdDirectory(path, _owner=None):
3721   """Verifies remote command directory.
3722
3723   @type path: string
3724   @param path: Path to check
3725   @rtype: tuple; (boolean, string or None)
3726   @return: The tuple's first element is the status; if C{False}, the second
3727     element is an error message string, otherwise it's C{None}
3728
3729   """
3730   (status, value) = _CommonRestrictedCmdCheck(path, _owner)
3731
3732   if not status:
3733     return (False, value)
3734
3735   if not stat.S_ISDIR(value.st_mode):
3736     return (False, "Path '%s' is not a directory" % path)
3737
3738   return (True, None)
3739
3740
3741 def _VerifyRestrictedCmd(path, cmd, _owner=None):
3742   """Verifies a whole remote command and returns its executable filename.
3743
3744   @type path: string
3745   @param path: Directory containing remote commands
3746   @type cmd: string
3747   @param cmd: Command name
3748   @rtype: tuple; (boolean, string)
3749   @return: The tuple's first element is the status; if C{False}, the second
3750     element is an error message string, otherwise the second element is the
3751     absolute path to the executable
3752
3753   """
3754   executable = utils.PathJoin(path, cmd)
3755
3756   (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)
3757
3758   if not status:
3759     return (False, msg)
3760
3761   if not utils.IsExecutable(executable):
3762     return (False, "access(2) thinks '%s' can't be executed" % executable)
3763
3764   return (True, executable)
3765
3766
3767 def _PrepareRestrictedCmd(path, cmd,
3768                           _verify_dir=_VerifyRestrictedCmdDirectory,
3769                           _verify_name=_VerifyRestrictedCmdName,
3770                           _verify_cmd=_VerifyRestrictedCmd):
3771   """Performs a number of tests on a remote command.
3772
3773   @type path: string
3774   @param path: Directory containing remote commands
3775   @type cmd: string
3776   @param cmd: Command name
3777   @return: Same as L{_VerifyRestrictedCmd}
3778
3779   """
3780   # Verify the directory first
3781   (status, msg) = _verify_dir(path)
3782   if status:
3783     # Check command if everything was alright
3784     (status, msg) = _verify_name(cmd)
3785
3786   if not status:
3787     return (False, msg)
3788
3789   # Check actual executable
3790   return _verify_cmd(path, cmd)
3791
3792
3793 def RunRestrictedCmd(cmd,
3794                      _lock_timeout=_RCMD_LOCK_TIMEOUT,
3795                      _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
3796                      _path=pathutils.RESTRICTED_COMMANDS_DIR,
3797                      _sleep_fn=time.sleep,
3798                      _prepare_fn=_PrepareRestrictedCmd,
3799                      _runcmd_fn=utils.RunCmd,
3800                      _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
3801   """Executes a remote command after performing strict tests.
3802
3803   @type cmd: string
3804   @param cmd: Command name
3805   @rtype: string
3806   @return: Command output
3807   @raise RPCFail: In case of an error
3808
3809   """
3810   logging.info("Preparing to run remote command '%s'", cmd)
3811
3812   if not _enabled:
3813     _Fail("Remote commands disabled at configure time")
3814
3815   lock = None
3816   try:
3817     cmdresult = None
3818     try:
3819       lock = utils.FileLock.Open(_lock_file)
3820       lock.Exclusive(blocking=True, timeout=_lock_timeout)
3821
3822       (status, value) = _prepare_fn(_path, cmd)
3823
3824       if status:
3825         cmdresult = _runcmd_fn([value], env={}, reset_env=True,
3826                                postfork_fn=lambda _: lock.Unlock())
3827       else:
3828         logging.error(value)
3829     except Exception: # pylint: disable=W0703
3830       # Keep original error in log
3831       logging.exception("Caught exception")
3832
3833     if cmdresult is None:
3834       logging.info("Sleeping for %0.1f seconds before returning",
3835                    _RCMD_INVALID_DELAY)
3836       _sleep_fn(_RCMD_INVALID_DELAY)
3837
3838       # Do not include original error message in returned error
3839       _Fail("Executing command '%s' failed" % cmd)
3840     elif cmdresult.failed or cmdresult.fail_reason:
3841       _Fail("Remote command '%s' failed: %s; output: %s",
3842             cmd, cmdresult.fail_reason, cmdresult.output)
3843     else:
3844       return cmdresult.output
3845   finally:
3846     if lock is not None:
3847       # Release lock at last
3848       lock.Close()
3849       lock = None
3850
3851
3852 def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
3853   """Creates or removes the watcher pause file.
3854
3855   @type until: None or number
3856   @param until: Unix timestamp saying until when the watcher shouldn't run
3857
3858   """
3859   if until is None:
3860     logging.info("Received request to no longer pause watcher")
3861     utils.RemoveFile(_filename)
3862   else:
3863     logging.info("Received request to pause watcher until %s", until)
3864
3865     if not ht.TNumber(until):
3866       _Fail("Duration must be numeric")
3867
3868     utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)
3869
3870
3871 class HooksRunner(object):
3872   """Hook runner.
3873
3874   This class is instantiated on the node side (ganeti-noded) and not
3875   on the master side.
3876
3877   """
3878   def __init__(self, hooks_base_dir=None):
3879     """Constructor for hooks runner.
3880
3881     @type hooks_base_dir: str or None
3882     @param hooks_base_dir: if not None, this overrides the
3883         L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3884
3885     """
3886     if hooks_base_dir is None:
3887       hooks_base_dir = pathutils.HOOKS_BASE_DIR
3888     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3889     # constant
3890     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3891
3892   def RunLocalHooks(self, node_list, hpath, phase, env):
3893     """Check that the hooks will be run only locally and then run them.
3894
3895     """
3896     assert len(node_list) == 1
3897     node = node_list[0]
3898     _, myself = ssconf.GetMasterAndMyself()
3899     assert node == myself
3900
3901     results = self.RunHooks(hpath, phase, env)
3902
3903     # Return values in the form expected by HooksMaster
3904     return {node: (None, False, results)}
3905
3906   def RunHooks(self, hpath, phase, env):
3907     """Run the scripts in the hooks directory.
3908
3909     @type hpath: str
3910     @param hpath: the path to the hooks directory which
3911         holds the scripts
3912     @type phase: str
3913     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3914         L{constants.HOOKS_PHASE_POST}
3915     @type env: dict
3916     @param env: dictionary with the environment for the hook
3917     @rtype: list
3918     @return: list of 3-element tuples:
3919       - script path
3920       - script result, either L{constants.HKR_SUCCESS} or
3921         L{constants.HKR_FAIL}
3922       - output of the script
3923
3924     @raise errors.ProgrammerError: for invalid input
3925         parameters
3926
3927     """
3928     if phase == constants.HOOKS_PHASE_PRE:
3929       suffix = "pre"
3930     elif phase == constants.HOOKS_PHASE_POST:
3931       suffix = "post"
3932     else:
3933       _Fail("Unknown hooks phase '%s'", phase)
3934
3935     subdir = "%s-%s.d" % (hpath, suffix)
3936     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3937
3938     results = []
3939
3940     if not os.path.isdir(dir_name):
3941       # for non-existing/non-dirs, we simply exit instead of logging a
3942       # warning at every operation
3943       return results
3944
3945     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3946
3947     for (relname, relstatus, runresult) in runparts_results:
3948       if relstatus == constants.RUNPARTS_SKIP:
3949         rrval = constants.HKR_SKIP
3950         output = ""
3951       elif relstatus == constants.RUNPARTS_ERR:
3952         rrval = constants.HKR_FAIL
3953         output = "Hook script execution error: %s" % runresult
3954       elif relstatus == constants.RUNPARTS_RUN:
3955         if runresult.failed:
3956           rrval = constants.HKR_FAIL
3957         else:
3958           rrval = constants.HKR_SUCCESS
3959         output = utils.SafeEncode(runresult.output.strip())
3960       results.append(("%s/%s" % (subdir, relname), rrval, output))
3961
3962     return results
3963
3964
3965 class IAllocatorRunner(object):
3966   """IAllocator runner.
3967
3968   This class is instantiated on the node side (ganeti-noded) and not on
3969   the master side.
3970
3971   """
3972   @staticmethod
3973   def Run(name, idata):
3974     """Run an iallocator script.
3975
3976     @type name: str
3977     @param name: the iallocator script name
3978     @type idata: str
3979     @param idata: the allocator input data
3980
3981     @rtype: tuple
3982     @return: two element tuple of:
3983        - status
3984        - either error message or stdout of allocator (for success)
3985
3986     """
3987     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3988                                   os.path.isfile)
3989     if alloc_script is None:
3990       _Fail("iallocator module '%s' not found in the search path", name)
3991
3992     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3993     try:
3994       os.write(fd, idata)
3995       os.close(fd)
3996       result = utils.RunCmd([alloc_script, fin_name])
3997       if result.failed:
3998         _Fail("iallocator module '%s' failed: %s, output '%s'",
3999               name, result.fail_reason, result.output)
4000     finally:
4001       os.unlink(fin_name)
4002
4003     return result.stdout
4004
4005
4006 class DevCacheManager(object):
4007   """Simple class for managing a cache of block device information.
4008
4009   """
4010   _DEV_PREFIX = "/dev/"
4011   _ROOT_DIR = pathutils.BDEV_CACHE_DIR
4012
4013   @classmethod
4014   def _ConvertPath(cls, dev_path):
4015     """Converts a /dev/name path to the cache file name.
4016
4017     This replaces slashes with underscores and strips the /dev
4018     prefix. It then returns the full path to the cache file.
4019
4020     @type dev_path: str
4021     @param dev_path: the C{/dev/} path name
4022     @rtype: str
4023     @return: the converted path name
4024
4025     """
4026     if dev_path.startswith(cls._DEV_PREFIX):
4027       dev_path = dev_path[len(cls._DEV_PREFIX):]
4028     dev_path = dev_path.replace("/", "_")
4029     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
4030     return fpath
4031
4032   @classmethod
4033   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
4034     """Updates the cache information for a given device.
4035
4036     @type dev_path: str
4037     @param dev_path: the pathname of the device
4038     @type owner: str
4039     @param owner: the owner (instance name) of the device
4040     @type on_primary: bool
4041     @param on_primary: whether this is the primary
4042         node nor not
4043     @type iv_name: str
4044     @param iv_name: the instance-visible name of the
4045         device, as in objects.Disk.iv_name
4046
4047     @rtype: None
4048
4049     """
4050     if dev_path is None:
4051       logging.error("DevCacheManager.UpdateCache got a None dev_path")
4052       return
4053     fpath = cls._ConvertPath(dev_path)
4054     if on_primary:
4055       state = "primary"
4056     else:
4057       state = "secondary"
4058     if iv_name is None:
4059       iv_name = "not_visible"
4060     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
4061     try:
4062       utils.WriteFile(fpath, data=fdata)
4063     except EnvironmentError, err:
4064       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
4065
4066   @classmethod
4067   def RemoveCache(cls, dev_path):
4068     """Remove data for a dev_path.
4069
4070     This is just a wrapper over L{utils.io.RemoveFile} with a converted
4071     path name and logging.
4072
4073     @type dev_path: str
4074     @param dev_path: the pathname of the device
4075
4076     @rtype: None
4077
4078     """
4079     if dev_path is None:
4080       logging.error("DevCacheManager.RemoveCache got a None dev_path")
4081       return
4082     fpath = cls._ConvertPath(dev_path)
4083     try:
4084       utils.RemoveFile(fpath)
4085     except EnvironmentError, err:
4086       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)