d5c0e928fa0474cdc08d52898ffc34f5f0d4a3e9
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
67
68
69 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
70 _ALLOWED_CLEAN_DIRS = frozenset([
71   pathutils.DATA_DIR,
72   pathutils.JOB_QUEUE_ARCHIVE_DIR,
73   pathutils.QUEUE_DIR,
74   pathutils.CRYPTO_KEYS_DIR,
75   ])
76 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
77 _X509_KEY_FILE = "key"
78 _X509_CERT_FILE = "cert"
79 _IES_STATUS_FILE = "status"
80 _IES_PID_FILE = "pid"
81 _IES_CA_FILE = "ca"
82
83 #: Valid LVS output line regex
84 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
85
86 # Actions for the master setup script
87 _MASTER_START = "start"
88 _MASTER_STOP = "stop"
89
90 #: Maximum file permissions for remote command directory and executables
91 _RCMD_MAX_MODE = (stat.S_IRWXU |
92                   stat.S_IRGRP | stat.S_IXGRP |
93                   stat.S_IROTH | stat.S_IXOTH)
94
95 #: Delay before returning an error for remote commands
96 _RCMD_INVALID_DELAY = 10
97
98 #: How long to wait to acquire lock for remote commands (shorter than
99 #: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
100 #: command requests arrive
101 _RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
102
103
104 class RPCFail(Exception):
105   """Class denoting RPC failure.
106
107   Its argument is the error message.
108
109   """
110
111
112 def _Fail(msg, *args, **kwargs):
113   """Log an error and the raise an RPCFail exception.
114
115   This exception is then handled specially in the ganeti daemon and
116   turned into a 'failed' return type. As such, this function is a
117   useful shortcut for logging the error and returning it to the master
118   daemon.
119
120   @type msg: string
121   @param msg: the text of the exception
122   @raise RPCFail
123
124   """
125   if args:
126     msg = msg % args
127   if "log" not in kwargs or kwargs["log"]: # if we should log this error
128     if "exc" in kwargs and kwargs["exc"]:
129       logging.exception(msg)
130     else:
131       logging.error(msg)
132   raise RPCFail(msg)
133
134
135 def _GetConfig():
136   """Simple wrapper to return a SimpleStore.
137
138   @rtype: L{ssconf.SimpleStore}
139   @return: a SimpleStore instance
140
141   """
142   return ssconf.SimpleStore()
143
144
145 def _GetSshRunner(cluster_name):
146   """Simple wrapper to return an SshRunner.
147
148   @type cluster_name: str
149   @param cluster_name: the cluster name, which is needed
150       by the SshRunner constructor
151   @rtype: L{ssh.SshRunner}
152   @return: an SshRunner instance
153
154   """
155   return ssh.SshRunner(cluster_name)
156
157
158 def _Decompress(data):
159   """Unpacks data compressed by the RPC client.
160
161   @type data: list or tuple
162   @param data: Data sent by RPC client
163   @rtype: str
164   @return: Decompressed data
165
166   """
167   assert isinstance(data, (list, tuple))
168   assert len(data) == 2
169   (encoding, content) = data
170   if encoding == constants.RPC_ENCODING_NONE:
171     return content
172   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
173     return zlib.decompress(base64.b64decode(content))
174   else:
175     raise AssertionError("Unknown data encoding")
176
177
178 def _CleanDirectory(path, exclude=None):
179   """Removes all regular files in a directory.
180
181   @type path: str
182   @param path: the directory to clean
183   @type exclude: list
184   @param exclude: list of files to be excluded, defaults
185       to the empty list
186
187   """
188   if path not in _ALLOWED_CLEAN_DIRS:
189     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
190           path)
191
192   if not os.path.isdir(path):
193     return
194   if exclude is None:
195     exclude = []
196   else:
197     # Normalize excluded paths
198     exclude = [os.path.normpath(i) for i in exclude]
199
200   for rel_name in utils.ListVisibleFiles(path):
201     full_name = utils.PathJoin(path, rel_name)
202     if full_name in exclude:
203       continue
204     if os.path.isfile(full_name) and not os.path.islink(full_name):
205       utils.RemoveFile(full_name)
206
207
208 def _BuildUploadFileList():
209   """Build the list of allowed upload files.
210
211   This is abstracted so that it's built only once at module import time.
212
213   """
214   allowed_files = set([
215     pathutils.CLUSTER_CONF_FILE,
216     pathutils.ETC_HOSTS,
217     pathutils.SSH_KNOWN_HOSTS_FILE,
218     pathutils.VNC_PASSWORD_FILE,
219     pathutils.RAPI_CERT_FILE,
220     pathutils.SPICE_CERT_FILE,
221     pathutils.SPICE_CACERT_FILE,
222     pathutils.RAPI_USERS_FILE,
223     pathutils.CONFD_HMAC_KEY,
224     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
225     ])
226
227   for hv_name in constants.HYPER_TYPES:
228     hv_class = hypervisor.GetHypervisorClass(hv_name)
229     allowed_files.update(hv_class.GetAncillaryFiles()[0])
230
231   assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
232     "Allowed file storage paths should never be uploaded via RPC"
233
234   return frozenset(allowed_files)
235
236
237 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
238
239
240 def JobQueuePurge():
241   """Removes job queue files and archived jobs.
242
243   @rtype: tuple
244   @return: True, None
245
246   """
247   _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
248   _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
249
250
251 def GetMasterInfo():
252   """Returns master information.
253
254   This is an utility function to compute master information, either
255   for consumption here or from the node daemon.
256
257   @rtype: tuple
258   @return: master_netdev, master_ip, master_name, primary_ip_family,
259     master_netmask
260   @raise RPCFail: in case of errors
261
262   """
263   try:
264     cfg = _GetConfig()
265     master_netdev = cfg.GetMasterNetdev()
266     master_ip = cfg.GetMasterIP()
267     master_netmask = cfg.GetMasterNetmask()
268     master_node = cfg.GetMasterNode()
269     primary_ip_family = cfg.GetPrimaryIPFamily()
270   except errors.ConfigurationError, err:
271     _Fail("Cluster configuration incomplete: %s", err, exc=True)
272   return (master_netdev, master_ip, master_node, primary_ip_family,
273           master_netmask)
274
275
276 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
277   """Decorator that runs hooks before and after the decorated function.
278
279   @type hook_opcode: string
280   @param hook_opcode: opcode of the hook
281   @type hooks_path: string
282   @param hooks_path: path of the hooks
283   @type env_builder_fn: function
284   @param env_builder_fn: function that returns a dictionary containing the
285     environment variables for the hooks. Will get all the parameters of the
286     decorated function.
287   @raise RPCFail: in case of pre-hook failure
288
289   """
290   def decorator(fn):
291     def wrapper(*args, **kwargs):
292       _, myself = ssconf.GetMasterAndMyself()
293       nodes = ([myself], [myself])  # these hooks run locally
294
295       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
296
297       cfg = _GetConfig()
298       hr = HooksRunner()
299       hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
300                             None, env_fn, logging.warning, cfg.GetClusterName(),
301                             cfg.GetMasterNode())
302
303       hm.RunPhase(constants.HOOKS_PHASE_PRE)
304       result = fn(*args, **kwargs)
305       hm.RunPhase(constants.HOOKS_PHASE_POST)
306
307       return result
308     return wrapper
309   return decorator
310
311
312 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
313   """Builds environment variables for master IP hooks.
314
315   @type master_params: L{objects.MasterNetworkParameters}
316   @param master_params: network parameters of the master
317   @type use_external_mip_script: boolean
318   @param use_external_mip_script: whether to use an external master IP
319     address setup script (unused, but necessary per the implementation of the
320     _RunLocalHooks decorator)
321
322   """
323   # pylint: disable=W0613
324   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
325   env = {
326     "MASTER_NETDEV": master_params.netdev,
327     "MASTER_IP": master_params.ip,
328     "MASTER_NETMASK": str(master_params.netmask),
329     "CLUSTER_IP_VERSION": str(ver),
330   }
331
332   return env
333
334
335 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
336   """Execute the master IP address setup script.
337
338   @type master_params: L{objects.MasterNetworkParameters}
339   @param master_params: network parameters of the master
340   @type action: string
341   @param action: action to pass to the script. Must be one of
342     L{backend._MASTER_START} or L{backend._MASTER_STOP}
343   @type use_external_mip_script: boolean
344   @param use_external_mip_script: whether to use an external master IP
345     address setup script
346   @raise backend.RPCFail: if there are errors during the execution of the
347     script
348
349   """
350   env = _BuildMasterIpEnv(master_params)
351
352   if use_external_mip_script:
353     setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
354   else:
355     setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
356
357   result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
358
359   if result.failed:
360     _Fail("Failed to %s the master IP. Script return value: %s" %
361           (action, result.exit_code), log=True)
362
363
364 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
365                _BuildMasterIpEnv)
366 def ActivateMasterIp(master_params, use_external_mip_script):
367   """Activate the IP address of the master daemon.
368
369   @type master_params: L{objects.MasterNetworkParameters}
370   @param master_params: network parameters of the master
371   @type use_external_mip_script: boolean
372   @param use_external_mip_script: whether to use an external master IP
373     address setup script
374   @raise RPCFail: in case of errors during the IP startup
375
376   """
377   _RunMasterSetupScript(master_params, _MASTER_START,
378                         use_external_mip_script)
379
380
381 def StartMasterDaemons(no_voting):
382   """Activate local node as master node.
383
384   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
385
386   @type no_voting: boolean
387   @param no_voting: whether to start ganeti-masterd without a node vote
388       but still non-interactively
389   @rtype: None
390
391   """
392
393   if no_voting:
394     masterd_args = "--no-voting --yes-do-it"
395   else:
396     masterd_args = ""
397
398   env = {
399     "EXTRA_MASTERD_ARGS": masterd_args,
400     }
401
402   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
403   if result.failed:
404     msg = "Can't start Ganeti master: %s" % result.output
405     logging.error(msg)
406     _Fail(msg)
407
408
409 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
410                _BuildMasterIpEnv)
411 def DeactivateMasterIp(master_params, use_external_mip_script):
412   """Deactivate the master IP on this node.
413
414   @type master_params: L{objects.MasterNetworkParameters}
415   @param master_params: network parameters of the master
416   @type use_external_mip_script: boolean
417   @param use_external_mip_script: whether to use an external master IP
418     address setup script
419   @raise RPCFail: in case of errors during the IP turndown
420
421   """
422   _RunMasterSetupScript(master_params, _MASTER_STOP,
423                         use_external_mip_script)
424
425
426 def StopMasterDaemons():
427   """Stop the master daemons on this node.
428
429   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
430
431   @rtype: None
432
433   """
434   # TODO: log and report back to the caller the error failures; we
435   # need to decide in which case we fail the RPC for this
436
437   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
438   if result.failed:
439     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
440                   " and error %s",
441                   result.cmd, result.exit_code, result.output)
442
443
444 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
445   """Change the netmask of the master IP.
446
447   @param old_netmask: the old value of the netmask
448   @param netmask: the new value of the netmask
449   @param master_ip: the master IP
450   @param master_netdev: the master network device
451
452   """
453   if old_netmask == netmask:
454     return
455
456   if not netutils.IPAddress.Own(master_ip):
457     _Fail("The master IP address is not up, not attempting to change its"
458           " netmask")
459
460   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
461                          "%s/%s" % (master_ip, netmask),
462                          "dev", master_netdev, "label",
463                          "%s:0" % master_netdev])
464   if result.failed:
465     _Fail("Could not set the new netmask on the master IP address")
466
467   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
468                          "%s/%s" % (master_ip, old_netmask),
469                          "dev", master_netdev, "label",
470                          "%s:0" % master_netdev])
471   if result.failed:
472     _Fail("Could not bring down the master IP address with the old netmask")
473
474
475 def EtcHostsModify(mode, host, ip):
476   """Modify a host entry in /etc/hosts.
477
478   @param mode: The mode to operate. Either add or remove entry
479   @param host: The host to operate on
480   @param ip: The ip associated with the entry
481
482   """
483   if mode == constants.ETC_HOSTS_ADD:
484     if not ip:
485       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
486               " present")
487     utils.AddHostToEtcHosts(host, ip)
488   elif mode == constants.ETC_HOSTS_REMOVE:
489     if ip:
490       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
491               " parameter is present")
492     utils.RemoveHostFromEtcHosts(host)
493   else:
494     RPCFail("Mode not supported")
495
496
497 def LeaveCluster(modify_ssh_setup):
498   """Cleans up and remove the current node.
499
500   This function cleans up and prepares the current node to be removed
501   from the cluster.
502
503   If processing is successful, then it raises an
504   L{errors.QuitGanetiException} which is used as a special case to
505   shutdown the node daemon.
506
507   @param modify_ssh_setup: boolean
508
509   """
510   _CleanDirectory(pathutils.DATA_DIR)
511   _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
512   JobQueuePurge()
513
514   if modify_ssh_setup:
515     try:
516       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
517
518       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
519
520       utils.RemoveFile(priv_key)
521       utils.RemoveFile(pub_key)
522     except errors.OpExecError:
523       logging.exception("Error while processing ssh files")
524
525   try:
526     utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
527     utils.RemoveFile(pathutils.RAPI_CERT_FILE)
528     utils.RemoveFile(pathutils.SPICE_CERT_FILE)
529     utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
530     utils.RemoveFile(pathutils.NODED_CERT_FILE)
531   except: # pylint: disable=W0702
532     logging.exception("Error while removing cluster secrets")
533
534   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
535   if result.failed:
536     logging.error("Command %s failed with exitcode %s and error %s",
537                   result.cmd, result.exit_code, result.output)
538
539   # Raise a custom exception (handled in ganeti-noded)
540   raise errors.QuitGanetiException(True, "Shutdown scheduled")
541
542
543 def _GetVgInfo(name):
544   """Retrieves information about a LVM volume group.
545
546   """
547   # TODO: GetVGInfo supports returning information for multiple VGs at once
548   vginfo = bdev.LogicalVolume.GetVGInfo([name])
549   if vginfo:
550     vg_free = int(round(vginfo[0][0], 0))
551     vg_size = int(round(vginfo[0][1], 0))
552   else:
553     vg_free = None
554     vg_size = None
555
556   return {
557     "name": name,
558     "vg_free": vg_free,
559     "vg_size": vg_size,
560     }
561
562
563 def _GetHvInfo(name):
564   """Retrieves node information from a hypervisor.
565
566   The information returned depends on the hypervisor. Common items:
567
568     - vg_size is the size of the configured volume group in MiB
569     - vg_free is the free size of the volume group in MiB
570     - memory_dom0 is the memory allocated for domain0 in MiB
571     - memory_free is the currently available (free) ram in MiB
572     - memory_total is the total number of ram in MiB
573     - hv_version: the hypervisor version, if available
574
575   """
576   return hypervisor.GetHypervisor(name).GetNodeInfo()
577
578
579 def _GetNamedNodeInfo(names, fn):
580   """Calls C{fn} for all names in C{names} and returns a dictionary.
581
582   @rtype: None or dict
583
584   """
585   if names is None:
586     return None
587   else:
588     return map(fn, names)
589
590
591 def GetNodeInfo(vg_names, hv_names):
592   """Gives back a hash with different information about the node.
593
594   @type vg_names: list of string
595   @param vg_names: Names of the volume groups to ask for disk space information
596   @type hv_names: list of string
597   @param hv_names: Names of the hypervisors to ask for node information
598   @rtype: tuple; (string, None/dict, None/dict)
599   @return: Tuple containing boot ID, volume group information and hypervisor
600     information
601
602   """
603   bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
604   vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
605   hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
606
607   return (bootid, vg_info, hv_info)
608
609
610 def VerifyNode(what, cluster_name):
611   """Verify the status of the local node.
612
613   Based on the input L{what} parameter, various checks are done on the
614   local node.
615
616   If the I{filelist} key is present, this list of
617   files is checksummed and the file/checksum pairs are returned.
618
619   If the I{nodelist} key is present, we check that we have
620   connectivity via ssh with the target nodes (and check the hostname
621   report).
622
623   If the I{node-net-test} key is present, we check that we have
624   connectivity to the given nodes via both primary IP and, if
625   applicable, secondary IPs.
626
627   @type what: C{dict}
628   @param what: a dictionary of things to check:
629       - filelist: list of files for which to compute checksums
630       - nodelist: list of nodes we should check ssh communication with
631       - node-net-test: list of nodes we should check node daemon port
632         connectivity with
633       - hypervisor: list with hypervisors to run the verify for
634   @rtype: dict
635   @return: a dictionary with the same keys as the input dict, and
636       values representing the result of the checks
637
638   """
639   result = {}
640   my_name = netutils.Hostname.GetSysName()
641   port = netutils.GetDaemonPort(constants.NODED)
642   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
643
644   if constants.NV_HYPERVISOR in what and vm_capable:
645     result[constants.NV_HYPERVISOR] = tmp = {}
646     for hv_name in what[constants.NV_HYPERVISOR]:
647       try:
648         val = hypervisor.GetHypervisor(hv_name).Verify()
649       except errors.HypervisorError, err:
650         val = "Error while checking hypervisor: %s" % str(err)
651       tmp[hv_name] = val
652
653   if constants.NV_HVPARAMS in what and vm_capable:
654     result[constants.NV_HVPARAMS] = tmp = []
655     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
656       try:
657         logging.info("Validating hv %s, %s", hv_name, hvparms)
658         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
659       except errors.HypervisorError, err:
660         tmp.append((source, hv_name, str(err)))
661
662   if constants.NV_FILELIST in what:
663     fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
664                                               what[constants.NV_FILELIST]))
665     result[constants.NV_FILELIST] = \
666       dict((vcluster.MakeVirtualPath(key), value)
667            for (key, value) in fingerprints.items())
668
669   if constants.NV_NODELIST in what:
670     (nodes, bynode) = what[constants.NV_NODELIST]
671
672     # Add nodes from other groups (different for each node)
673     try:
674       nodes.extend(bynode[my_name])
675     except KeyError:
676       pass
677
678     # Use a random order
679     random.shuffle(nodes)
680
681     # Try to contact all nodes
682     val = {}
683     for node in nodes:
684       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
685       if not success:
686         val[node] = message
687
688     result[constants.NV_NODELIST] = val
689
690   if constants.NV_NODENETTEST in what:
691     result[constants.NV_NODENETTEST] = tmp = {}
692     my_pip = my_sip = None
693     for name, pip, sip in what[constants.NV_NODENETTEST]:
694       if name == my_name:
695         my_pip = pip
696         my_sip = sip
697         break
698     if not my_pip:
699       tmp[my_name] = ("Can't find my own primary/secondary IP"
700                       " in the node list")
701     else:
702       for name, pip, sip in what[constants.NV_NODENETTEST]:
703         fail = []
704         if not netutils.TcpPing(pip, port, source=my_pip):
705           fail.append("primary")
706         if sip != pip:
707           if not netutils.TcpPing(sip, port, source=my_sip):
708             fail.append("secondary")
709         if fail:
710           tmp[name] = ("failure using the %s interface(s)" %
711                        " and ".join(fail))
712
713   if constants.NV_MASTERIP in what:
714     # FIXME: add checks on incoming data structures (here and in the
715     # rest of the function)
716     master_name, master_ip = what[constants.NV_MASTERIP]
717     if master_name == my_name:
718       source = constants.IP4_ADDRESS_LOCALHOST
719     else:
720       source = None
721     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
722                                                      source=source)
723
724   if constants.NV_USERSCRIPTS in what:
725     result[constants.NV_USERSCRIPTS] = \
726       [script for script in what[constants.NV_USERSCRIPTS]
727        if not utils.IsExecutable(script)]
728
729   if constants.NV_OOB_PATHS in what:
730     result[constants.NV_OOB_PATHS] = tmp = []
731     for path in what[constants.NV_OOB_PATHS]:
732       try:
733         st = os.stat(path)
734       except OSError, err:
735         tmp.append("error stating out of band helper: %s" % err)
736       else:
737         if stat.S_ISREG(st.st_mode):
738           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
739             tmp.append(None)
740           else:
741             tmp.append("out of band helper %s is not executable" % path)
742         else:
743           tmp.append("out of band helper %s is not a file" % path)
744
745   if constants.NV_LVLIST in what and vm_capable:
746     try:
747       val = GetVolumeList(utils.ListVolumeGroups().keys())
748     except RPCFail, err:
749       val = str(err)
750     result[constants.NV_LVLIST] = val
751
752   if constants.NV_INSTANCELIST in what and vm_capable:
753     # GetInstanceList can fail
754     try:
755       val = GetInstanceList(what[constants.NV_INSTANCELIST])
756     except RPCFail, err:
757       val = str(err)
758     result[constants.NV_INSTANCELIST] = val
759
760   if constants.NV_VGLIST in what and vm_capable:
761     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
762
763   if constants.NV_PVLIST in what and vm_capable:
764     result[constants.NV_PVLIST] = \
765       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
766                                    filter_allocatable=False)
767
768   if constants.NV_VERSION in what:
769     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
770                                     constants.RELEASE_VERSION)
771
772   if constants.NV_HVINFO in what and vm_capable:
773     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
774     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
775
776   if constants.NV_DRBDLIST in what and vm_capable:
777     try:
778       used_minors = bdev.DRBD8.GetUsedDevs().keys()
779     except errors.BlockDeviceError, err:
780       logging.warning("Can't get used minors list", exc_info=True)
781       used_minors = str(err)
782     result[constants.NV_DRBDLIST] = used_minors
783
784   if constants.NV_DRBDHELPER in what and vm_capable:
785     status = True
786     try:
787       payload = bdev.BaseDRBD.GetUsermodeHelper()
788     except errors.BlockDeviceError, err:
789       logging.error("Can't get DRBD usermode helper: %s", str(err))
790       status = False
791       payload = str(err)
792     result[constants.NV_DRBDHELPER] = (status, payload)
793
794   if constants.NV_NODESETUP in what:
795     result[constants.NV_NODESETUP] = tmpr = []
796     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
797       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
798                   " under /sys, missing required directories /sys/block"
799                   " and /sys/class/net")
800     if (not os.path.isdir("/proc/sys") or
801         not os.path.isfile("/proc/sysrq-trigger")):
802       tmpr.append("The procfs filesystem doesn't seem to be mounted"
803                   " under /proc, missing required directory /proc/sys and"
804                   " the file /proc/sysrq-trigger")
805
806   if constants.NV_TIME in what:
807     result[constants.NV_TIME] = utils.SplitTime(time.time())
808
809   if constants.NV_OSLIST in what and vm_capable:
810     result[constants.NV_OSLIST] = DiagnoseOS()
811
812   if constants.NV_BRIDGES in what and vm_capable:
813     result[constants.NV_BRIDGES] = [bridge
814                                     for bridge in what[constants.NV_BRIDGES]
815                                     if not utils.BridgeExists(bridge)]
816
817   if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
818     result[constants.NV_FILE_STORAGE_PATHS] = \
819       bdev.ComputeWrongFileStoragePaths()
820
821   return result
822
823
824 def GetBlockDevSizes(devices):
825   """Return the size of the given block devices
826
827   @type devices: list
828   @param devices: list of block device nodes to query
829   @rtype: dict
830   @return:
831     dictionary of all block devices under /dev (key). The value is their
832     size in MiB.
833
834     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
835
836   """
837   DEV_PREFIX = "/dev/"
838   blockdevs = {}
839
840   for devpath in devices:
841     if not utils.IsBelowDir(DEV_PREFIX, devpath):
842       continue
843
844     try:
845       st = os.stat(devpath)
846     except EnvironmentError, err:
847       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
848       continue
849
850     if stat.S_ISBLK(st.st_mode):
851       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
852       if result.failed:
853         # We don't want to fail, just do not list this device as available
854         logging.warning("Cannot get size for block device %s", devpath)
855         continue
856
857       size = int(result.stdout) / (1024 * 1024)
858       blockdevs[devpath] = size
859   return blockdevs
860
861
862 def GetVolumeList(vg_names):
863   """Compute list of logical volumes and their size.
864
865   @type vg_names: list
866   @param vg_names: the volume groups whose LVs we should list, or
867       empty for all volume groups
868   @rtype: dict
869   @return:
870       dictionary of all partions (key) with value being a tuple of
871       their size (in MiB), inactive and online status::
872
873         {'xenvg/test1': ('20.06', True, True)}
874
875       in case of errors, a string is returned with the error
876       details.
877
878   """
879   lvs = {}
880   sep = "|"
881   if not vg_names:
882     vg_names = []
883   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
884                          "--separator=%s" % sep,
885                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
886   if result.failed:
887     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
888
889   for line in result.stdout.splitlines():
890     line = line.strip()
891     match = _LVSLINE_REGEX.match(line)
892     if not match:
893       logging.error("Invalid line returned from lvs output: '%s'", line)
894       continue
895     vg_name, name, size, attr = match.groups()
896     inactive = attr[4] == "-"
897     online = attr[5] == "o"
898     virtual = attr[0] == "v"
899     if virtual:
900       # we don't want to report such volumes as existing, since they
901       # don't really hold data
902       continue
903     lvs[vg_name + "/" + name] = (size, inactive, online)
904
905   return lvs
906
907
908 def ListVolumeGroups():
909   """List the volume groups and their size.
910
911   @rtype: dict
912   @return: dictionary with keys volume name and values the
913       size of the volume
914
915   """
916   return utils.ListVolumeGroups()
917
918
919 def NodeVolumes():
920   """List all volumes on this node.
921
922   @rtype: list
923   @return:
924     A list of dictionaries, each having four keys:
925       - name: the logical volume name,
926       - size: the size of the logical volume
927       - dev: the physical device on which the LV lives
928       - vg: the volume group to which it belongs
929
930     In case of errors, we return an empty list and log the
931     error.
932
933     Note that since a logical volume can live on multiple physical
934     volumes, the resulting list might include a logical volume
935     multiple times.
936
937   """
938   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
939                          "--separator=|",
940                          "--options=lv_name,lv_size,devices,vg_name"])
941   if result.failed:
942     _Fail("Failed to list logical volumes, lvs output: %s",
943           result.output)
944
945   def parse_dev(dev):
946     return dev.split("(")[0]
947
948   def handle_dev(dev):
949     return [parse_dev(x) for x in dev.split(",")]
950
951   def map_line(line):
952     line = [v.strip() for v in line]
953     return [{"name": line[0], "size": line[1],
954              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
955
956   all_devs = []
957   for line in result.stdout.splitlines():
958     if line.count("|") >= 3:
959       all_devs.extend(map_line(line.split("|")))
960     else:
961       logging.warning("Strange line in the output from lvs: '%s'", line)
962   return all_devs
963
964
965 def BridgesExist(bridges_list):
966   """Check if a list of bridges exist on the current node.
967
968   @rtype: boolean
969   @return: C{True} if all of them exist, C{False} otherwise
970
971   """
972   missing = []
973   for bridge in bridges_list:
974     if not utils.BridgeExists(bridge):
975       missing.append(bridge)
976
977   if missing:
978     _Fail("Missing bridges %s", utils.CommaJoin(missing))
979
980
981 def GetInstanceList(hypervisor_list):
982   """Provides a list of instances.
983
984   @type hypervisor_list: list
985   @param hypervisor_list: the list of hypervisors to query information
986
987   @rtype: list
988   @return: a list of all running instances on the current node
989     - instance1.example.com
990     - instance2.example.com
991
992   """
993   results = []
994   for hname in hypervisor_list:
995     try:
996       names = hypervisor.GetHypervisor(hname).ListInstances()
997       results.extend(names)
998     except errors.HypervisorError, err:
999       _Fail("Error enumerating instances (hypervisor %s): %s",
1000             hname, err, exc=True)
1001
1002   return results
1003
1004
1005 def GetInstanceInfo(instance, hname):
1006   """Gives back the information about an instance as a dictionary.
1007
1008   @type instance: string
1009   @param instance: the instance name
1010   @type hname: string
1011   @param hname: the hypervisor type of the instance
1012
1013   @rtype: dict
1014   @return: dictionary with the following keys:
1015       - memory: memory size of instance (int)
1016       - state: xen state of instance (string)
1017       - time: cpu time of instance (float)
1018       - vcpus: the number of vcpus (int)
1019
1020   """
1021   output = {}
1022
1023   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1024   if iinfo is not None:
1025     output["memory"] = iinfo[2]
1026     output["vcpus"] = iinfo[3]
1027     output["state"] = iinfo[4]
1028     output["time"] = iinfo[5]
1029
1030   return output
1031
1032
1033 def GetInstanceMigratable(instance):
1034   """Gives whether an instance can be migrated.
1035
1036   @type instance: L{objects.Instance}
1037   @param instance: object representing the instance to be checked.
1038
1039   @rtype: tuple
1040   @return: tuple of (result, description) where:
1041       - result: whether the instance can be migrated or not
1042       - description: a description of the issue, if relevant
1043
1044   """
1045   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1046   iname = instance.name
1047   if iname not in hyper.ListInstances():
1048     _Fail("Instance %s is not running", iname)
1049
1050   for idx in range(len(instance.disks)):
1051     link_name = _GetBlockDevSymlinkPath(iname, idx)
1052     if not os.path.islink(link_name):
1053       logging.warning("Instance %s is missing symlink %s for disk %d",
1054                       iname, link_name, idx)
1055
1056
1057 def GetAllInstancesInfo(hypervisor_list):
1058   """Gather data about all instances.
1059
1060   This is the equivalent of L{GetInstanceInfo}, except that it
1061   computes data for all instances at once, thus being faster if one
1062   needs data about more than one instance.
1063
1064   @type hypervisor_list: list
1065   @param hypervisor_list: list of hypervisors to query for instance data
1066
1067   @rtype: dict
1068   @return: dictionary of instance: data, with data having the following keys:
1069       - memory: memory size of instance (int)
1070       - state: xen state of instance (string)
1071       - time: cpu time of instance (float)
1072       - vcpus: the number of vcpus
1073
1074   """
1075   output = {}
1076
1077   for hname in hypervisor_list:
1078     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1079     if iinfo:
1080       for name, _, memory, vcpus, state, times in iinfo:
1081         value = {
1082           "memory": memory,
1083           "vcpus": vcpus,
1084           "state": state,
1085           "time": times,
1086           }
1087         if name in output:
1088           # we only check static parameters, like memory and vcpus,
1089           # and not state and time which can change between the
1090           # invocations of the different hypervisors
1091           for key in "memory", "vcpus":
1092             if value[key] != output[name][key]:
1093               _Fail("Instance %s is running twice"
1094                     " with different parameters", name)
1095         output[name] = value
1096
1097   return output
1098
1099
1100 def _InstanceLogName(kind, os_name, instance, component):
1101   """Compute the OS log filename for a given instance and operation.
1102
1103   The instance name and os name are passed in as strings since not all
1104   operations have these as part of an instance object.
1105
1106   @type kind: string
1107   @param kind: the operation type (e.g. add, import, etc.)
1108   @type os_name: string
1109   @param os_name: the os name
1110   @type instance: string
1111   @param instance: the name of the instance being imported/added/etc.
1112   @type component: string or None
1113   @param component: the name of the component of the instance being
1114       transferred
1115
1116   """
1117   # TODO: Use tempfile.mkstemp to create unique filename
1118   if component:
1119     assert "/" not in component
1120     c_msg = "-%s" % component
1121   else:
1122     c_msg = ""
1123   base = ("%s-%s-%s%s-%s.log" %
1124           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1125   return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1126
1127
1128 def InstanceOsAdd(instance, reinstall, debug):
1129   """Add an OS to an instance.
1130
1131   @type instance: L{objects.Instance}
1132   @param instance: Instance whose OS is to be installed
1133   @type reinstall: boolean
1134   @param reinstall: whether this is an instance reinstall
1135   @type debug: integer
1136   @param debug: debug level, passed to the OS scripts
1137   @rtype: None
1138
1139   """
1140   inst_os = OSFromDisk(instance.os)
1141
1142   create_env = OSEnvironment(instance, inst_os, debug)
1143   if reinstall:
1144     create_env["INSTANCE_REINSTALL"] = "1"
1145
1146   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1147
1148   result = utils.RunCmd([inst_os.create_script], env=create_env,
1149                         cwd=inst_os.path, output=logfile, reset_env=True)
1150   if result.failed:
1151     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1152                   " output: %s", result.cmd, result.fail_reason, logfile,
1153                   result.output)
1154     lines = [utils.SafeEncode(val)
1155              for val in utils.TailFile(logfile, lines=20)]
1156     _Fail("OS create script failed (%s), last lines in the"
1157           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1158
1159
1160 def RunRenameInstance(instance, old_name, debug):
1161   """Run the OS rename script for an instance.
1162
1163   @type instance: L{objects.Instance}
1164   @param instance: Instance whose OS is to be installed
1165   @type old_name: string
1166   @param old_name: previous instance name
1167   @type debug: integer
1168   @param debug: debug level, passed to the OS scripts
1169   @rtype: boolean
1170   @return: the success of the operation
1171
1172   """
1173   inst_os = OSFromDisk(instance.os)
1174
1175   rename_env = OSEnvironment(instance, inst_os, debug)
1176   rename_env["OLD_INSTANCE_NAME"] = old_name
1177
1178   logfile = _InstanceLogName("rename", instance.os,
1179                              "%s-%s" % (old_name, instance.name), None)
1180
1181   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1182                         cwd=inst_os.path, output=logfile, reset_env=True)
1183
1184   if result.failed:
1185     logging.error("os create command '%s' returned error: %s output: %s",
1186                   result.cmd, result.fail_reason, result.output)
1187     lines = [utils.SafeEncode(val)
1188              for val in utils.TailFile(logfile, lines=20)]
1189     _Fail("OS rename script failed (%s), last lines in the"
1190           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1191
1192
1193 def _GetBlockDevSymlinkPath(instance_name, idx):
1194   return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
1195                         (instance_name, constants.DISK_SEPARATOR, idx))
1196
1197
1198 def _SymlinkBlockDev(instance_name, device_path, idx):
1199   """Set up symlinks to a instance's block device.
1200
1201   This is an auxiliary function run when an instance is start (on the primary
1202   node) or when an instance is migrated (on the target node).
1203
1204
1205   @param instance_name: the name of the target instance
1206   @param device_path: path of the physical block device, on the node
1207   @param idx: the disk index
1208   @return: absolute path to the disk's symlink
1209
1210   """
1211   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1212   try:
1213     os.symlink(device_path, link_name)
1214   except OSError, err:
1215     if err.errno == errno.EEXIST:
1216       if (not os.path.islink(link_name) or
1217           os.readlink(link_name) != device_path):
1218         os.remove(link_name)
1219         os.symlink(device_path, link_name)
1220     else:
1221       raise
1222
1223   return link_name
1224
1225
1226 def _RemoveBlockDevLinks(instance_name, disks):
1227   """Remove the block device symlinks belonging to the given instance.
1228
1229   """
1230   for idx, _ in enumerate(disks):
1231     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1232     if os.path.islink(link_name):
1233       try:
1234         os.remove(link_name)
1235       except OSError:
1236         logging.exception("Can't remove symlink '%s'", link_name)
1237
1238
1239 def _GatherAndLinkBlockDevs(instance):
1240   """Set up an instance's block device(s).
1241
1242   This is run on the primary node at instance startup. The block
1243   devices must be already assembled.
1244
1245   @type instance: L{objects.Instance}
1246   @param instance: the instance whose disks we shoul assemble
1247   @rtype: list
1248   @return: list of (disk_object, device_path)
1249
1250   """
1251   block_devices = []
1252   for idx, disk in enumerate(instance.disks):
1253     device = _RecursiveFindBD(disk)
1254     if device is None:
1255       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1256                                     str(disk))
1257     device.Open()
1258     try:
1259       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1260     except OSError, e:
1261       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1262                                     e.strerror)
1263
1264     block_devices.append((disk, link_name))
1265
1266   return block_devices
1267
1268
1269 def StartInstance(instance, startup_paused):
1270   """Start an instance.
1271
1272   @type instance: L{objects.Instance}
1273   @param instance: the instance object
1274   @type startup_paused: bool
1275   @param instance: pause instance at startup?
1276   @rtype: None
1277
1278   """
1279   running_instances = GetInstanceList([instance.hypervisor])
1280
1281   if instance.name in running_instances:
1282     logging.info("Instance %s already running, not starting", instance.name)
1283     return
1284
1285   try:
1286     block_devices = _GatherAndLinkBlockDevs(instance)
1287     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1288     hyper.StartInstance(instance, block_devices, startup_paused)
1289   except errors.BlockDeviceError, err:
1290     _Fail("Block device error: %s", err, exc=True)
1291   except errors.HypervisorError, err:
1292     _RemoveBlockDevLinks(instance.name, instance.disks)
1293     _Fail("Hypervisor error: %s", err, exc=True)
1294
1295
1296 def InstanceShutdown(instance, timeout):
1297   """Shut an instance down.
1298
1299   @note: this functions uses polling with a hardcoded timeout.
1300
1301   @type instance: L{objects.Instance}
1302   @param instance: the instance object
1303   @type timeout: integer
1304   @param timeout: maximum timeout for soft shutdown
1305   @rtype: None
1306
1307   """
1308   hv_name = instance.hypervisor
1309   hyper = hypervisor.GetHypervisor(hv_name)
1310   iname = instance.name
1311
1312   if instance.name not in hyper.ListInstances():
1313     logging.info("Instance %s not running, doing nothing", iname)
1314     return
1315
1316   class _TryShutdown:
1317     def __init__(self):
1318       self.tried_once = False
1319
1320     def __call__(self):
1321       if iname not in hyper.ListInstances():
1322         return
1323
1324       try:
1325         hyper.StopInstance(instance, retry=self.tried_once)
1326       except errors.HypervisorError, err:
1327         if iname not in hyper.ListInstances():
1328           # if the instance is no longer existing, consider this a
1329           # success and go to cleanup
1330           return
1331
1332         _Fail("Failed to stop instance %s: %s", iname, err)
1333
1334       self.tried_once = True
1335
1336       raise utils.RetryAgain()
1337
1338   try:
1339     utils.Retry(_TryShutdown(), 5, timeout)
1340   except utils.RetryTimeout:
1341     # the shutdown did not succeed
1342     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1343
1344     try:
1345       hyper.StopInstance(instance, force=True)
1346     except errors.HypervisorError, err:
1347       if iname in hyper.ListInstances():
1348         # only raise an error if the instance still exists, otherwise
1349         # the error could simply be "instance ... unknown"!
1350         _Fail("Failed to force stop instance %s: %s", iname, err)
1351
1352     time.sleep(1)
1353
1354     if iname in hyper.ListInstances():
1355       _Fail("Could not shutdown instance %s even by destroy", iname)
1356
1357   try:
1358     hyper.CleanupInstance(instance.name)
1359   except errors.HypervisorError, err:
1360     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1361
1362   _RemoveBlockDevLinks(iname, instance.disks)
1363
1364
1365 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1366   """Reboot an instance.
1367
1368   @type instance: L{objects.Instance}
1369   @param instance: the instance object to reboot
1370   @type reboot_type: str
1371   @param reboot_type: the type of reboot, one the following
1372     constants:
1373       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1374         instance OS, do not recreate the VM
1375       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1376         restart the VM (at the hypervisor level)
1377       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1378         not accepted here, since that mode is handled differently, in
1379         cmdlib, and translates into full stop and start of the
1380         instance (instead of a call_instance_reboot RPC)
1381   @type shutdown_timeout: integer
1382   @param shutdown_timeout: maximum timeout for soft shutdown
1383   @rtype: None
1384
1385   """
1386   running_instances = GetInstanceList([instance.hypervisor])
1387
1388   if instance.name not in running_instances:
1389     _Fail("Cannot reboot instance %s that is not running", instance.name)
1390
1391   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1392   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1393     try:
1394       hyper.RebootInstance(instance)
1395     except errors.HypervisorError, err:
1396       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1397   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1398     try:
1399       InstanceShutdown(instance, shutdown_timeout)
1400       return StartInstance(instance, False)
1401     except errors.HypervisorError, err:
1402       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1403   else:
1404     _Fail("Invalid reboot_type received: %s", reboot_type)
1405
1406
1407 def InstanceBalloonMemory(instance, memory):
1408   """Resize an instance's memory.
1409
1410   @type instance: L{objects.Instance}
1411   @param instance: the instance object
1412   @type memory: int
1413   @param memory: new memory amount in MB
1414   @rtype: None
1415
1416   """
1417   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1418   running = hyper.ListInstances()
1419   if instance.name not in running:
1420     logging.info("Instance %s is not running, cannot balloon", instance.name)
1421     return
1422   try:
1423     hyper.BalloonInstanceMemory(instance, memory)
1424   except errors.HypervisorError, err:
1425     _Fail("Failed to balloon instance memory: %s", err, exc=True)
1426
1427
1428 def MigrationInfo(instance):
1429   """Gather information about an instance to be migrated.
1430
1431   @type instance: L{objects.Instance}
1432   @param instance: the instance definition
1433
1434   """
1435   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1436   try:
1437     info = hyper.MigrationInfo(instance)
1438   except errors.HypervisorError, err:
1439     _Fail("Failed to fetch migration information: %s", err, exc=True)
1440   return info
1441
1442
1443 def AcceptInstance(instance, info, target):
1444   """Prepare the node to accept an instance.
1445
1446   @type instance: L{objects.Instance}
1447   @param instance: the instance definition
1448   @type info: string/data (opaque)
1449   @param info: migration information, from the source node
1450   @type target: string
1451   @param target: target host (usually ip), on this node
1452
1453   """
1454   # TODO: why is this required only for DTS_EXT_MIRROR?
1455   if instance.disk_template in constants.DTS_EXT_MIRROR:
1456     # Create the symlinks, as the disks are not active
1457     # in any way
1458     try:
1459       _GatherAndLinkBlockDevs(instance)
1460     except errors.BlockDeviceError, err:
1461       _Fail("Block device error: %s", err, exc=True)
1462
1463   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1464   try:
1465     hyper.AcceptInstance(instance, info, target)
1466   except errors.HypervisorError, err:
1467     if instance.disk_template in constants.DTS_EXT_MIRROR:
1468       _RemoveBlockDevLinks(instance.name, instance.disks)
1469     _Fail("Failed to accept instance: %s", err, exc=True)
1470
1471
1472 def FinalizeMigrationDst(instance, info, success):
1473   """Finalize any preparation to accept an instance.
1474
1475   @type instance: L{objects.Instance}
1476   @param instance: the instance definition
1477   @type info: string/data (opaque)
1478   @param info: migration information, from the source node
1479   @type success: boolean
1480   @param success: whether the migration was a success or a failure
1481
1482   """
1483   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1484   try:
1485     hyper.FinalizeMigrationDst(instance, info, success)
1486   except errors.HypervisorError, err:
1487     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1488
1489
1490 def MigrateInstance(instance, target, live):
1491   """Migrates an instance to another node.
1492
1493   @type instance: L{objects.Instance}
1494   @param instance: the instance definition
1495   @type target: string
1496   @param target: the target node name
1497   @type live: boolean
1498   @param live: whether the migration should be done live or not (the
1499       interpretation of this parameter is left to the hypervisor)
1500   @raise RPCFail: if migration fails for some reason
1501
1502   """
1503   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1504
1505   try:
1506     hyper.MigrateInstance(instance, target, live)
1507   except errors.HypervisorError, err:
1508     _Fail("Failed to migrate instance: %s", err, exc=True)
1509
1510
1511 def FinalizeMigrationSource(instance, success, live):
1512   """Finalize the instance migration on the source node.
1513
1514   @type instance: L{objects.Instance}
1515   @param instance: the instance definition of the migrated instance
1516   @type success: bool
1517   @param success: whether the migration succeeded or not
1518   @type live: bool
1519   @param live: whether the user requested a live migration or not
1520   @raise RPCFail: If the execution fails for some reason
1521
1522   """
1523   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1524
1525   try:
1526     hyper.FinalizeMigrationSource(instance, success, live)
1527   except Exception, err:  # pylint: disable=W0703
1528     _Fail("Failed to finalize the migration on the source node: %s", err,
1529           exc=True)
1530
1531
1532 def GetMigrationStatus(instance):
1533   """Get the migration status
1534
1535   @type instance: L{objects.Instance}
1536   @param instance: the instance that is being migrated
1537   @rtype: L{objects.MigrationStatus}
1538   @return: the status of the current migration (one of
1539            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1540            progress info that can be retrieved from the hypervisor
1541   @raise RPCFail: If the migration status cannot be retrieved
1542
1543   """
1544   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1545   try:
1546     return hyper.GetMigrationStatus(instance)
1547   except Exception, err:  # pylint: disable=W0703
1548     _Fail("Failed to get migration status: %s", err, exc=True)
1549
1550
1551 def BlockdevCreate(disk, size, owner, on_primary, info):
1552   """Creates a block device for an instance.
1553
1554   @type disk: L{objects.Disk}
1555   @param disk: the object describing the disk we should create
1556   @type size: int
1557   @param size: the size of the physical underlying device, in MiB
1558   @type owner: str
1559   @param owner: the name of the instance for which disk is created,
1560       used for device cache data
1561   @type on_primary: boolean
1562   @param on_primary:  indicates if it is the primary node or not
1563   @type info: string
1564   @param info: string that will be sent to the physical device
1565       creation, used for example to set (LVM) tags on LVs
1566
1567   @return: the new unique_id of the device (this can sometime be
1568       computed only after creation), or None. On secondary nodes,
1569       it's not required to return anything.
1570
1571   """
1572   # TODO: remove the obsolete "size" argument
1573   # pylint: disable=W0613
1574   clist = []
1575   if disk.children:
1576     for child in disk.children:
1577       try:
1578         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1579       except errors.BlockDeviceError, err:
1580         _Fail("Can't assemble device %s: %s", child, err)
1581       if on_primary or disk.AssembleOnSecondary():
1582         # we need the children open in case the device itself has to
1583         # be assembled
1584         try:
1585           # pylint: disable=E1103
1586           crdev.Open()
1587         except errors.BlockDeviceError, err:
1588           _Fail("Can't make child '%s' read-write: %s", child, err)
1589       clist.append(crdev)
1590
1591   try:
1592     device = bdev.Create(disk, clist)
1593   except errors.BlockDeviceError, err:
1594     _Fail("Can't create block device: %s", err)
1595
1596   if on_primary or disk.AssembleOnSecondary():
1597     try:
1598       device.Assemble()
1599     except errors.BlockDeviceError, err:
1600       _Fail("Can't assemble device after creation, unusual event: %s", err)
1601     if on_primary or disk.OpenOnSecondary():
1602       try:
1603         device.Open(force=True)
1604       except errors.BlockDeviceError, err:
1605         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1606     DevCacheManager.UpdateCache(device.dev_path, owner,
1607                                 on_primary, disk.iv_name)
1608
1609   device.SetInfo(info)
1610
1611   return device.unique_id
1612
1613
1614 def _WipeDevice(path, offset, size):
1615   """This function actually wipes the device.
1616
1617   @param path: The path to the device to wipe
1618   @param offset: The offset in MiB in the file
1619   @param size: The size in MiB to write
1620
1621   """
1622   # Internal sizes are always in Mebibytes; if the following "dd" command
1623   # should use a different block size the offset and size given to this
1624   # function must be adjusted accordingly before being passed to "dd".
1625   block_size = 1024 * 1024
1626
1627   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1628          "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1629          "count=%d" % size]
1630   result = utils.RunCmd(cmd)
1631
1632   if result.failed:
1633     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1634           result.fail_reason, result.output)
1635
1636
1637 def BlockdevWipe(disk, offset, size):
1638   """Wipes a block device.
1639
1640   @type disk: L{objects.Disk}
1641   @param disk: the disk object we want to wipe
1642   @type offset: int
1643   @param offset: The offset in MiB in the file
1644   @type size: int
1645   @param size: The size in MiB to write
1646
1647   """
1648   try:
1649     rdev = _RecursiveFindBD(disk)
1650   except errors.BlockDeviceError:
1651     rdev = None
1652
1653   if not rdev:
1654     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1655
1656   # Do cross verify some of the parameters
1657   if offset < 0:
1658     _Fail("Negative offset")
1659   if size < 0:
1660     _Fail("Negative size")
1661   if offset > rdev.size:
1662     _Fail("Offset is bigger than device size")
1663   if (offset + size) > rdev.size:
1664     _Fail("The provided offset and size to wipe is bigger than device size")
1665
1666   _WipeDevice(rdev.dev_path, offset, size)
1667
1668
1669 def BlockdevPauseResumeSync(disks, pause):
1670   """Pause or resume the sync of the block device.
1671
1672   @type disks: list of L{objects.Disk}
1673   @param disks: the disks object we want to pause/resume
1674   @type pause: bool
1675   @param pause: Wheater to pause or resume
1676
1677   """
1678   success = []
1679   for disk in disks:
1680     try:
1681       rdev = _RecursiveFindBD(disk)
1682     except errors.BlockDeviceError:
1683       rdev = None
1684
1685     if not rdev:
1686       success.append((False, ("Cannot change sync for device %s:"
1687                               " device not found" % disk.iv_name)))
1688       continue
1689
1690     result = rdev.PauseResumeSync(pause)
1691
1692     if result:
1693       success.append((result, None))
1694     else:
1695       if pause:
1696         msg = "Pause"
1697       else:
1698         msg = "Resume"
1699       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1700
1701   return success
1702
1703
1704 def BlockdevRemove(disk):
1705   """Remove a block device.
1706
1707   @note: This is intended to be called recursively.
1708
1709   @type disk: L{objects.Disk}
1710   @param disk: the disk object we should remove
1711   @rtype: boolean
1712   @return: the success of the operation
1713
1714   """
1715   msgs = []
1716   try:
1717     rdev = _RecursiveFindBD(disk)
1718   except errors.BlockDeviceError, err:
1719     # probably can't attach
1720     logging.info("Can't attach to device %s in remove", disk)
1721     rdev = None
1722   if rdev is not None:
1723     r_path = rdev.dev_path
1724     try:
1725       rdev.Remove()
1726     except errors.BlockDeviceError, err:
1727       msgs.append(str(err))
1728     if not msgs:
1729       DevCacheManager.RemoveCache(r_path)
1730
1731   if disk.children:
1732     for child in disk.children:
1733       try:
1734         BlockdevRemove(child)
1735       except RPCFail, err:
1736         msgs.append(str(err))
1737
1738   if msgs:
1739     _Fail("; ".join(msgs))
1740
1741
1742 def _RecursiveAssembleBD(disk, owner, as_primary):
1743   """Activate a block device for an instance.
1744
1745   This is run on the primary and secondary nodes for an instance.
1746
1747   @note: this function is called recursively.
1748
1749   @type disk: L{objects.Disk}
1750   @param disk: the disk we try to assemble
1751   @type owner: str
1752   @param owner: the name of the instance which owns the disk
1753   @type as_primary: boolean
1754   @param as_primary: if we should make the block device
1755       read/write
1756
1757   @return: the assembled device or None (in case no device
1758       was assembled)
1759   @raise errors.BlockDeviceError: in case there is an error
1760       during the activation of the children or the device
1761       itself
1762
1763   """
1764   children = []
1765   if disk.children:
1766     mcn = disk.ChildrenNeeded()
1767     if mcn == -1:
1768       mcn = 0 # max number of Nones allowed
1769     else:
1770       mcn = len(disk.children) - mcn # max number of Nones
1771     for chld_disk in disk.children:
1772       try:
1773         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1774       except errors.BlockDeviceError, err:
1775         if children.count(None) >= mcn:
1776           raise
1777         cdev = None
1778         logging.error("Error in child activation (but continuing): %s",
1779                       str(err))
1780       children.append(cdev)
1781
1782   if as_primary or disk.AssembleOnSecondary():
1783     r_dev = bdev.Assemble(disk, children)
1784     result = r_dev
1785     if as_primary or disk.OpenOnSecondary():
1786       r_dev.Open()
1787     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1788                                 as_primary, disk.iv_name)
1789
1790   else:
1791     result = True
1792   return result
1793
1794
1795 def BlockdevAssemble(disk, owner, as_primary, idx):
1796   """Activate a block device for an instance.
1797
1798   This is a wrapper over _RecursiveAssembleBD.
1799
1800   @rtype: str or boolean
1801   @return: a C{/dev/...} path for primary nodes, and
1802       C{True} for secondary nodes
1803
1804   """
1805   try:
1806     result = _RecursiveAssembleBD(disk, owner, as_primary)
1807     if isinstance(result, bdev.BlockDev):
1808       # pylint: disable=E1103
1809       result = result.dev_path
1810       if as_primary:
1811         _SymlinkBlockDev(owner, result, idx)
1812   except errors.BlockDeviceError, err:
1813     _Fail("Error while assembling disk: %s", err, exc=True)
1814   except OSError, err:
1815     _Fail("Error while symlinking disk: %s", err, exc=True)
1816
1817   return result
1818
1819
1820 def BlockdevShutdown(disk):
1821   """Shut down a block device.
1822
1823   First, if the device is assembled (Attach() is successful), then
1824   the device is shutdown. Then the children of the device are
1825   shutdown.
1826
1827   This function is called recursively. Note that we don't cache the
1828   children or such, as oppossed to assemble, shutdown of different
1829   devices doesn't require that the upper device was active.
1830
1831   @type disk: L{objects.Disk}
1832   @param disk: the description of the disk we should
1833       shutdown
1834   @rtype: None
1835
1836   """
1837   msgs = []
1838   r_dev = _RecursiveFindBD(disk)
1839   if r_dev is not None:
1840     r_path = r_dev.dev_path
1841     try:
1842       r_dev.Shutdown()
1843       DevCacheManager.RemoveCache(r_path)
1844     except errors.BlockDeviceError, err:
1845       msgs.append(str(err))
1846
1847   if disk.children:
1848     for child in disk.children:
1849       try:
1850         BlockdevShutdown(child)
1851       except RPCFail, err:
1852         msgs.append(str(err))
1853
1854   if msgs:
1855     _Fail("; ".join(msgs))
1856
1857
1858 def BlockdevAddchildren(parent_cdev, new_cdevs):
1859   """Extend a mirrored block device.
1860
1861   @type parent_cdev: L{objects.Disk}
1862   @param parent_cdev: the disk to which we should add children
1863   @type new_cdevs: list of L{objects.Disk}
1864   @param new_cdevs: the list of children which we should add
1865   @rtype: None
1866
1867   """
1868   parent_bdev = _RecursiveFindBD(parent_cdev)
1869   if parent_bdev is None:
1870     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1871   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1872   if new_bdevs.count(None) > 0:
1873     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1874   parent_bdev.AddChildren(new_bdevs)
1875
1876
1877 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1878   """Shrink a mirrored block device.
1879
1880   @type parent_cdev: L{objects.Disk}
1881   @param parent_cdev: the disk from which we should remove children
1882   @type new_cdevs: list of L{objects.Disk}
1883   @param new_cdevs: the list of children which we should remove
1884   @rtype: None
1885
1886   """
1887   parent_bdev = _RecursiveFindBD(parent_cdev)
1888   if parent_bdev is None:
1889     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1890   devs = []
1891   for disk in new_cdevs:
1892     rpath = disk.StaticDevPath()
1893     if rpath is None:
1894       bd = _RecursiveFindBD(disk)
1895       if bd is None:
1896         _Fail("Can't find device %s while removing children", disk)
1897       else:
1898         devs.append(bd.dev_path)
1899     else:
1900       if not utils.IsNormAbsPath(rpath):
1901         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1902       devs.append(rpath)
1903   parent_bdev.RemoveChildren(devs)
1904
1905
1906 def BlockdevGetmirrorstatus(disks):
1907   """Get the mirroring status of a list of devices.
1908
1909   @type disks: list of L{objects.Disk}
1910   @param disks: the list of disks which we should query
1911   @rtype: disk
1912   @return: List of L{objects.BlockDevStatus}, one for each disk
1913   @raise errors.BlockDeviceError: if any of the disks cannot be
1914       found
1915
1916   """
1917   stats = []
1918   for dsk in disks:
1919     rbd = _RecursiveFindBD(dsk)
1920     if rbd is None:
1921       _Fail("Can't find device %s", dsk)
1922
1923     stats.append(rbd.CombinedSyncStatus())
1924
1925   return stats
1926
1927
1928 def BlockdevGetmirrorstatusMulti(disks):
1929   """Get the mirroring status of a list of devices.
1930
1931   @type disks: list of L{objects.Disk}
1932   @param disks: the list of disks which we should query
1933   @rtype: disk
1934   @return: List of tuples, (bool, status), one for each disk; bool denotes
1935     success/failure, status is L{objects.BlockDevStatus} on success, string
1936     otherwise
1937
1938   """
1939   result = []
1940   for disk in disks:
1941     try:
1942       rbd = _RecursiveFindBD(disk)
1943       if rbd is None:
1944         result.append((False, "Can't find device %s" % disk))
1945         continue
1946
1947       status = rbd.CombinedSyncStatus()
1948     except errors.BlockDeviceError, err:
1949       logging.exception("Error while getting disk status")
1950       result.append((False, str(err)))
1951     else:
1952       result.append((True, status))
1953
1954   assert len(disks) == len(result)
1955
1956   return result
1957
1958
1959 def _RecursiveFindBD(disk):
1960   """Check if a device is activated.
1961
1962   If so, return information about the real device.
1963
1964   @type disk: L{objects.Disk}
1965   @param disk: the disk object we need to find
1966
1967   @return: None if the device can't be found,
1968       otherwise the device instance
1969
1970   """
1971   children = []
1972   if disk.children:
1973     for chdisk in disk.children:
1974       children.append(_RecursiveFindBD(chdisk))
1975
1976   return bdev.FindDevice(disk, children)
1977
1978
1979 def _OpenRealBD(disk):
1980   """Opens the underlying block device of a disk.
1981
1982   @type disk: L{objects.Disk}
1983   @param disk: the disk object we want to open
1984
1985   """
1986   real_disk = _RecursiveFindBD(disk)
1987   if real_disk is None:
1988     _Fail("Block device '%s' is not set up", disk)
1989
1990   real_disk.Open()
1991
1992   return real_disk
1993
1994
1995 def BlockdevFind(disk):
1996   """Check if a device is activated.
1997
1998   If it is, return information about the real device.
1999
2000   @type disk: L{objects.Disk}
2001   @param disk: the disk to find
2002   @rtype: None or objects.BlockDevStatus
2003   @return: None if the disk cannot be found, otherwise a the current
2004            information
2005
2006   """
2007   try:
2008     rbd = _RecursiveFindBD(disk)
2009   except errors.BlockDeviceError, err:
2010     _Fail("Failed to find device: %s", err, exc=True)
2011
2012   if rbd is None:
2013     return None
2014
2015   return rbd.GetSyncStatus()
2016
2017
2018 def BlockdevGetsize(disks):
2019   """Computes the size of the given disks.
2020
2021   If a disk is not found, returns None instead.
2022
2023   @type disks: list of L{objects.Disk}
2024   @param disks: the list of disk to compute the size for
2025   @rtype: list
2026   @return: list with elements None if the disk cannot be found,
2027       otherwise the size
2028
2029   """
2030   result = []
2031   for cf in disks:
2032     try:
2033       rbd = _RecursiveFindBD(cf)
2034     except errors.BlockDeviceError:
2035       result.append(None)
2036       continue
2037     if rbd is None:
2038       result.append(None)
2039     else:
2040       result.append(rbd.GetActualSize())
2041   return result
2042
2043
2044 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2045   """Export a block device to a remote node.
2046
2047   @type disk: L{objects.Disk}
2048   @param disk: the description of the disk to export
2049   @type dest_node: str
2050   @param dest_node: the destination node to export to
2051   @type dest_path: str
2052   @param dest_path: the destination path on the target node
2053   @type cluster_name: str
2054   @param cluster_name: the cluster name, needed for SSH hostalias
2055   @rtype: None
2056
2057   """
2058   real_disk = _OpenRealBD(disk)
2059
2060   # the block size on the read dd is 1MiB to match our units
2061   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2062                                "dd if=%s bs=1048576 count=%s",
2063                                real_disk.dev_path, str(disk.size))
2064
2065   # we set here a smaller block size as, due to ssh buffering, more
2066   # than 64-128k will mostly ignored; we use nocreat to fail if the
2067   # device is not already there or we pass a wrong path; we use
2068   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2069   # to not buffer too much memory; this means that at best, we flush
2070   # every 64k, which will not be very fast
2071   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2072                                 " oflag=dsync", dest_path)
2073
2074   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2075                                                    constants.SSH_LOGIN_USER,
2076                                                    destcmd)
2077
2078   # all commands have been checked, so we're safe to combine them
2079   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2080
2081   result = utils.RunCmd(["bash", "-c", command])
2082
2083   if result.failed:
2084     _Fail("Disk copy command '%s' returned error: %s"
2085           " output: %s", command, result.fail_reason, result.output)
2086
2087
2088 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2089   """Write a file to the filesystem.
2090
2091   This allows the master to overwrite(!) a file. It will only perform
2092   the operation if the file belongs to a list of configuration files.
2093
2094   @type file_name: str
2095   @param file_name: the target file name
2096   @type data: str
2097   @param data: the new contents of the file
2098   @type mode: int
2099   @param mode: the mode to give the file (can be None)
2100   @type uid: string
2101   @param uid: the owner of the file
2102   @type gid: string
2103   @param gid: the group of the file
2104   @type atime: float
2105   @param atime: the atime to set on the file (can be None)
2106   @type mtime: float
2107   @param mtime: the mtime to set on the file (can be None)
2108   @rtype: None
2109
2110   """
2111   file_name = vcluster.LocalizeVirtualPath(file_name)
2112
2113   if not os.path.isabs(file_name):
2114     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2115
2116   if file_name not in _ALLOWED_UPLOAD_FILES:
2117     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2118           file_name)
2119
2120   raw_data = _Decompress(data)
2121
2122   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2123     _Fail("Invalid username/groupname type")
2124
2125   getents = runtime.GetEnts()
2126   uid = getents.LookupUser(uid)
2127   gid = getents.LookupGroup(gid)
2128
2129   utils.SafeWriteFile(file_name, None,
2130                       data=raw_data, mode=mode, uid=uid, gid=gid,
2131                       atime=atime, mtime=mtime)
2132
2133
2134 def RunOob(oob_program, command, node, timeout):
2135   """Executes oob_program with given command on given node.
2136
2137   @param oob_program: The path to the executable oob_program
2138   @param command: The command to invoke on oob_program
2139   @param node: The node given as an argument to the program
2140   @param timeout: Timeout after which we kill the oob program
2141
2142   @return: stdout
2143   @raise RPCFail: If execution fails for some reason
2144
2145   """
2146   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2147
2148   if result.failed:
2149     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2150           result.fail_reason, result.output)
2151
2152   return result.stdout
2153
2154
2155 def _OSOndiskAPIVersion(os_dir):
2156   """Compute and return the API version of a given OS.
2157
2158   This function will try to read the API version of the OS residing in
2159   the 'os_dir' directory.
2160
2161   @type os_dir: str
2162   @param os_dir: the directory in which we should look for the OS
2163   @rtype: tuple
2164   @return: tuple (status, data) with status denoting the validity and
2165       data holding either the vaid versions or an error message
2166
2167   """
2168   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2169
2170   try:
2171     st = os.stat(api_file)
2172   except EnvironmentError, err:
2173     return False, ("Required file '%s' not found under path %s: %s" %
2174                    (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2175
2176   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2177     return False, ("File '%s' in %s is not a regular file" %
2178                    (constants.OS_API_FILE, os_dir))
2179
2180   try:
2181     api_versions = utils.ReadFile(api_file).splitlines()
2182   except EnvironmentError, err:
2183     return False, ("Error while reading the API version file at %s: %s" %
2184                    (api_file, utils.ErrnoOrStr(err)))
2185
2186   try:
2187     api_versions = [int(version.strip()) for version in api_versions]
2188   except (TypeError, ValueError), err:
2189     return False, ("API version(s) can't be converted to integer: %s" %
2190                    str(err))
2191
2192   return True, api_versions
2193
2194
2195 def DiagnoseOS(top_dirs=None):
2196   """Compute the validity for all OSes.
2197
2198   @type top_dirs: list
2199   @param top_dirs: the list of directories in which to
2200       search (if not given defaults to
2201       L{pathutils.OS_SEARCH_PATH})
2202   @rtype: list of L{objects.OS}
2203   @return: a list of tuples (name, path, status, diagnose, variants,
2204       parameters, api_version) for all (potential) OSes under all
2205       search paths, where:
2206           - name is the (potential) OS name
2207           - path is the full path to the OS
2208           - status True/False is the validity of the OS
2209           - diagnose is the error message for an invalid OS, otherwise empty
2210           - variants is a list of supported OS variants, if any
2211           - parameters is a list of (name, help) parameters, if any
2212           - api_version is a list of support OS API versions
2213
2214   """
2215   if top_dirs is None:
2216     top_dirs = pathutils.OS_SEARCH_PATH
2217
2218   result = []
2219   for dir_name in top_dirs:
2220     if os.path.isdir(dir_name):
2221       try:
2222         f_names = utils.ListVisibleFiles(dir_name)
2223       except EnvironmentError, err:
2224         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2225         break
2226       for name in f_names:
2227         os_path = utils.PathJoin(dir_name, name)
2228         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2229         if status:
2230           diagnose = ""
2231           variants = os_inst.supported_variants
2232           parameters = os_inst.supported_parameters
2233           api_versions = os_inst.api_versions
2234         else:
2235           diagnose = os_inst
2236           variants = parameters = api_versions = []
2237         result.append((name, os_path, status, diagnose, variants,
2238                        parameters, api_versions))
2239
2240   return result
2241
2242
2243 def _TryOSFromDisk(name, base_dir=None):
2244   """Create an OS instance from disk.
2245
2246   This function will return an OS instance if the given name is a
2247   valid OS name.
2248
2249   @type base_dir: string
2250   @keyword base_dir: Base directory containing OS installations.
2251                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2252   @rtype: tuple
2253   @return: success and either the OS instance if we find a valid one,
2254       or error message
2255
2256   """
2257   if base_dir is None:
2258     os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2259   else:
2260     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2261
2262   if os_dir is None:
2263     return False, "Directory for OS %s not found in search path" % name
2264
2265   status, api_versions = _OSOndiskAPIVersion(os_dir)
2266   if not status:
2267     # push the error up
2268     return status, api_versions
2269
2270   if not constants.OS_API_VERSIONS.intersection(api_versions):
2271     return False, ("API version mismatch for path '%s': found %s, want %s." %
2272                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2273
2274   # OS Files dictionary, we will populate it with the absolute path
2275   # names; if the value is True, then it is a required file, otherwise
2276   # an optional one
2277   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2278
2279   if max(api_versions) >= constants.OS_API_V15:
2280     os_files[constants.OS_VARIANTS_FILE] = False
2281
2282   if max(api_versions) >= constants.OS_API_V20:
2283     os_files[constants.OS_PARAMETERS_FILE] = True
2284   else:
2285     del os_files[constants.OS_SCRIPT_VERIFY]
2286
2287   for (filename, required) in os_files.items():
2288     os_files[filename] = utils.PathJoin(os_dir, filename)
2289
2290     try:
2291       st = os.stat(os_files[filename])
2292     except EnvironmentError, err:
2293       if err.errno == errno.ENOENT and not required:
2294         del os_files[filename]
2295         continue
2296       return False, ("File '%s' under path '%s' is missing (%s)" %
2297                      (filename, os_dir, utils.ErrnoOrStr(err)))
2298
2299     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2300       return False, ("File '%s' under path '%s' is not a regular file" %
2301                      (filename, os_dir))
2302
2303     if filename in constants.OS_SCRIPTS:
2304       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2305         return False, ("File '%s' under path '%s' is not executable" %
2306                        (filename, os_dir))
2307
2308   variants = []
2309   if constants.OS_VARIANTS_FILE in os_files:
2310     variants_file = os_files[constants.OS_VARIANTS_FILE]
2311     try:
2312       variants = \
2313         utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2314     except EnvironmentError, err:
2315       # we accept missing files, but not other errors
2316       if err.errno != errno.ENOENT:
2317         return False, ("Error while reading the OS variants file at %s: %s" %
2318                        (variants_file, utils.ErrnoOrStr(err)))
2319
2320   parameters = []
2321   if constants.OS_PARAMETERS_FILE in os_files:
2322     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2323     try:
2324       parameters = utils.ReadFile(parameters_file).splitlines()
2325     except EnvironmentError, err:
2326       return False, ("Error while reading the OS parameters file at %s: %s" %
2327                      (parameters_file, utils.ErrnoOrStr(err)))
2328     parameters = [v.split(None, 1) for v in parameters]
2329
2330   os_obj = objects.OS(name=name, path=os_dir,
2331                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2332                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2333                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2334                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2335                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2336                                                  None),
2337                       supported_variants=variants,
2338                       supported_parameters=parameters,
2339                       api_versions=api_versions)
2340   return True, os_obj
2341
2342
2343 def OSFromDisk(name, base_dir=None):
2344   """Create an OS instance from disk.
2345
2346   This function will return an OS instance if the given name is a
2347   valid OS name. Otherwise, it will raise an appropriate
2348   L{RPCFail} exception, detailing why this is not a valid OS.
2349
2350   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2351   an exception but returns true/false status data.
2352
2353   @type base_dir: string
2354   @keyword base_dir: Base directory containing OS installations.
2355                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2356   @rtype: L{objects.OS}
2357   @return: the OS instance if we find a valid one
2358   @raise RPCFail: if we don't find a valid OS
2359
2360   """
2361   name_only = objects.OS.GetName(name)
2362   status, payload = _TryOSFromDisk(name_only, base_dir)
2363
2364   if not status:
2365     _Fail(payload)
2366
2367   return payload
2368
2369
2370 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2371   """Calculate the basic environment for an os script.
2372
2373   @type os_name: str
2374   @param os_name: full operating system name (including variant)
2375   @type inst_os: L{objects.OS}
2376   @param inst_os: operating system for which the environment is being built
2377   @type os_params: dict
2378   @param os_params: the OS parameters
2379   @type debug: integer
2380   @param debug: debug level (0 or 1, for OS Api 10)
2381   @rtype: dict
2382   @return: dict of environment variables
2383   @raise errors.BlockDeviceError: if the block device
2384       cannot be found
2385
2386   """
2387   result = {}
2388   api_version = \
2389     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2390   result["OS_API_VERSION"] = "%d" % api_version
2391   result["OS_NAME"] = inst_os.name
2392   result["DEBUG_LEVEL"] = "%d" % debug
2393
2394   # OS variants
2395   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2396     variant = objects.OS.GetVariant(os_name)
2397     if not variant:
2398       variant = inst_os.supported_variants[0]
2399   else:
2400     variant = ""
2401   result["OS_VARIANT"] = variant
2402
2403   # OS params
2404   for pname, pvalue in os_params.items():
2405     result["OSP_%s" % pname.upper()] = pvalue
2406
2407   # Set a default path otherwise programs called by OS scripts (or
2408   # even hooks called from OS scripts) might break, and we don't want
2409   # to have each script require setting a PATH variable
2410   result["PATH"] = constants.HOOKS_PATH
2411
2412   return result
2413
2414
2415 def OSEnvironment(instance, inst_os, debug=0):
2416   """Calculate the environment for an os script.
2417
2418   @type instance: L{objects.Instance}
2419   @param instance: target instance for the os script run
2420   @type inst_os: L{objects.OS}
2421   @param inst_os: operating system for which the environment is being built
2422   @type debug: integer
2423   @param debug: debug level (0 or 1, for OS Api 10)
2424   @rtype: dict
2425   @return: dict of environment variables
2426   @raise errors.BlockDeviceError: if the block device
2427       cannot be found
2428
2429   """
2430   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2431
2432   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2433     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2434
2435   result["HYPERVISOR"] = instance.hypervisor
2436   result["DISK_COUNT"] = "%d" % len(instance.disks)
2437   result["NIC_COUNT"] = "%d" % len(instance.nics)
2438   result["INSTANCE_SECONDARY_NODES"] = \
2439       ("%s" % " ".join(instance.secondary_nodes))
2440
2441   # Disks
2442   for idx, disk in enumerate(instance.disks):
2443     real_disk = _OpenRealBD(disk)
2444     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2445     result["DISK_%d_ACCESS" % idx] = disk.mode
2446     if constants.HV_DISK_TYPE in instance.hvparams:
2447       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2448         instance.hvparams[constants.HV_DISK_TYPE]
2449     if disk.dev_type in constants.LDS_BLOCK:
2450       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2451     elif disk.dev_type == constants.LD_FILE:
2452       result["DISK_%d_BACKEND_TYPE" % idx] = \
2453         "file:%s" % disk.physical_id[0]
2454
2455   # NICs
2456   for idx, nic in enumerate(instance.nics):
2457     result["NIC_%d_MAC" % idx] = nic.mac
2458     if nic.ip:
2459       result["NIC_%d_IP" % idx] = nic.ip
2460     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2461     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2462       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2463     if nic.nicparams[constants.NIC_LINK]:
2464       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2465     if nic.network:
2466       result["NIC_%d_NETWORK" % idx] = nic.network
2467     if constants.HV_NIC_TYPE in instance.hvparams:
2468       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2469         instance.hvparams[constants.HV_NIC_TYPE]
2470
2471   # HV/BE params
2472   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2473     for key, value in source.items():
2474       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2475
2476   return result
2477
2478
2479 def BlockdevGrow(disk, amount, dryrun, backingstore):
2480   """Grow a stack of block devices.
2481
2482   This function is called recursively, with the childrens being the
2483   first ones to resize.
2484
2485   @type disk: L{objects.Disk}
2486   @param disk: the disk to be grown
2487   @type amount: integer
2488   @param amount: the amount (in mebibytes) to grow with
2489   @type dryrun: boolean
2490   @param dryrun: whether to execute the operation in simulation mode
2491       only, without actually increasing the size
2492   @param backingstore: whether to execute the operation on backing storage
2493       only, or on "logical" storage only; e.g. DRBD is logical storage,
2494       whereas LVM, file, RBD are backing storage
2495   @rtype: (status, result)
2496   @return: a tuple with the status of the operation (True/False), and
2497       the errors message if status is False
2498
2499   """
2500   r_dev = _RecursiveFindBD(disk)
2501   if r_dev is None:
2502     _Fail("Cannot find block device %s", disk)
2503
2504   try:
2505     r_dev.Grow(amount, dryrun, backingstore)
2506   except errors.BlockDeviceError, err:
2507     _Fail("Failed to grow block device: %s", err, exc=True)
2508
2509
2510 def BlockdevSnapshot(disk):
2511   """Create a snapshot copy of a block device.
2512
2513   This function is called recursively, and the snapshot is actually created
2514   just for the leaf lvm backend device.
2515
2516   @type disk: L{objects.Disk}
2517   @param disk: the disk to be snapshotted
2518   @rtype: string
2519   @return: snapshot disk ID as (vg, lv)
2520
2521   """
2522   if disk.dev_type == constants.LD_DRBD8:
2523     if not disk.children:
2524       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2525             disk.unique_id)
2526     return BlockdevSnapshot(disk.children[0])
2527   elif disk.dev_type == constants.LD_LV:
2528     r_dev = _RecursiveFindBD(disk)
2529     if r_dev is not None:
2530       # FIXME: choose a saner value for the snapshot size
2531       # let's stay on the safe side and ask for the full size, for now
2532       return r_dev.Snapshot(disk.size)
2533     else:
2534       _Fail("Cannot find block device %s", disk)
2535   else:
2536     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2537           disk.unique_id, disk.dev_type)
2538
2539
2540 def BlockdevSetInfo(disk, info):
2541   """Sets 'metadata' information on block devices.
2542
2543   This function sets 'info' metadata on block devices. Initial
2544   information is set at device creation; this function should be used
2545   for example after renames.
2546
2547   @type disk: L{objects.Disk}
2548   @param disk: the disk to be grown
2549   @type info: string
2550   @param info: new 'info' metadata
2551   @rtype: (status, result)
2552   @return: a tuple with the status of the operation (True/False), and
2553       the errors message if status is False
2554
2555   """
2556   r_dev = _RecursiveFindBD(disk)
2557   if r_dev is None:
2558     _Fail("Cannot find block device %s", disk)
2559
2560   try:
2561     r_dev.SetInfo(info)
2562   except errors.BlockDeviceError, err:
2563     _Fail("Failed to set information on block device: %s", err, exc=True)
2564
2565
2566 def FinalizeExport(instance, snap_disks):
2567   """Write out the export configuration information.
2568
2569   @type instance: L{objects.Instance}
2570   @param instance: the instance which we export, used for
2571       saving configuration
2572   @type snap_disks: list of L{objects.Disk}
2573   @param snap_disks: list of snapshot block devices, which
2574       will be used to get the actual name of the dump file
2575
2576   @rtype: None
2577
2578   """
2579   destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
2580   finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
2581
2582   config = objects.SerializableConfigParser()
2583
2584   config.add_section(constants.INISECT_EXP)
2585   config.set(constants.INISECT_EXP, "version", "0")
2586   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2587   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2588   config.set(constants.INISECT_EXP, "os", instance.os)
2589   config.set(constants.INISECT_EXP, "compression", "none")
2590
2591   config.add_section(constants.INISECT_INS)
2592   config.set(constants.INISECT_INS, "name", instance.name)
2593   config.set(constants.INISECT_INS, "maxmem", "%d" %
2594              instance.beparams[constants.BE_MAXMEM])
2595   config.set(constants.INISECT_INS, "minmem", "%d" %
2596              instance.beparams[constants.BE_MINMEM])
2597   # "memory" is deprecated, but useful for exporting to old ganeti versions
2598   config.set(constants.INISECT_INS, "memory", "%d" %
2599              instance.beparams[constants.BE_MAXMEM])
2600   config.set(constants.INISECT_INS, "vcpus", "%d" %
2601              instance.beparams[constants.BE_VCPUS])
2602   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2603   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2604   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2605
2606   nic_total = 0
2607   for nic_count, nic in enumerate(instance.nics):
2608     nic_total += 1
2609     config.set(constants.INISECT_INS, "nic%d_mac" %
2610                nic_count, "%s" % nic.mac)
2611     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2612     for param in constants.NICS_PARAMETER_TYPES:
2613       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2614                  "%s" % nic.nicparams.get(param, None))
2615   # TODO: redundant: on load can read nics until it doesn't exist
2616   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2617
2618   disk_total = 0
2619   for disk_count, disk in enumerate(snap_disks):
2620     if disk:
2621       disk_total += 1
2622       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2623                  ("%s" % disk.iv_name))
2624       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2625                  ("%s" % disk.physical_id[1]))
2626       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2627                  ("%d" % disk.size))
2628
2629   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2630
2631   # New-style hypervisor/backend parameters
2632
2633   config.add_section(constants.INISECT_HYP)
2634   for name, value in instance.hvparams.items():
2635     if name not in constants.HVC_GLOBALS:
2636       config.set(constants.INISECT_HYP, name, str(value))
2637
2638   config.add_section(constants.INISECT_BEP)
2639   for name, value in instance.beparams.items():
2640     config.set(constants.INISECT_BEP, name, str(value))
2641
2642   config.add_section(constants.INISECT_OSP)
2643   for name, value in instance.osparams.items():
2644     config.set(constants.INISECT_OSP, name, str(value))
2645
2646   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2647                   data=config.Dumps())
2648   shutil.rmtree(finaldestdir, ignore_errors=True)
2649   shutil.move(destdir, finaldestdir)
2650
2651
2652 def ExportInfo(dest):
2653   """Get export configuration information.
2654
2655   @type dest: str
2656   @param dest: directory containing the export
2657
2658   @rtype: L{objects.SerializableConfigParser}
2659   @return: a serializable config file containing the
2660       export info
2661
2662   """
2663   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2664
2665   config = objects.SerializableConfigParser()
2666   config.read(cff)
2667
2668   if (not config.has_section(constants.INISECT_EXP) or
2669       not config.has_section(constants.INISECT_INS)):
2670     _Fail("Export info file doesn't have the required fields")
2671
2672   return config.Dumps()
2673
2674
2675 def ListExports():
2676   """Return a list of exports currently available on this machine.
2677
2678   @rtype: list
2679   @return: list of the exports
2680
2681   """
2682   if os.path.isdir(pathutils.EXPORT_DIR):
2683     return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
2684   else:
2685     _Fail("No exports directory")
2686
2687
2688 def RemoveExport(export):
2689   """Remove an existing export from the node.
2690
2691   @type export: str
2692   @param export: the name of the export to remove
2693   @rtype: None
2694
2695   """
2696   target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2697
2698   try:
2699     shutil.rmtree(target)
2700   except EnvironmentError, err:
2701     _Fail("Error while removing the export: %s", err, exc=True)
2702
2703
2704 def BlockdevRename(devlist):
2705   """Rename a list of block devices.
2706
2707   @type devlist: list of tuples
2708   @param devlist: list of tuples of the form  (disk,
2709       new_logical_id, new_physical_id); disk is an
2710       L{objects.Disk} object describing the current disk,
2711       and new logical_id/physical_id is the name we
2712       rename it to
2713   @rtype: boolean
2714   @return: True if all renames succeeded, False otherwise
2715
2716   """
2717   msgs = []
2718   result = True
2719   for disk, unique_id in devlist:
2720     dev = _RecursiveFindBD(disk)
2721     if dev is None:
2722       msgs.append("Can't find device %s in rename" % str(disk))
2723       result = False
2724       continue
2725     try:
2726       old_rpath = dev.dev_path
2727       dev.Rename(unique_id)
2728       new_rpath = dev.dev_path
2729       if old_rpath != new_rpath:
2730         DevCacheManager.RemoveCache(old_rpath)
2731         # FIXME: we should add the new cache information here, like:
2732         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2733         # but we don't have the owner here - maybe parse from existing
2734         # cache? for now, we only lose lvm data when we rename, which
2735         # is less critical than DRBD or MD
2736     except errors.BlockDeviceError, err:
2737       msgs.append("Can't rename device '%s' to '%s': %s" %
2738                   (dev, unique_id, err))
2739       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2740       result = False
2741   if not result:
2742     _Fail("; ".join(msgs))
2743
2744
2745 def _TransformFileStorageDir(fs_dir):
2746   """Checks whether given file_storage_dir is valid.
2747
2748   Checks wheter the given fs_dir is within the cluster-wide default
2749   file_storage_dir or the shared_file_storage_dir, which are stored in
2750   SimpleStore. Only paths under those directories are allowed.
2751
2752   @type fs_dir: str
2753   @param fs_dir: the path to check
2754
2755   @return: the normalized path if valid, None otherwise
2756
2757   """
2758   if not (constants.ENABLE_FILE_STORAGE or
2759           constants.ENABLE_SHARED_FILE_STORAGE):
2760     _Fail("File storage disabled at configure time")
2761
2762   bdev.CheckFileStoragePath(fs_dir)
2763
2764   return os.path.normpath(fs_dir)
2765
2766
2767 def CreateFileStorageDir(file_storage_dir):
2768   """Create file storage directory.
2769
2770   @type file_storage_dir: str
2771   @param file_storage_dir: directory to create
2772
2773   @rtype: tuple
2774   @return: tuple with first element a boolean indicating wheter dir
2775       creation was successful or not
2776
2777   """
2778   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2779   if os.path.exists(file_storage_dir):
2780     if not os.path.isdir(file_storage_dir):
2781       _Fail("Specified storage dir '%s' is not a directory",
2782             file_storage_dir)
2783   else:
2784     try:
2785       os.makedirs(file_storage_dir, 0750)
2786     except OSError, err:
2787       _Fail("Cannot create file storage directory '%s': %s",
2788             file_storage_dir, err, exc=True)
2789
2790
2791 def RemoveFileStorageDir(file_storage_dir):
2792   """Remove file storage directory.
2793
2794   Remove it only if it's empty. If not log an error and return.
2795
2796   @type file_storage_dir: str
2797   @param file_storage_dir: the directory we should cleanup
2798   @rtype: tuple (success,)
2799   @return: tuple of one element, C{success}, denoting
2800       whether the operation was successful
2801
2802   """
2803   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2804   if os.path.exists(file_storage_dir):
2805     if not os.path.isdir(file_storage_dir):
2806       _Fail("Specified Storage directory '%s' is not a directory",
2807             file_storage_dir)
2808     # deletes dir only if empty, otherwise we want to fail the rpc call
2809     try:
2810       os.rmdir(file_storage_dir)
2811     except OSError, err:
2812       _Fail("Cannot remove file storage directory '%s': %s",
2813             file_storage_dir, err)
2814
2815
2816 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2817   """Rename the file storage directory.
2818
2819   @type old_file_storage_dir: str
2820   @param old_file_storage_dir: the current path
2821   @type new_file_storage_dir: str
2822   @param new_file_storage_dir: the name we should rename to
2823   @rtype: tuple (success,)
2824   @return: tuple of one element, C{success}, denoting
2825       whether the operation was successful
2826
2827   """
2828   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2829   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2830   if not os.path.exists(new_file_storage_dir):
2831     if os.path.isdir(old_file_storage_dir):
2832       try:
2833         os.rename(old_file_storage_dir, new_file_storage_dir)
2834       except OSError, err:
2835         _Fail("Cannot rename '%s' to '%s': %s",
2836               old_file_storage_dir, new_file_storage_dir, err)
2837     else:
2838       _Fail("Specified storage dir '%s' is not a directory",
2839             old_file_storage_dir)
2840   else:
2841     if os.path.exists(old_file_storage_dir):
2842       _Fail("Cannot rename '%s' to '%s': both locations exist",
2843             old_file_storage_dir, new_file_storage_dir)
2844
2845
2846 def _EnsureJobQueueFile(file_name):
2847   """Checks whether the given filename is in the queue directory.
2848
2849   @type file_name: str
2850   @param file_name: the file name we should check
2851   @rtype: None
2852   @raises RPCFail: if the file is not valid
2853
2854   """
2855   if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
2856     _Fail("Passed job queue file '%s' does not belong to"
2857           " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
2858
2859
2860 def JobQueueUpdate(file_name, content):
2861   """Updates a file in the queue directory.
2862
2863   This is just a wrapper over L{utils.io.WriteFile}, with proper
2864   checking.
2865
2866   @type file_name: str
2867   @param file_name: the job file name
2868   @type content: str
2869   @param content: the new job contents
2870   @rtype: boolean
2871   @return: the success of the operation
2872
2873   """
2874   file_name = vcluster.LocalizeVirtualPath(file_name)
2875
2876   _EnsureJobQueueFile(file_name)
2877   getents = runtime.GetEnts()
2878
2879   # Write and replace the file atomically
2880   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2881                   gid=getents.masterd_gid)
2882
2883
2884 def JobQueueRename(old, new):
2885   """Renames a job queue file.
2886
2887   This is just a wrapper over os.rename with proper checking.
2888
2889   @type old: str
2890   @param old: the old (actual) file name
2891   @type new: str
2892   @param new: the desired file name
2893   @rtype: tuple
2894   @return: the success of the operation and payload
2895
2896   """
2897   old = vcluster.LocalizeVirtualPath(old)
2898   new = vcluster.LocalizeVirtualPath(new)
2899
2900   _EnsureJobQueueFile(old)
2901   _EnsureJobQueueFile(new)
2902
2903   getents = runtime.GetEnts()
2904
2905   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2906                    dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2907
2908
2909 def BlockdevClose(instance_name, disks):
2910   """Closes the given block devices.
2911
2912   This means they will be switched to secondary mode (in case of
2913   DRBD).
2914
2915   @param instance_name: if the argument is not empty, the symlinks
2916       of this instance will be removed
2917   @type disks: list of L{objects.Disk}
2918   @param disks: the list of disks to be closed
2919   @rtype: tuple (success, message)
2920   @return: a tuple of success and message, where success
2921       indicates the succes of the operation, and message
2922       which will contain the error details in case we
2923       failed
2924
2925   """
2926   bdevs = []
2927   for cf in disks:
2928     rd = _RecursiveFindBD(cf)
2929     if rd is None:
2930       _Fail("Can't find device %s", cf)
2931     bdevs.append(rd)
2932
2933   msg = []
2934   for rd in bdevs:
2935     try:
2936       rd.Close()
2937     except errors.BlockDeviceError, err:
2938       msg.append(str(err))
2939   if msg:
2940     _Fail("Can't make devices secondary: %s", ",".join(msg))
2941   else:
2942     if instance_name:
2943       _RemoveBlockDevLinks(instance_name, disks)
2944
2945
2946 def ValidateHVParams(hvname, hvparams):
2947   """Validates the given hypervisor parameters.
2948
2949   @type hvname: string
2950   @param hvname: the hypervisor name
2951   @type hvparams: dict
2952   @param hvparams: the hypervisor parameters to be validated
2953   @rtype: None
2954
2955   """
2956   try:
2957     hv_type = hypervisor.GetHypervisor(hvname)
2958     hv_type.ValidateParameters(hvparams)
2959   except errors.HypervisorError, err:
2960     _Fail(str(err), log=False)
2961
2962
2963 def _CheckOSPList(os_obj, parameters):
2964   """Check whether a list of parameters is supported by the OS.
2965
2966   @type os_obj: L{objects.OS}
2967   @param os_obj: OS object to check
2968   @type parameters: list
2969   @param parameters: the list of parameters to check
2970
2971   """
2972   supported = [v[0] for v in os_obj.supported_parameters]
2973   delta = frozenset(parameters).difference(supported)
2974   if delta:
2975     _Fail("The following parameters are not supported"
2976           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2977
2978
2979 def ValidateOS(required, osname, checks, osparams):
2980   """Validate the given OS' parameters.
2981
2982   @type required: boolean
2983   @param required: whether absence of the OS should translate into
2984       failure or not
2985   @type osname: string
2986   @param osname: the OS to be validated
2987   @type checks: list
2988   @param checks: list of the checks to run (currently only 'parameters')
2989   @type osparams: dict
2990   @param osparams: dictionary with OS parameters
2991   @rtype: boolean
2992   @return: True if the validation passed, or False if the OS was not
2993       found and L{required} was false
2994
2995   """
2996   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2997     _Fail("Unknown checks required for OS %s: %s", osname,
2998           set(checks).difference(constants.OS_VALIDATE_CALLS))
2999
3000   name_only = objects.OS.GetName(osname)
3001   status, tbv = _TryOSFromDisk(name_only, None)
3002
3003   if not status:
3004     if required:
3005       _Fail(tbv)
3006     else:
3007       return False
3008
3009   if max(tbv.api_versions) < constants.OS_API_V20:
3010     return True
3011
3012   if constants.OS_VALIDATE_PARAMETERS in checks:
3013     _CheckOSPList(tbv, osparams.keys())
3014
3015   validate_env = OSCoreEnv(osname, tbv, osparams)
3016   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
3017                         cwd=tbv.path, reset_env=True)
3018   if result.failed:
3019     logging.error("os validate command '%s' returned error: %s output: %s",
3020                   result.cmd, result.fail_reason, result.output)
3021     _Fail("OS validation script failed (%s), output: %s",
3022           result.fail_reason, result.output, log=False)
3023
3024   return True
3025
3026
3027 def DemoteFromMC():
3028   """Demotes the current node from master candidate role.
3029
3030   """
3031   # try to ensure we're not the master by mistake
3032   master, myself = ssconf.GetMasterAndMyself()
3033   if master == myself:
3034     _Fail("ssconf status shows I'm the master node, will not demote")
3035
3036   result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
3037   if not result.failed:
3038     _Fail("The master daemon is running, will not demote")
3039
3040   try:
3041     if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3042       utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3043   except EnvironmentError, err:
3044     if err.errno != errno.ENOENT:
3045       _Fail("Error while backing up cluster file: %s", err, exc=True)
3046
3047   utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3048
3049
3050 def _GetX509Filenames(cryptodir, name):
3051   """Returns the full paths for the private key and certificate.
3052
3053   """
3054   return (utils.PathJoin(cryptodir, name),
3055           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3056           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3057
3058
3059 def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3060   """Creates a new X509 certificate for SSL/TLS.
3061
3062   @type validity: int
3063   @param validity: Validity in seconds
3064   @rtype: tuple; (string, string)
3065   @return: Certificate name and public part
3066
3067   """
3068   (key_pem, cert_pem) = \
3069     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3070                                      min(validity, _MAX_SSL_CERT_VALIDITY))
3071
3072   cert_dir = tempfile.mkdtemp(dir=cryptodir,
3073                               prefix="x509-%s-" % utils.TimestampForFilename())
3074   try:
3075     name = os.path.basename(cert_dir)
3076     assert len(name) > 5
3077
3078     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3079
3080     utils.WriteFile(key_file, mode=0400, data=key_pem)
3081     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3082
3083     # Never return private key as it shouldn't leave the node
3084     return (name, cert_pem)
3085   except Exception:
3086     shutil.rmtree(cert_dir, ignore_errors=True)
3087     raise
3088
3089
3090 def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3091   """Removes a X509 certificate.
3092
3093   @type name: string
3094   @param name: Certificate name
3095
3096   """
3097   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3098
3099   utils.RemoveFile(key_file)
3100   utils.RemoveFile(cert_file)
3101
3102   try:
3103     os.rmdir(cert_dir)
3104   except EnvironmentError, err:
3105     _Fail("Cannot remove certificate directory '%s': %s",
3106           cert_dir, err)
3107
3108
3109 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3110   """Returns the command for the requested input/output.
3111
3112   @type instance: L{objects.Instance}
3113   @param instance: The instance object
3114   @param mode: Import/export mode
3115   @param ieio: Input/output type
3116   @param ieargs: Input/output arguments
3117
3118   """
3119   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3120
3121   env = None
3122   prefix = None
3123   suffix = None
3124   exp_size = None
3125
3126   if ieio == constants.IEIO_FILE:
3127     (filename, ) = ieargs
3128
3129     if not utils.IsNormAbsPath(filename):
3130       _Fail("Path '%s' is not normalized or absolute", filename)
3131
3132     real_filename = os.path.realpath(filename)
3133     directory = os.path.dirname(real_filename)
3134
3135     if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3136       _Fail("File '%s' is not under exports directory '%s': %s",
3137             filename, pathutils.EXPORT_DIR, real_filename)
3138
3139     # Create directory
3140     utils.Makedirs(directory, mode=0750)
3141
3142     quoted_filename = utils.ShellQuote(filename)
3143
3144     if mode == constants.IEM_IMPORT:
3145       suffix = "> %s" % quoted_filename
3146     elif mode == constants.IEM_EXPORT:
3147       suffix = "< %s" % quoted_filename
3148
3149       # Retrieve file size
3150       try:
3151         st = os.stat(filename)
3152       except EnvironmentError, err:
3153         logging.error("Can't stat(2) %s: %s", filename, err)
3154       else:
3155         exp_size = utils.BytesToMebibyte(st.st_size)
3156
3157   elif ieio == constants.IEIO_RAW_DISK:
3158     (disk, ) = ieargs
3159
3160     real_disk = _OpenRealBD(disk)
3161
3162     if mode == constants.IEM_IMPORT:
3163       # we set here a smaller block size as, due to transport buffering, more
3164       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3165       # is not already there or we pass a wrong path; we use notrunc to no
3166       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3167       # much memory; this means that at best, we flush every 64k, which will
3168       # not be very fast
3169       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3170                                     " bs=%s oflag=dsync"),
3171                                     real_disk.dev_path,
3172                                     str(64 * 1024))
3173
3174     elif mode == constants.IEM_EXPORT:
3175       # the block size on the read dd is 1MiB to match our units
3176       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3177                                    real_disk.dev_path,
3178                                    str(1024 * 1024), # 1 MB
3179                                    str(disk.size))
3180       exp_size = disk.size
3181
3182   elif ieio == constants.IEIO_SCRIPT:
3183     (disk, disk_index, ) = ieargs
3184
3185     assert isinstance(disk_index, (int, long))
3186
3187     real_disk = _OpenRealBD(disk)
3188
3189     inst_os = OSFromDisk(instance.os)
3190     env = OSEnvironment(instance, inst_os)
3191
3192     if mode == constants.IEM_IMPORT:
3193       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3194       env["IMPORT_INDEX"] = str(disk_index)
3195       script = inst_os.import_script
3196
3197     elif mode == constants.IEM_EXPORT:
3198       env["EXPORT_DEVICE"] = real_disk.dev_path
3199       env["EXPORT_INDEX"] = str(disk_index)
3200       script = inst_os.export_script
3201
3202     # TODO: Pass special environment only to script
3203     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3204
3205     if mode == constants.IEM_IMPORT:
3206       suffix = "| %s" % script_cmd
3207
3208     elif mode == constants.IEM_EXPORT:
3209       prefix = "%s |" % script_cmd
3210
3211     # Let script predict size
3212     exp_size = constants.IE_CUSTOM_SIZE
3213
3214   else:
3215     _Fail("Invalid %s I/O mode %r", mode, ieio)
3216
3217   return (env, prefix, suffix, exp_size)
3218
3219
3220 def _CreateImportExportStatusDir(prefix):
3221   """Creates status directory for import/export.
3222
3223   """
3224   return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3225                           prefix=("%s-%s-" %
3226                                   (prefix, utils.TimestampForFilename())))
3227
3228
3229 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3230                             ieio, ieioargs):
3231   """Starts an import or export daemon.
3232
3233   @param mode: Import/output mode
3234   @type opts: L{objects.ImportExportOptions}
3235   @param opts: Daemon options
3236   @type host: string
3237   @param host: Remote host for export (None for import)
3238   @type port: int
3239   @param port: Remote port for export (None for import)
3240   @type instance: L{objects.Instance}
3241   @param instance: Instance object
3242   @type component: string
3243   @param component: which part of the instance is transferred now,
3244       e.g. 'disk/0'
3245   @param ieio: Input/output type
3246   @param ieioargs: Input/output arguments
3247
3248   """
3249   if mode == constants.IEM_IMPORT:
3250     prefix = "import"
3251
3252     if not (host is None and port is None):
3253       _Fail("Can not specify host or port on import")
3254
3255   elif mode == constants.IEM_EXPORT:
3256     prefix = "export"
3257
3258     if host is None or port is None:
3259       _Fail("Host and port must be specified for an export")
3260
3261   else:
3262     _Fail("Invalid mode %r", mode)
3263
3264   if (opts.key_name is None) ^ (opts.ca_pem is None):
3265     _Fail("Cluster certificate can only be used for both key and CA")
3266
3267   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3268     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3269
3270   if opts.key_name is None:
3271     # Use server.pem
3272     key_path = pathutils.NODED_CERT_FILE
3273     cert_path = pathutils.NODED_CERT_FILE
3274     assert opts.ca_pem is None
3275   else:
3276     (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3277                                                  opts.key_name)
3278     assert opts.ca_pem is not None
3279
3280   for i in [key_path, cert_path]:
3281     if not os.path.exists(i):
3282       _Fail("File '%s' does not exist" % i)
3283
3284   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3285   try:
3286     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3287     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3288     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3289
3290     if opts.ca_pem is None:
3291       # Use server.pem
3292       ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3293     else:
3294       ca = opts.ca_pem
3295
3296     # Write CA file
3297     utils.WriteFile(ca_file, data=ca, mode=0400)
3298
3299     cmd = [
3300       pathutils.IMPORT_EXPORT_DAEMON,
3301       status_file, mode,
3302       "--key=%s" % key_path,
3303       "--cert=%s" % cert_path,
3304       "--ca=%s" % ca_file,
3305       ]
3306
3307     if host:
3308       cmd.append("--host=%s" % host)
3309
3310     if port:
3311       cmd.append("--port=%s" % port)
3312
3313     if opts.ipv6:
3314       cmd.append("--ipv6")
3315     else:
3316       cmd.append("--ipv4")
3317
3318     if opts.compress:
3319       cmd.append("--compress=%s" % opts.compress)
3320
3321     if opts.magic:
3322       cmd.append("--magic=%s" % opts.magic)
3323
3324     if exp_size is not None:
3325       cmd.append("--expected-size=%s" % exp_size)
3326
3327     if cmd_prefix:
3328       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3329
3330     if cmd_suffix:
3331       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3332
3333     if mode == constants.IEM_EXPORT:
3334       # Retry connection a few times when connecting to remote peer
3335       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3336       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3337     elif opts.connect_timeout is not None:
3338       assert mode == constants.IEM_IMPORT
3339       # Overall timeout for establishing connection while listening
3340       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3341
3342     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3343
3344     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3345     # support for receiving a file descriptor for output
3346     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3347                       output=logfile)
3348
3349     # The import/export name is simply the status directory name
3350     return os.path.basename(status_dir)
3351
3352   except Exception:
3353     shutil.rmtree(status_dir, ignore_errors=True)
3354     raise
3355
3356
3357 def GetImportExportStatus(names):
3358   """Returns import/export daemon status.
3359
3360   @type names: sequence
3361   @param names: List of names
3362   @rtype: List of dicts
3363   @return: Returns a list of the state of each named import/export or None if a
3364            status couldn't be read
3365
3366   """
3367   result = []
3368
3369   for name in names:
3370     status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3371                                  _IES_STATUS_FILE)
3372
3373     try:
3374       data = utils.ReadFile(status_file)
3375     except EnvironmentError, err:
3376       if err.errno != errno.ENOENT:
3377         raise
3378       data = None
3379
3380     if not data:
3381       result.append(None)
3382       continue
3383
3384     result.append(serializer.LoadJson(data))
3385
3386   return result
3387
3388
3389 def AbortImportExport(name):
3390   """Sends SIGTERM to a running import/export daemon.
3391
3392   """
3393   logging.info("Abort import/export %s", name)
3394
3395   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3396   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3397
3398   if pid:
3399     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3400                  name, pid)
3401     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3402
3403
3404 def CleanupImportExport(name):
3405   """Cleanup after an import or export.
3406
3407   If the import/export daemon is still running it's killed. Afterwards the
3408   whole status directory is removed.
3409
3410   """
3411   logging.info("Finalizing import/export %s", name)
3412
3413   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3414
3415   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3416
3417   if pid:
3418     logging.info("Import/export %s is still running with PID %s",
3419                  name, pid)
3420     utils.KillProcess(pid, waitpid=False)
3421
3422   shutil.rmtree(status_dir, ignore_errors=True)
3423
3424
3425 def _FindDisks(nodes_ip, disks):
3426   """Sets the physical ID on disks and returns the block devices.
3427
3428   """
3429   # set the correct physical ID
3430   my_name = netutils.Hostname.GetSysName()
3431   for cf in disks:
3432     cf.SetPhysicalID(my_name, nodes_ip)
3433
3434   bdevs = []
3435
3436   for cf in disks:
3437     rd = _RecursiveFindBD(cf)
3438     if rd is None:
3439       _Fail("Can't find device %s", cf)
3440     bdevs.append(rd)
3441   return bdevs
3442
3443
3444 def DrbdDisconnectNet(nodes_ip, disks):
3445   """Disconnects the network on a list of drbd devices.
3446
3447   """
3448   bdevs = _FindDisks(nodes_ip, disks)
3449
3450   # disconnect disks
3451   for rd in bdevs:
3452     try:
3453       rd.DisconnectNet()
3454     except errors.BlockDeviceError, err:
3455       _Fail("Can't change network configuration to standalone mode: %s",
3456             err, exc=True)
3457
3458
3459 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3460   """Attaches the network on a list of drbd devices.
3461
3462   """
3463   bdevs = _FindDisks(nodes_ip, disks)
3464
3465   if multimaster:
3466     for idx, rd in enumerate(bdevs):
3467       try:
3468         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3469       except EnvironmentError, err:
3470         _Fail("Can't create symlink: %s", err)
3471   # reconnect disks, switch to new master configuration and if
3472   # needed primary mode
3473   for rd in bdevs:
3474     try:
3475       rd.AttachNet(multimaster)
3476     except errors.BlockDeviceError, err:
3477       _Fail("Can't change network configuration: %s", err)
3478
3479   # wait until the disks are connected; we need to retry the re-attach
3480   # if the device becomes standalone, as this might happen if the one
3481   # node disconnects and reconnects in a different mode before the
3482   # other node reconnects; in this case, one or both of the nodes will
3483   # decide it has wrong configuration and switch to standalone
3484
3485   def _Attach():
3486     all_connected = True
3487
3488     for rd in bdevs:
3489       stats = rd.GetProcStatus()
3490
3491       all_connected = (all_connected and
3492                        (stats.is_connected or stats.is_in_resync))
3493
3494       if stats.is_standalone:
3495         # peer had different config info and this node became
3496         # standalone, even though this should not happen with the
3497         # new staged way of changing disk configs
3498         try:
3499           rd.AttachNet(multimaster)
3500         except errors.BlockDeviceError, err:
3501           _Fail("Can't change network configuration: %s", err)
3502
3503     if not all_connected:
3504       raise utils.RetryAgain()
3505
3506   try:
3507     # Start with a delay of 100 miliseconds and go up to 5 seconds
3508     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3509   except utils.RetryTimeout:
3510     _Fail("Timeout in disk reconnecting")
3511
3512   if multimaster:
3513     # change to primary mode
3514     for rd in bdevs:
3515       try:
3516         rd.Open()
3517       except errors.BlockDeviceError, err:
3518         _Fail("Can't change to primary mode: %s", err)
3519
3520
3521 def DrbdWaitSync(nodes_ip, disks):
3522   """Wait until DRBDs have synchronized.
3523
3524   """
3525   def _helper(rd):
3526     stats = rd.GetProcStatus()
3527     if not (stats.is_connected or stats.is_in_resync):
3528       raise utils.RetryAgain()
3529     return stats
3530
3531   bdevs = _FindDisks(nodes_ip, disks)
3532
3533   min_resync = 100
3534   alldone = True
3535   for rd in bdevs:
3536     try:
3537       # poll each second for 15 seconds
3538       stats = utils.Retry(_helper, 1, 15, args=[rd])
3539     except utils.RetryTimeout:
3540       stats = rd.GetProcStatus()
3541       # last check
3542       if not (stats.is_connected or stats.is_in_resync):
3543         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3544     alldone = alldone and (not stats.is_in_resync)
3545     if stats.sync_percent is not None:
3546       min_resync = min(min_resync, stats.sync_percent)
3547
3548   return (alldone, min_resync)
3549
3550
3551 def GetDrbdUsermodeHelper():
3552   """Returns DRBD usermode helper currently configured.
3553
3554   """
3555   try:
3556     return bdev.BaseDRBD.GetUsermodeHelper()
3557   except errors.BlockDeviceError, err:
3558     _Fail(str(err))
3559
3560
3561 def PowercycleNode(hypervisor_type):
3562   """Hard-powercycle the node.
3563
3564   Because we need to return first, and schedule the powercycle in the
3565   background, we won't be able to report failures nicely.
3566
3567   """
3568   hyper = hypervisor.GetHypervisor(hypervisor_type)
3569   try:
3570     pid = os.fork()
3571   except OSError:
3572     # if we can't fork, we'll pretend that we're in the child process
3573     pid = 0
3574   if pid > 0:
3575     return "Reboot scheduled in 5 seconds"
3576   # ensure the child is running on ram
3577   try:
3578     utils.Mlockall()
3579   except Exception: # pylint: disable=W0703
3580     pass
3581   time.sleep(5)
3582   hyper.PowercycleNode()
3583
3584
3585 def _VerifyRemoteCommandName(cmd):
3586   """Verifies a remote command name.
3587
3588   @type cmd: string
3589   @param cmd: Command name
3590   @rtype: tuple; (boolean, string or None)
3591   @return: The tuple's first element is the status; if C{False}, the second
3592     element is an error message string, otherwise it's C{None}
3593
3594   """
3595   if not cmd.strip():
3596     return (False, "Missing command name")
3597
3598   if os.path.basename(cmd) != cmd:
3599     return (False, "Invalid command name")
3600
3601   if not constants.EXT_PLUGIN_MASK.match(cmd):
3602     return (False, "Command name contains forbidden characters")
3603
3604   return (True, None)
3605
3606
3607 def _CommonRemoteCommandCheck(path, owner):
3608   """Common checks for remote command file system directories and files.
3609
3610   @type path: string
3611   @param path: Path to check
3612   @param owner: C{None} or tuple containing UID and GID
3613   @rtype: tuple; (boolean, string or C{os.stat} result)
3614   @return: The tuple's first element is the status; if C{False}, the second
3615     element is an error message string, otherwise it's the result of C{os.stat}
3616
3617   """
3618   if owner is None:
3619     # Default to root as owner
3620     owner = (0, 0)
3621
3622   try:
3623     st = os.stat(path)
3624   except EnvironmentError, err:
3625     return (False, "Can't stat(2) '%s': %s" % (path, err))
3626
3627   if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
3628     return (False, "Permissions on '%s' are too permissive" % path)
3629
3630   if (st.st_uid, st.st_gid) != owner:
3631     (owner_uid, owner_gid) = owner
3632     return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
3633
3634   return (True, st)
3635
3636
3637 def _VerifyRemoteCommandDirectory(path, _owner=None):
3638   """Verifies remote command directory.
3639
3640   @type path: string
3641   @param path: Path to check
3642   @rtype: tuple; (boolean, string or None)
3643   @return: The tuple's first element is the status; if C{False}, the second
3644     element is an error message string, otherwise it's C{None}
3645
3646   """
3647   (status, value) = _CommonRemoteCommandCheck(path, _owner)
3648
3649   if not status:
3650     return (False, value)
3651
3652   if not stat.S_ISDIR(value.st_mode):
3653     return (False, "Path '%s' is not a directory" % path)
3654
3655   return (True, None)
3656
3657
3658 def _VerifyRemoteCommand(path, cmd, _owner=None):
3659   """Verifies a whole remote command and returns its executable filename.
3660
3661   @type path: string
3662   @param path: Directory containing remote commands
3663   @type cmd: string
3664   @param cmd: Command name
3665   @rtype: tuple; (boolean, string)
3666   @return: The tuple's first element is the status; if C{False}, the second
3667     element is an error message string, otherwise the second element is the
3668     absolute path to the executable
3669
3670   """
3671   executable = utils.PathJoin(path, cmd)
3672
3673   (status, msg) = _CommonRemoteCommandCheck(executable, _owner)
3674
3675   if not status:
3676     return (False, msg)
3677
3678   if not utils.IsExecutable(executable):
3679     return (False, "access(2) thinks '%s' can't be executed" % executable)
3680
3681   return (True, executable)
3682
3683
3684 def _PrepareRemoteCommand(path, cmd,
3685                           _verify_dir=_VerifyRemoteCommandDirectory,
3686                           _verify_name=_VerifyRemoteCommandName,
3687                           _verify_cmd=_VerifyRemoteCommand):
3688   """Performs a number of tests on a remote command.
3689
3690   @type path: string
3691   @param path: Directory containing remote commands
3692   @type cmd: string
3693   @param cmd: Command name
3694   @return: Same as L{_VerifyRemoteCommand}
3695
3696   """
3697   # Verify the directory first
3698   (status, msg) = _verify_dir(path)
3699   if status:
3700     # Check command if everything was alright
3701     (status, msg) = _verify_name(cmd)
3702
3703   if not status:
3704     return (False, msg)
3705
3706   # Check actual executable
3707   return _verify_cmd(path, cmd)
3708
3709
3710 def RunRemoteCommand(cmd,
3711                      _lock_timeout=_RCMD_LOCK_TIMEOUT,
3712                      _lock_file=pathutils.REMOTE_COMMANDS_LOCK_FILE,
3713                      _path=pathutils.REMOTE_COMMANDS_DIR,
3714                      _sleep_fn=time.sleep,
3715                      _prepare_fn=_PrepareRemoteCommand,
3716                      _runcmd_fn=utils.RunCmd,
3717                      _enabled=constants.ENABLE_REMOTE_COMMANDS):
3718   """Executes a remote command after performing strict tests.
3719
3720   @type cmd: string
3721   @param cmd: Command name
3722   @rtype: string
3723   @return: Command output
3724   @raise RPCFail: In case of an error
3725
3726   """
3727   logging.info("Preparing to run remote command '%s'", cmd)
3728
3729   if not _enabled:
3730     _Fail("Remote commands disabled at configure time")
3731
3732   lock = None
3733   try:
3734     cmdresult = None
3735     try:
3736       lock = utils.FileLock.Open(_lock_file)
3737       lock.Exclusive(blocking=True, timeout=_lock_timeout)
3738
3739       (status, value) = _prepare_fn(_path, cmd)
3740
3741       if status:
3742         cmdresult = _runcmd_fn([value], env={}, reset_env=True,
3743                                postfork_fn=lambda _: lock.Unlock())
3744       else:
3745         logging.error(value)
3746     except Exception: # pylint: disable=W0703
3747       # Keep original error in log
3748       logging.exception("Caught exception")
3749
3750     if cmdresult is None:
3751       logging.info("Sleeping for %0.1f seconds before returning",
3752                    _RCMD_INVALID_DELAY)
3753       _sleep_fn(_RCMD_INVALID_DELAY)
3754
3755       # Do not include original error message in returned error
3756       _Fail("Executing command '%s' failed" % cmd)
3757     elif cmdresult.failed or cmdresult.fail_reason:
3758       _Fail("Remote command '%s' failed: %s; output: %s",
3759             cmd, cmdresult.fail_reason, cmdresult.output)
3760     else:
3761       return cmdresult.output
3762   finally:
3763     if lock is not None:
3764       # Release lock at last
3765       lock.Close()
3766       lock = None
3767
3768
3769 class HooksRunner(object):
3770   """Hook runner.
3771
3772   This class is instantiated on the node side (ganeti-noded) and not
3773   on the master side.
3774
3775   """
3776   def __init__(self, hooks_base_dir=None):
3777     """Constructor for hooks runner.
3778
3779     @type hooks_base_dir: str or None
3780     @param hooks_base_dir: if not None, this overrides the
3781         L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3782
3783     """
3784     if hooks_base_dir is None:
3785       hooks_base_dir = pathutils.HOOKS_BASE_DIR
3786     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3787     # constant
3788     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3789
3790   def RunLocalHooks(self, node_list, hpath, phase, env):
3791     """Check that the hooks will be run only locally and then run them.
3792
3793     """
3794     assert len(node_list) == 1
3795     node = node_list[0]
3796     _, myself = ssconf.GetMasterAndMyself()
3797     assert node == myself
3798
3799     results = self.RunHooks(hpath, phase, env)
3800
3801     # Return values in the form expected by HooksMaster
3802     return {node: (None, False, results)}
3803
3804   def RunHooks(self, hpath, phase, env):
3805     """Run the scripts in the hooks directory.
3806
3807     @type hpath: str
3808     @param hpath: the path to the hooks directory which
3809         holds the scripts
3810     @type phase: str
3811     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3812         L{constants.HOOKS_PHASE_POST}
3813     @type env: dict
3814     @param env: dictionary with the environment for the hook
3815     @rtype: list
3816     @return: list of 3-element tuples:
3817       - script path
3818       - script result, either L{constants.HKR_SUCCESS} or
3819         L{constants.HKR_FAIL}
3820       - output of the script
3821
3822     @raise errors.ProgrammerError: for invalid input
3823         parameters
3824
3825     """
3826     if phase == constants.HOOKS_PHASE_PRE:
3827       suffix = "pre"
3828     elif phase == constants.HOOKS_PHASE_POST:
3829       suffix = "post"
3830     else:
3831       _Fail("Unknown hooks phase '%s'", phase)
3832
3833     subdir = "%s-%s.d" % (hpath, suffix)
3834     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3835
3836     results = []
3837
3838     if not os.path.isdir(dir_name):
3839       # for non-existing/non-dirs, we simply exit instead of logging a
3840       # warning at every operation
3841       return results
3842
3843     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3844
3845     for (relname, relstatus, runresult) in runparts_results:
3846       if relstatus == constants.RUNPARTS_SKIP:
3847         rrval = constants.HKR_SKIP
3848         output = ""
3849       elif relstatus == constants.RUNPARTS_ERR:
3850         rrval = constants.HKR_FAIL
3851         output = "Hook script execution error: %s" % runresult
3852       elif relstatus == constants.RUNPARTS_RUN:
3853         if runresult.failed:
3854           rrval = constants.HKR_FAIL
3855         else:
3856           rrval = constants.HKR_SUCCESS
3857         output = utils.SafeEncode(runresult.output.strip())
3858       results.append(("%s/%s" % (subdir, relname), rrval, output))
3859
3860     return results
3861
3862
3863 class IAllocatorRunner(object):
3864   """IAllocator runner.
3865
3866   This class is instantiated on the node side (ganeti-noded) and not on
3867   the master side.
3868
3869   """
3870   @staticmethod
3871   def Run(name, idata):
3872     """Run an iallocator script.
3873
3874     @type name: str
3875     @param name: the iallocator script name
3876     @type idata: str
3877     @param idata: the allocator input data
3878
3879     @rtype: tuple
3880     @return: two element tuple of:
3881        - status
3882        - either error message or stdout of allocator (for success)
3883
3884     """
3885     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3886                                   os.path.isfile)
3887     if alloc_script is None:
3888       _Fail("iallocator module '%s' not found in the search path", name)
3889
3890     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3891     try:
3892       os.write(fd, idata)
3893       os.close(fd)
3894       result = utils.RunCmd([alloc_script, fin_name])
3895       if result.failed:
3896         _Fail("iallocator module '%s' failed: %s, output '%s'",
3897               name, result.fail_reason, result.output)
3898     finally:
3899       os.unlink(fin_name)
3900
3901     return result.stdout
3902
3903
3904 class DevCacheManager(object):
3905   """Simple class for managing a cache of block device information.
3906
3907   """
3908   _DEV_PREFIX = "/dev/"
3909   _ROOT_DIR = pathutils.BDEV_CACHE_DIR
3910
3911   @classmethod
3912   def _ConvertPath(cls, dev_path):
3913     """Converts a /dev/name path to the cache file name.
3914
3915     This replaces slashes with underscores and strips the /dev
3916     prefix. It then returns the full path to the cache file.
3917
3918     @type dev_path: str
3919     @param dev_path: the C{/dev/} path name
3920     @rtype: str
3921     @return: the converted path name
3922
3923     """
3924     if dev_path.startswith(cls._DEV_PREFIX):
3925       dev_path = dev_path[len(cls._DEV_PREFIX):]
3926     dev_path = dev_path.replace("/", "_")
3927     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3928     return fpath
3929
3930   @classmethod
3931   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3932     """Updates the cache information for a given device.
3933
3934     @type dev_path: str
3935     @param dev_path: the pathname of the device
3936     @type owner: str
3937     @param owner: the owner (instance name) of the device
3938     @type on_primary: bool
3939     @param on_primary: whether this is the primary
3940         node nor not
3941     @type iv_name: str
3942     @param iv_name: the instance-visible name of the
3943         device, as in objects.Disk.iv_name
3944
3945     @rtype: None
3946
3947     """
3948     if dev_path is None:
3949       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3950       return
3951     fpath = cls._ConvertPath(dev_path)
3952     if on_primary:
3953       state = "primary"
3954     else:
3955       state = "secondary"
3956     if iv_name is None:
3957       iv_name = "not_visible"
3958     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3959     try:
3960       utils.WriteFile(fpath, data=fdata)
3961     except EnvironmentError, err:
3962       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3963
3964   @classmethod
3965   def RemoveCache(cls, dev_path):
3966     """Remove data for a dev_path.
3967
3968     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3969     path name and logging.
3970
3971     @type dev_path: str
3972     @param dev_path: the pathname of the device
3973
3974     @rtype: None
3975
3976     """
3977     if dev_path is None:
3978       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3979       return
3980     fpath = cls._ConvertPath(dev_path)
3981     try:
3982       utils.RemoveFile(fpath)
3983     except EnvironmentError, err:
3984       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)