Fix network opcode parameters
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
67
68
69 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
70 _ALLOWED_CLEAN_DIRS = frozenset([
71   pathutils.DATA_DIR,
72   pathutils.JOB_QUEUE_ARCHIVE_DIR,
73   pathutils.QUEUE_DIR,
74   pathutils.CRYPTO_KEYS_DIR,
75   ])
76 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
77 _X509_KEY_FILE = "key"
78 _X509_CERT_FILE = "cert"
79 _IES_STATUS_FILE = "status"
80 _IES_PID_FILE = "pid"
81 _IES_CA_FILE = "ca"
82
83 #: Valid LVS output line regex
84 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
85
86 # Actions for the master setup script
87 _MASTER_START = "start"
88 _MASTER_STOP = "stop"
89
90 #: Maximum file permissions for remote command directory and executables
91 _RCMD_MAX_MODE = (stat.S_IRWXU |
92                   stat.S_IRGRP | stat.S_IXGRP |
93                   stat.S_IROTH | stat.S_IXOTH)
94
95 #: Delay before returning an error for remote commands
96 _RCMD_INVALID_DELAY = 10
97
98 #: How long to wait to acquire lock for remote commands (shorter than
99 #: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
100 #: command requests arrive
101 _RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
102
103
104 class RPCFail(Exception):
105   """Class denoting RPC failure.
106
107   Its argument is the error message.
108
109   """
110
111
112 def _Fail(msg, *args, **kwargs):
113   """Log an error and the raise an RPCFail exception.
114
115   This exception is then handled specially in the ganeti daemon and
116   turned into a 'failed' return type. As such, this function is a
117   useful shortcut for logging the error and returning it to the master
118   daemon.
119
120   @type msg: string
121   @param msg: the text of the exception
122   @raise RPCFail
123
124   """
125   if args:
126     msg = msg % args
127   if "log" not in kwargs or kwargs["log"]: # if we should log this error
128     if "exc" in kwargs and kwargs["exc"]:
129       logging.exception(msg)
130     else:
131       logging.error(msg)
132   raise RPCFail(msg)
133
134
135 def _GetConfig():
136   """Simple wrapper to return a SimpleStore.
137
138   @rtype: L{ssconf.SimpleStore}
139   @return: a SimpleStore instance
140
141   """
142   return ssconf.SimpleStore()
143
144
145 def _GetSshRunner(cluster_name):
146   """Simple wrapper to return an SshRunner.
147
148   @type cluster_name: str
149   @param cluster_name: the cluster name, which is needed
150       by the SshRunner constructor
151   @rtype: L{ssh.SshRunner}
152   @return: an SshRunner instance
153
154   """
155   return ssh.SshRunner(cluster_name)
156
157
158 def _Decompress(data):
159   """Unpacks data compressed by the RPC client.
160
161   @type data: list or tuple
162   @param data: Data sent by RPC client
163   @rtype: str
164   @return: Decompressed data
165
166   """
167   assert isinstance(data, (list, tuple))
168   assert len(data) == 2
169   (encoding, content) = data
170   if encoding == constants.RPC_ENCODING_NONE:
171     return content
172   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
173     return zlib.decompress(base64.b64decode(content))
174   else:
175     raise AssertionError("Unknown data encoding")
176
177
178 def _CleanDirectory(path, exclude=None):
179   """Removes all regular files in a directory.
180
181   @type path: str
182   @param path: the directory to clean
183   @type exclude: list
184   @param exclude: list of files to be excluded, defaults
185       to the empty list
186
187   """
188   if path not in _ALLOWED_CLEAN_DIRS:
189     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
190           path)
191
192   if not os.path.isdir(path):
193     return
194   if exclude is None:
195     exclude = []
196   else:
197     # Normalize excluded paths
198     exclude = [os.path.normpath(i) for i in exclude]
199
200   for rel_name in utils.ListVisibleFiles(path):
201     full_name = utils.PathJoin(path, rel_name)
202     if full_name in exclude:
203       continue
204     if os.path.isfile(full_name) and not os.path.islink(full_name):
205       utils.RemoveFile(full_name)
206
207
208 def _BuildUploadFileList():
209   """Build the list of allowed upload files.
210
211   This is abstracted so that it's built only once at module import time.
212
213   """
214   allowed_files = set([
215     pathutils.CLUSTER_CONF_FILE,
216     pathutils.ETC_HOSTS,
217     pathutils.SSH_KNOWN_HOSTS_FILE,
218     pathutils.VNC_PASSWORD_FILE,
219     pathutils.RAPI_CERT_FILE,
220     pathutils.SPICE_CERT_FILE,
221     pathutils.SPICE_CACERT_FILE,
222     pathutils.RAPI_USERS_FILE,
223     pathutils.CONFD_HMAC_KEY,
224     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
225     ])
226
227   for hv_name in constants.HYPER_TYPES:
228     hv_class = hypervisor.GetHypervisorClass(hv_name)
229     allowed_files.update(hv_class.GetAncillaryFiles()[0])
230
231   assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
232     "Allowed file storage paths should never be uploaded via RPC"
233
234   return frozenset(allowed_files)
235
236
237 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
238
239
240 def JobQueuePurge():
241   """Removes job queue files and archived jobs.
242
243   @rtype: tuple
244   @return: True, None
245
246   """
247   _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
248   _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
249
250
251 def GetMasterInfo():
252   """Returns master information.
253
254   This is an utility function to compute master information, either
255   for consumption here or from the node daemon.
256
257   @rtype: tuple
258   @return: master_netdev, master_ip, master_name, primary_ip_family,
259     master_netmask
260   @raise RPCFail: in case of errors
261
262   """
263   try:
264     cfg = _GetConfig()
265     master_netdev = cfg.GetMasterNetdev()
266     master_ip = cfg.GetMasterIP()
267     master_netmask = cfg.GetMasterNetmask()
268     master_node = cfg.GetMasterNode()
269     primary_ip_family = cfg.GetPrimaryIPFamily()
270   except errors.ConfigurationError, err:
271     _Fail("Cluster configuration incomplete: %s", err, exc=True)
272   return (master_netdev, master_ip, master_node, primary_ip_family,
273           master_netmask)
274
275
276 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
277   """Decorator that runs hooks before and after the decorated function.
278
279   @type hook_opcode: string
280   @param hook_opcode: opcode of the hook
281   @type hooks_path: string
282   @param hooks_path: path of the hooks
283   @type env_builder_fn: function
284   @param env_builder_fn: function that returns a dictionary containing the
285     environment variables for the hooks. Will get all the parameters of the
286     decorated function.
287   @raise RPCFail: in case of pre-hook failure
288
289   """
290   def decorator(fn):
291     def wrapper(*args, **kwargs):
292       _, myself = ssconf.GetMasterAndMyself()
293       nodes = ([myself], [myself])  # these hooks run locally
294
295       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
296
297       cfg = _GetConfig()
298       hr = HooksRunner()
299       hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
300                             None, env_fn, logging.warning, cfg.GetClusterName(),
301                             cfg.GetMasterNode())
302
303       hm.RunPhase(constants.HOOKS_PHASE_PRE)
304       result = fn(*args, **kwargs)
305       hm.RunPhase(constants.HOOKS_PHASE_POST)
306
307       return result
308     return wrapper
309   return decorator
310
311
312 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
313   """Builds environment variables for master IP hooks.
314
315   @type master_params: L{objects.MasterNetworkParameters}
316   @param master_params: network parameters of the master
317   @type use_external_mip_script: boolean
318   @param use_external_mip_script: whether to use an external master IP
319     address setup script (unused, but necessary per the implementation of the
320     _RunLocalHooks decorator)
321
322   """
323   # pylint: disable=W0613
324   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
325   env = {
326     "MASTER_NETDEV": master_params.netdev,
327     "MASTER_IP": master_params.ip,
328     "MASTER_NETMASK": str(master_params.netmask),
329     "CLUSTER_IP_VERSION": str(ver),
330   }
331
332   return env
333
334
335 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
336   """Execute the master IP address setup script.
337
338   @type master_params: L{objects.MasterNetworkParameters}
339   @param master_params: network parameters of the master
340   @type action: string
341   @param action: action to pass to the script. Must be one of
342     L{backend._MASTER_START} or L{backend._MASTER_STOP}
343   @type use_external_mip_script: boolean
344   @param use_external_mip_script: whether to use an external master IP
345     address setup script
346   @raise backend.RPCFail: if there are errors during the execution of the
347     script
348
349   """
350   env = _BuildMasterIpEnv(master_params)
351
352   if use_external_mip_script:
353     setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
354   else:
355     setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
356
357   result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
358
359   if result.failed:
360     _Fail("Failed to %s the master IP. Script return value: %s" %
361           (action, result.exit_code), log=True)
362
363
364 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
365                _BuildMasterIpEnv)
366 def ActivateMasterIp(master_params, use_external_mip_script):
367   """Activate the IP address of the master daemon.
368
369   @type master_params: L{objects.MasterNetworkParameters}
370   @param master_params: network parameters of the master
371   @type use_external_mip_script: boolean
372   @param use_external_mip_script: whether to use an external master IP
373     address setup script
374   @raise RPCFail: in case of errors during the IP startup
375
376   """
377   _RunMasterSetupScript(master_params, _MASTER_START,
378                         use_external_mip_script)
379
380
381 def StartMasterDaemons(no_voting):
382   """Activate local node as master node.
383
384   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
385
386   @type no_voting: boolean
387   @param no_voting: whether to start ganeti-masterd without a node vote
388       but still non-interactively
389   @rtype: None
390
391   """
392
393   if no_voting:
394     masterd_args = "--no-voting --yes-do-it"
395   else:
396     masterd_args = ""
397
398   env = {
399     "EXTRA_MASTERD_ARGS": masterd_args,
400     }
401
402   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
403   if result.failed:
404     msg = "Can't start Ganeti master: %s" % result.output
405     logging.error(msg)
406     _Fail(msg)
407
408
409 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
410                _BuildMasterIpEnv)
411 def DeactivateMasterIp(master_params, use_external_mip_script):
412   """Deactivate the master IP on this node.
413
414   @type master_params: L{objects.MasterNetworkParameters}
415   @param master_params: network parameters of the master
416   @type use_external_mip_script: boolean
417   @param use_external_mip_script: whether to use an external master IP
418     address setup script
419   @raise RPCFail: in case of errors during the IP turndown
420
421   """
422   _RunMasterSetupScript(master_params, _MASTER_STOP,
423                         use_external_mip_script)
424
425
426 def StopMasterDaemons():
427   """Stop the master daemons on this node.
428
429   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
430
431   @rtype: None
432
433   """
434   # TODO: log and report back to the caller the error failures; we
435   # need to decide in which case we fail the RPC for this
436
437   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
438   if result.failed:
439     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
440                   " and error %s",
441                   result.cmd, result.exit_code, result.output)
442
443
444 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
445   """Change the netmask of the master IP.
446
447   @param old_netmask: the old value of the netmask
448   @param netmask: the new value of the netmask
449   @param master_ip: the master IP
450   @param master_netdev: the master network device
451
452   """
453   if old_netmask == netmask:
454     return
455
456   if not netutils.IPAddress.Own(master_ip):
457     _Fail("The master IP address is not up, not attempting to change its"
458           " netmask")
459
460   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
461                          "%s/%s" % (master_ip, netmask),
462                          "dev", master_netdev, "label",
463                          "%s:0" % master_netdev])
464   if result.failed:
465     _Fail("Could not set the new netmask on the master IP address")
466
467   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
468                          "%s/%s" % (master_ip, old_netmask),
469                          "dev", master_netdev, "label",
470                          "%s:0" % master_netdev])
471   if result.failed:
472     _Fail("Could not bring down the master IP address with the old netmask")
473
474
475 def EtcHostsModify(mode, host, ip):
476   """Modify a host entry in /etc/hosts.
477
478   @param mode: The mode to operate. Either add or remove entry
479   @param host: The host to operate on
480   @param ip: The ip associated with the entry
481
482   """
483   if mode == constants.ETC_HOSTS_ADD:
484     if not ip:
485       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
486               " present")
487     utils.AddHostToEtcHosts(host, ip)
488   elif mode == constants.ETC_HOSTS_REMOVE:
489     if ip:
490       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
491               " parameter is present")
492     utils.RemoveHostFromEtcHosts(host)
493   else:
494     RPCFail("Mode not supported")
495
496
497 def LeaveCluster(modify_ssh_setup):
498   """Cleans up and remove the current node.
499
500   This function cleans up and prepares the current node to be removed
501   from the cluster.
502
503   If processing is successful, then it raises an
504   L{errors.QuitGanetiException} which is used as a special case to
505   shutdown the node daemon.
506
507   @param modify_ssh_setup: boolean
508
509   """
510   _CleanDirectory(pathutils.DATA_DIR)
511   _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
512   JobQueuePurge()
513
514   if modify_ssh_setup:
515     try:
516       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
517
518       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
519
520       utils.RemoveFile(priv_key)
521       utils.RemoveFile(pub_key)
522     except errors.OpExecError:
523       logging.exception("Error while processing ssh files")
524
525   try:
526     utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
527     utils.RemoveFile(pathutils.RAPI_CERT_FILE)
528     utils.RemoveFile(pathutils.SPICE_CERT_FILE)
529     utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
530     utils.RemoveFile(pathutils.NODED_CERT_FILE)
531   except: # pylint: disable=W0702
532     logging.exception("Error while removing cluster secrets")
533
534   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
535   if result.failed:
536     logging.error("Command %s failed with exitcode %s and error %s",
537                   result.cmd, result.exit_code, result.output)
538
539   # Raise a custom exception (handled in ganeti-noded)
540   raise errors.QuitGanetiException(True, "Shutdown scheduled")
541
542
543 def _GetVgInfo(name):
544   """Retrieves information about a LVM volume group.
545
546   """
547   # TODO: GetVGInfo supports returning information for multiple VGs at once
548   vginfo = bdev.LogicalVolume.GetVGInfo([name])
549   if vginfo:
550     vg_free = int(round(vginfo[0][0], 0))
551     vg_size = int(round(vginfo[0][1], 0))
552   else:
553     vg_free = None
554     vg_size = None
555
556   return {
557     "name": name,
558     "vg_free": vg_free,
559     "vg_size": vg_size,
560     }
561
562
563 def _GetHvInfo(name):
564   """Retrieves node information from a hypervisor.
565
566   The information returned depends on the hypervisor. Common items:
567
568     - vg_size is the size of the configured volume group in MiB
569     - vg_free is the free size of the volume group in MiB
570     - memory_dom0 is the memory allocated for domain0 in MiB
571     - memory_free is the currently available (free) ram in MiB
572     - memory_total is the total number of ram in MiB
573     - hv_version: the hypervisor version, if available
574
575   """
576   return hypervisor.GetHypervisor(name).GetNodeInfo()
577
578
579 def _GetNamedNodeInfo(names, fn):
580   """Calls C{fn} for all names in C{names} and returns a dictionary.
581
582   @rtype: None or dict
583
584   """
585   if names is None:
586     return None
587   else:
588     return map(fn, names)
589
590
591 def GetNodeInfo(vg_names, hv_names):
592   """Gives back a hash with different information about the node.
593
594   @type vg_names: list of string
595   @param vg_names: Names of the volume groups to ask for disk space information
596   @type hv_names: list of string
597   @param hv_names: Names of the hypervisors to ask for node information
598   @rtype: tuple; (string, None/dict, None/dict)
599   @return: Tuple containing boot ID, volume group information and hypervisor
600     information
601
602   """
603   bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
604   vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
605   hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
606
607   return (bootid, vg_info, hv_info)
608
609
610 def VerifyNode(what, cluster_name):
611   """Verify the status of the local node.
612
613   Based on the input L{what} parameter, various checks are done on the
614   local node.
615
616   If the I{filelist} key is present, this list of
617   files is checksummed and the file/checksum pairs are returned.
618
619   If the I{nodelist} key is present, we check that we have
620   connectivity via ssh with the target nodes (and check the hostname
621   report).
622
623   If the I{node-net-test} key is present, we check that we have
624   connectivity to the given nodes via both primary IP and, if
625   applicable, secondary IPs.
626
627   @type what: C{dict}
628   @param what: a dictionary of things to check:
629       - filelist: list of files for which to compute checksums
630       - nodelist: list of nodes we should check ssh communication with
631       - node-net-test: list of nodes we should check node daemon port
632         connectivity with
633       - hypervisor: list with hypervisors to run the verify for
634   @rtype: dict
635   @return: a dictionary with the same keys as the input dict, and
636       values representing the result of the checks
637
638   """
639   result = {}
640   my_name = netutils.Hostname.GetSysName()
641   port = netutils.GetDaemonPort(constants.NODED)
642   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
643
644   if constants.NV_HYPERVISOR in what and vm_capable:
645     result[constants.NV_HYPERVISOR] = tmp = {}
646     for hv_name in what[constants.NV_HYPERVISOR]:
647       try:
648         val = hypervisor.GetHypervisor(hv_name).Verify()
649       except errors.HypervisorError, err:
650         val = "Error while checking hypervisor: %s" % str(err)
651       tmp[hv_name] = val
652
653   if constants.NV_HVPARAMS in what and vm_capable:
654     result[constants.NV_HVPARAMS] = tmp = []
655     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
656       try:
657         logging.info("Validating hv %s, %s", hv_name, hvparms)
658         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
659       except errors.HypervisorError, err:
660         tmp.append((source, hv_name, str(err)))
661
662   if constants.NV_FILELIST in what:
663     fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
664                                               what[constants.NV_FILELIST]))
665     result[constants.NV_FILELIST] = \
666       dict((vcluster.MakeVirtualPath(key), value)
667            for (key, value) in fingerprints.items())
668
669   if constants.NV_NODELIST in what:
670     (nodes, bynode) = what[constants.NV_NODELIST]
671
672     # Add nodes from other groups (different for each node)
673     try:
674       nodes.extend(bynode[my_name])
675     except KeyError:
676       pass
677
678     # Use a random order
679     random.shuffle(nodes)
680
681     # Try to contact all nodes
682     val = {}
683     for node in nodes:
684       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
685       if not success:
686         val[node] = message
687
688     result[constants.NV_NODELIST] = val
689
690   if constants.NV_NODENETTEST in what:
691     result[constants.NV_NODENETTEST] = tmp = {}
692     my_pip = my_sip = None
693     for name, pip, sip in what[constants.NV_NODENETTEST]:
694       if name == my_name:
695         my_pip = pip
696         my_sip = sip
697         break
698     if not my_pip:
699       tmp[my_name] = ("Can't find my own primary/secondary IP"
700                       " in the node list")
701     else:
702       for name, pip, sip in what[constants.NV_NODENETTEST]:
703         fail = []
704         if not netutils.TcpPing(pip, port, source=my_pip):
705           fail.append("primary")
706         if sip != pip:
707           if not netutils.TcpPing(sip, port, source=my_sip):
708             fail.append("secondary")
709         if fail:
710           tmp[name] = ("failure using the %s interface(s)" %
711                        " and ".join(fail))
712
713   if constants.NV_MASTERIP in what:
714     # FIXME: add checks on incoming data structures (here and in the
715     # rest of the function)
716     master_name, master_ip = what[constants.NV_MASTERIP]
717     if master_name == my_name:
718       source = constants.IP4_ADDRESS_LOCALHOST
719     else:
720       source = None
721     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
722                                                      source=source)
723
724   if constants.NV_USERSCRIPTS in what:
725     result[constants.NV_USERSCRIPTS] = \
726       [script for script in what[constants.NV_USERSCRIPTS]
727        if not utils.IsExecutable(script)]
728
729   if constants.NV_OOB_PATHS in what:
730     result[constants.NV_OOB_PATHS] = tmp = []
731     for path in what[constants.NV_OOB_PATHS]:
732       try:
733         st = os.stat(path)
734       except OSError, err:
735         tmp.append("error stating out of band helper: %s" % err)
736       else:
737         if stat.S_ISREG(st.st_mode):
738           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
739             tmp.append(None)
740           else:
741             tmp.append("out of band helper %s is not executable" % path)
742         else:
743           tmp.append("out of band helper %s is not a file" % path)
744
745   if constants.NV_LVLIST in what and vm_capable:
746     try:
747       val = GetVolumeList(utils.ListVolumeGroups().keys())
748     except RPCFail, err:
749       val = str(err)
750     result[constants.NV_LVLIST] = val
751
752   if constants.NV_INSTANCELIST in what and vm_capable:
753     # GetInstanceList can fail
754     try:
755       val = GetInstanceList(what[constants.NV_INSTANCELIST])
756     except RPCFail, err:
757       val = str(err)
758     result[constants.NV_INSTANCELIST] = val
759
760   if constants.NV_VGLIST in what and vm_capable:
761     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
762
763   if constants.NV_PVLIST in what and vm_capable:
764     result[constants.NV_PVLIST] = \
765       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
766                                    filter_allocatable=False)
767
768   if constants.NV_VERSION in what:
769     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
770                                     constants.RELEASE_VERSION)
771
772   if constants.NV_HVINFO in what and vm_capable:
773     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
774     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
775
776   if constants.NV_DRBDLIST in what and vm_capable:
777     try:
778       used_minors = bdev.DRBD8.GetUsedDevs().keys()
779     except errors.BlockDeviceError, err:
780       logging.warning("Can't get used minors list", exc_info=True)
781       used_minors = str(err)
782     result[constants.NV_DRBDLIST] = used_minors
783
784   if constants.NV_DRBDHELPER in what and vm_capable:
785     status = True
786     try:
787       payload = bdev.BaseDRBD.GetUsermodeHelper()
788     except errors.BlockDeviceError, err:
789       logging.error("Can't get DRBD usermode helper: %s", str(err))
790       status = False
791       payload = str(err)
792     result[constants.NV_DRBDHELPER] = (status, payload)
793
794   if constants.NV_NODESETUP in what:
795     result[constants.NV_NODESETUP] = tmpr = []
796     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
797       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
798                   " under /sys, missing required directories /sys/block"
799                   " and /sys/class/net")
800     if (not os.path.isdir("/proc/sys") or
801         not os.path.isfile("/proc/sysrq-trigger")):
802       tmpr.append("The procfs filesystem doesn't seem to be mounted"
803                   " under /proc, missing required directory /proc/sys and"
804                   " the file /proc/sysrq-trigger")
805
806   if constants.NV_TIME in what:
807     result[constants.NV_TIME] = utils.SplitTime(time.time())
808
809   if constants.NV_OSLIST in what and vm_capable:
810     result[constants.NV_OSLIST] = DiagnoseOS()
811
812   if constants.NV_BRIDGES in what and vm_capable:
813     result[constants.NV_BRIDGES] = [bridge
814                                     for bridge in what[constants.NV_BRIDGES]
815                                     if not utils.BridgeExists(bridge)]
816
817   if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
818     result[constants.NV_FILE_STORAGE_PATHS] = \
819       bdev.ComputeWrongFileStoragePaths()
820
821   return result
822
823
824 def GetBlockDevSizes(devices):
825   """Return the size of the given block devices
826
827   @type devices: list
828   @param devices: list of block device nodes to query
829   @rtype: dict
830   @return:
831     dictionary of all block devices under /dev (key). The value is their
832     size in MiB.
833
834     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
835
836   """
837   DEV_PREFIX = "/dev/"
838   blockdevs = {}
839
840   for devpath in devices:
841     if not utils.IsBelowDir(DEV_PREFIX, devpath):
842       continue
843
844     try:
845       st = os.stat(devpath)
846     except EnvironmentError, err:
847       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
848       continue
849
850     if stat.S_ISBLK(st.st_mode):
851       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
852       if result.failed:
853         # We don't want to fail, just do not list this device as available
854         logging.warning("Cannot get size for block device %s", devpath)
855         continue
856
857       size = int(result.stdout) / (1024 * 1024)
858       blockdevs[devpath] = size
859   return blockdevs
860
861
862 def GetVolumeList(vg_names):
863   """Compute list of logical volumes and their size.
864
865   @type vg_names: list
866   @param vg_names: the volume groups whose LVs we should list, or
867       empty for all volume groups
868   @rtype: dict
869   @return:
870       dictionary of all partions (key) with value being a tuple of
871       their size (in MiB), inactive and online status::
872
873         {'xenvg/test1': ('20.06', True, True)}
874
875       in case of errors, a string is returned with the error
876       details.
877
878   """
879   lvs = {}
880   sep = "|"
881   if not vg_names:
882     vg_names = []
883   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
884                          "--separator=%s" % sep,
885                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
886   if result.failed:
887     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
888
889   for line in result.stdout.splitlines():
890     line = line.strip()
891     match = _LVSLINE_REGEX.match(line)
892     if not match:
893       logging.error("Invalid line returned from lvs output: '%s'", line)
894       continue
895     vg_name, name, size, attr = match.groups()
896     inactive = attr[4] == "-"
897     online = attr[5] == "o"
898     virtual = attr[0] == "v"
899     if virtual:
900       # we don't want to report such volumes as existing, since they
901       # don't really hold data
902       continue
903     lvs[vg_name + "/" + name] = (size, inactive, online)
904
905   return lvs
906
907
908 def ListVolumeGroups():
909   """List the volume groups and their size.
910
911   @rtype: dict
912   @return: dictionary with keys volume name and values the
913       size of the volume
914
915   """
916   return utils.ListVolumeGroups()
917
918
919 def NodeVolumes():
920   """List all volumes on this node.
921
922   @rtype: list
923   @return:
924     A list of dictionaries, each having four keys:
925       - name: the logical volume name,
926       - size: the size of the logical volume
927       - dev: the physical device on which the LV lives
928       - vg: the volume group to which it belongs
929
930     In case of errors, we return an empty list and log the
931     error.
932
933     Note that since a logical volume can live on multiple physical
934     volumes, the resulting list might include a logical volume
935     multiple times.
936
937   """
938   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
939                          "--separator=|",
940                          "--options=lv_name,lv_size,devices,vg_name"])
941   if result.failed:
942     _Fail("Failed to list logical volumes, lvs output: %s",
943           result.output)
944
945   def parse_dev(dev):
946     return dev.split("(")[0]
947
948   def handle_dev(dev):
949     return [parse_dev(x) for x in dev.split(",")]
950
951   def map_line(line):
952     line = [v.strip() for v in line]
953     return [{"name": line[0], "size": line[1],
954              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
955
956   all_devs = []
957   for line in result.stdout.splitlines():
958     if line.count("|") >= 3:
959       all_devs.extend(map_line(line.split("|")))
960     else:
961       logging.warning("Strange line in the output from lvs: '%s'", line)
962   return all_devs
963
964
965 def BridgesExist(bridges_list):
966   """Check if a list of bridges exist on the current node.
967
968   @rtype: boolean
969   @return: C{True} if all of them exist, C{False} otherwise
970
971   """
972   missing = []
973   for bridge in bridges_list:
974     if not utils.BridgeExists(bridge):
975       missing.append(bridge)
976
977   if missing:
978     _Fail("Missing bridges %s", utils.CommaJoin(missing))
979
980
981 def GetInstanceList(hypervisor_list):
982   """Provides a list of instances.
983
984   @type hypervisor_list: list
985   @param hypervisor_list: the list of hypervisors to query information
986
987   @rtype: list
988   @return: a list of all running instances on the current node
989     - instance1.example.com
990     - instance2.example.com
991
992   """
993   results = []
994   for hname in hypervisor_list:
995     try:
996       names = hypervisor.GetHypervisor(hname).ListInstances()
997       results.extend(names)
998     except errors.HypervisorError, err:
999       _Fail("Error enumerating instances (hypervisor %s): %s",
1000             hname, err, exc=True)
1001
1002   return results
1003
1004
1005 def GetInstanceInfo(instance, hname):
1006   """Gives back the information about an instance as a dictionary.
1007
1008   @type instance: string
1009   @param instance: the instance name
1010   @type hname: string
1011   @param hname: the hypervisor type of the instance
1012
1013   @rtype: dict
1014   @return: dictionary with the following keys:
1015       - memory: memory size of instance (int)
1016       - state: xen state of instance (string)
1017       - time: cpu time of instance (float)
1018       - vcpus: the number of vcpus (int)
1019
1020   """
1021   output = {}
1022
1023   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1024   if iinfo is not None:
1025     output["memory"] = iinfo[2]
1026     output["vcpus"] = iinfo[3]
1027     output["state"] = iinfo[4]
1028     output["time"] = iinfo[5]
1029
1030   return output
1031
1032
1033 def GetInstanceMigratable(instance):
1034   """Gives whether an instance can be migrated.
1035
1036   @type instance: L{objects.Instance}
1037   @param instance: object representing the instance to be checked.
1038
1039   @rtype: tuple
1040   @return: tuple of (result, description) where:
1041       - result: whether the instance can be migrated or not
1042       - description: a description of the issue, if relevant
1043
1044   """
1045   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1046   iname = instance.name
1047   if iname not in hyper.ListInstances():
1048     _Fail("Instance %s is not running", iname)
1049
1050   for idx in range(len(instance.disks)):
1051     link_name = _GetBlockDevSymlinkPath(iname, idx)
1052     if not os.path.islink(link_name):
1053       logging.warning("Instance %s is missing symlink %s for disk %d",
1054                       iname, link_name, idx)
1055
1056
1057 def GetAllInstancesInfo(hypervisor_list):
1058   """Gather data about all instances.
1059
1060   This is the equivalent of L{GetInstanceInfo}, except that it
1061   computes data for all instances at once, thus being faster if one
1062   needs data about more than one instance.
1063
1064   @type hypervisor_list: list
1065   @param hypervisor_list: list of hypervisors to query for instance data
1066
1067   @rtype: dict
1068   @return: dictionary of instance: data, with data having the following keys:
1069       - memory: memory size of instance (int)
1070       - state: xen state of instance (string)
1071       - time: cpu time of instance (float)
1072       - vcpus: the number of vcpus
1073
1074   """
1075   output = {}
1076
1077   for hname in hypervisor_list:
1078     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1079     if iinfo:
1080       for name, _, memory, vcpus, state, times in iinfo:
1081         value = {
1082           "memory": memory,
1083           "vcpus": vcpus,
1084           "state": state,
1085           "time": times,
1086           }
1087         if name in output:
1088           # we only check static parameters, like memory and vcpus,
1089           # and not state and time which can change between the
1090           # invocations of the different hypervisors
1091           for key in "memory", "vcpus":
1092             if value[key] != output[name][key]:
1093               _Fail("Instance %s is running twice"
1094                     " with different parameters", name)
1095         output[name] = value
1096
1097   return output
1098
1099
1100 def _InstanceLogName(kind, os_name, instance, component):
1101   """Compute the OS log filename for a given instance and operation.
1102
1103   The instance name and os name are passed in as strings since not all
1104   operations have these as part of an instance object.
1105
1106   @type kind: string
1107   @param kind: the operation type (e.g. add, import, etc.)
1108   @type os_name: string
1109   @param os_name: the os name
1110   @type instance: string
1111   @param instance: the name of the instance being imported/added/etc.
1112   @type component: string or None
1113   @param component: the name of the component of the instance being
1114       transferred
1115
1116   """
1117   # TODO: Use tempfile.mkstemp to create unique filename
1118   if component:
1119     assert "/" not in component
1120     c_msg = "-%s" % component
1121   else:
1122     c_msg = ""
1123   base = ("%s-%s-%s%s-%s.log" %
1124           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1125   return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1126
1127
1128 def InstanceOsAdd(instance, reinstall, debug):
1129   """Add an OS to an instance.
1130
1131   @type instance: L{objects.Instance}
1132   @param instance: Instance whose OS is to be installed
1133   @type reinstall: boolean
1134   @param reinstall: whether this is an instance reinstall
1135   @type debug: integer
1136   @param debug: debug level, passed to the OS scripts
1137   @rtype: None
1138
1139   """
1140   inst_os = OSFromDisk(instance.os)
1141
1142   create_env = OSEnvironment(instance, inst_os, debug)
1143   if reinstall:
1144     create_env["INSTANCE_REINSTALL"] = "1"
1145
1146   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1147
1148   result = utils.RunCmd([inst_os.create_script], env=create_env,
1149                         cwd=inst_os.path, output=logfile, reset_env=True)
1150   if result.failed:
1151     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1152                   " output: %s", result.cmd, result.fail_reason, logfile,
1153                   result.output)
1154     lines = [utils.SafeEncode(val)
1155              for val in utils.TailFile(logfile, lines=20)]
1156     _Fail("OS create script failed (%s), last lines in the"
1157           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1158
1159
1160 def RunRenameInstance(instance, old_name, debug):
1161   """Run the OS rename script for an instance.
1162
1163   @type instance: L{objects.Instance}
1164   @param instance: Instance whose OS is to be installed
1165   @type old_name: string
1166   @param old_name: previous instance name
1167   @type debug: integer
1168   @param debug: debug level, passed to the OS scripts
1169   @rtype: boolean
1170   @return: the success of the operation
1171
1172   """
1173   inst_os = OSFromDisk(instance.os)
1174
1175   rename_env = OSEnvironment(instance, inst_os, debug)
1176   rename_env["OLD_INSTANCE_NAME"] = old_name
1177
1178   logfile = _InstanceLogName("rename", instance.os,
1179                              "%s-%s" % (old_name, instance.name), None)
1180
1181   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1182                         cwd=inst_os.path, output=logfile, reset_env=True)
1183
1184   if result.failed:
1185     logging.error("os create command '%s' returned error: %s output: %s",
1186                   result.cmd, result.fail_reason, result.output)
1187     lines = [utils.SafeEncode(val)
1188              for val in utils.TailFile(logfile, lines=20)]
1189     _Fail("OS rename script failed (%s), last lines in the"
1190           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1191
1192
1193 def _GetBlockDevSymlinkPath(instance_name, idx):
1194   return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
1195                         (instance_name, constants.DISK_SEPARATOR, idx))
1196
1197
1198 def _SymlinkBlockDev(instance_name, device_path, idx):
1199   """Set up symlinks to a instance's block device.
1200
1201   This is an auxiliary function run when an instance is start (on the primary
1202   node) or when an instance is migrated (on the target node).
1203
1204
1205   @param instance_name: the name of the target instance
1206   @param device_path: path of the physical block device, on the node
1207   @param idx: the disk index
1208   @return: absolute path to the disk's symlink
1209
1210   """
1211   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1212   try:
1213     os.symlink(device_path, link_name)
1214   except OSError, err:
1215     if err.errno == errno.EEXIST:
1216       if (not os.path.islink(link_name) or
1217           os.readlink(link_name) != device_path):
1218         os.remove(link_name)
1219         os.symlink(device_path, link_name)
1220     else:
1221       raise
1222
1223   return link_name
1224
1225
1226 def _RemoveBlockDevLinks(instance_name, disks):
1227   """Remove the block device symlinks belonging to the given instance.
1228
1229   """
1230   for idx, _ in enumerate(disks):
1231     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1232     if os.path.islink(link_name):
1233       try:
1234         os.remove(link_name)
1235       except OSError:
1236         logging.exception("Can't remove symlink '%s'", link_name)
1237
1238
1239 def _GatherAndLinkBlockDevs(instance):
1240   """Set up an instance's block device(s).
1241
1242   This is run on the primary node at instance startup. The block
1243   devices must be already assembled.
1244
1245   @type instance: L{objects.Instance}
1246   @param instance: the instance whose disks we shoul assemble
1247   @rtype: list
1248   @return: list of (disk_object, device_path)
1249
1250   """
1251   block_devices = []
1252   for idx, disk in enumerate(instance.disks):
1253     device = _RecursiveFindBD(disk)
1254     if device is None:
1255       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1256                                     str(disk))
1257     device.Open()
1258     try:
1259       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1260     except OSError, e:
1261       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1262                                     e.strerror)
1263
1264     block_devices.append((disk, link_name))
1265
1266   return block_devices
1267
1268
1269 def StartInstance(instance, startup_paused):
1270   """Start an instance.
1271
1272   @type instance: L{objects.Instance}
1273   @param instance: the instance object
1274   @type startup_paused: bool
1275   @param instance: pause instance at startup?
1276   @rtype: None
1277
1278   """
1279   running_instances = GetInstanceList([instance.hypervisor])
1280
1281   if instance.name in running_instances:
1282     logging.info("Instance %s already running, not starting", instance.name)
1283     return
1284
1285   try:
1286     block_devices = _GatherAndLinkBlockDevs(instance)
1287     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1288     hyper.StartInstance(instance, block_devices, startup_paused)
1289   except errors.BlockDeviceError, err:
1290     _Fail("Block device error: %s", err, exc=True)
1291   except errors.HypervisorError, err:
1292     _RemoveBlockDevLinks(instance.name, instance.disks)
1293     _Fail("Hypervisor error: %s", err, exc=True)
1294
1295
1296 def InstanceShutdown(instance, timeout):
1297   """Shut an instance down.
1298
1299   @note: this functions uses polling with a hardcoded timeout.
1300
1301   @type instance: L{objects.Instance}
1302   @param instance: the instance object
1303   @type timeout: integer
1304   @param timeout: maximum timeout for soft shutdown
1305   @rtype: None
1306
1307   """
1308   hv_name = instance.hypervisor
1309   hyper = hypervisor.GetHypervisor(hv_name)
1310   iname = instance.name
1311
1312   if instance.name not in hyper.ListInstances():
1313     logging.info("Instance %s not running, doing nothing", iname)
1314     return
1315
1316   class _TryShutdown:
1317     def __init__(self):
1318       self.tried_once = False
1319
1320     def __call__(self):
1321       if iname not in hyper.ListInstances():
1322         return
1323
1324       try:
1325         hyper.StopInstance(instance, retry=self.tried_once)
1326       except errors.HypervisorError, err:
1327         if iname not in hyper.ListInstances():
1328           # if the instance is no longer existing, consider this a
1329           # success and go to cleanup
1330           return
1331
1332         _Fail("Failed to stop instance %s: %s", iname, err)
1333
1334       self.tried_once = True
1335
1336       raise utils.RetryAgain()
1337
1338   try:
1339     utils.Retry(_TryShutdown(), 5, timeout)
1340   except utils.RetryTimeout:
1341     # the shutdown did not succeed
1342     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1343
1344     try:
1345       hyper.StopInstance(instance, force=True)
1346     except errors.HypervisorError, err:
1347       if iname in hyper.ListInstances():
1348         # only raise an error if the instance still exists, otherwise
1349         # the error could simply be "instance ... unknown"!
1350         _Fail("Failed to force stop instance %s: %s", iname, err)
1351
1352     time.sleep(1)
1353
1354     if iname in hyper.ListInstances():
1355       _Fail("Could not shutdown instance %s even by destroy", iname)
1356
1357   try:
1358     hyper.CleanupInstance(instance.name)
1359   except errors.HypervisorError, err:
1360     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1361
1362   _RemoveBlockDevLinks(iname, instance.disks)
1363
1364
1365 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1366   """Reboot an instance.
1367
1368   @type instance: L{objects.Instance}
1369   @param instance: the instance object to reboot
1370   @type reboot_type: str
1371   @param reboot_type: the type of reboot, one the following
1372     constants:
1373       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1374         instance OS, do not recreate the VM
1375       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1376         restart the VM (at the hypervisor level)
1377       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1378         not accepted here, since that mode is handled differently, in
1379         cmdlib, and translates into full stop and start of the
1380         instance (instead of a call_instance_reboot RPC)
1381   @type shutdown_timeout: integer
1382   @param shutdown_timeout: maximum timeout for soft shutdown
1383   @rtype: None
1384
1385   """
1386   running_instances = GetInstanceList([instance.hypervisor])
1387
1388   if instance.name not in running_instances:
1389     _Fail("Cannot reboot instance %s that is not running", instance.name)
1390
1391   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1392   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1393     try:
1394       hyper.RebootInstance(instance)
1395     except errors.HypervisorError, err:
1396       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1397   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1398     try:
1399       InstanceShutdown(instance, shutdown_timeout)
1400       return StartInstance(instance, False)
1401     except errors.HypervisorError, err:
1402       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1403   else:
1404     _Fail("Invalid reboot_type received: %s", reboot_type)
1405
1406
1407 def InstanceBalloonMemory(instance, memory):
1408   """Resize an instance's memory.
1409
1410   @type instance: L{objects.Instance}
1411   @param instance: the instance object
1412   @type memory: int
1413   @param memory: new memory amount in MB
1414   @rtype: None
1415
1416   """
1417   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1418   running = hyper.ListInstances()
1419   if instance.name not in running:
1420     logging.info("Instance %s is not running, cannot balloon", instance.name)
1421     return
1422   try:
1423     hyper.BalloonInstanceMemory(instance, memory)
1424   except errors.HypervisorError, err:
1425     _Fail("Failed to balloon instance memory: %s", err, exc=True)
1426
1427
1428 def MigrationInfo(instance):
1429   """Gather information about an instance to be migrated.
1430
1431   @type instance: L{objects.Instance}
1432   @param instance: the instance definition
1433
1434   """
1435   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1436   try:
1437     info = hyper.MigrationInfo(instance)
1438   except errors.HypervisorError, err:
1439     _Fail("Failed to fetch migration information: %s", err, exc=True)
1440   return info
1441
1442
1443 def AcceptInstance(instance, info, target):
1444   """Prepare the node to accept an instance.
1445
1446   @type instance: L{objects.Instance}
1447   @param instance: the instance definition
1448   @type info: string/data (opaque)
1449   @param info: migration information, from the source node
1450   @type target: string
1451   @param target: target host (usually ip), on this node
1452
1453   """
1454   # TODO: why is this required only for DTS_EXT_MIRROR?
1455   if instance.disk_template in constants.DTS_EXT_MIRROR:
1456     # Create the symlinks, as the disks are not active
1457     # in any way
1458     try:
1459       _GatherAndLinkBlockDevs(instance)
1460     except errors.BlockDeviceError, err:
1461       _Fail("Block device error: %s", err, exc=True)
1462
1463   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1464   try:
1465     hyper.AcceptInstance(instance, info, target)
1466   except errors.HypervisorError, err:
1467     if instance.disk_template in constants.DTS_EXT_MIRROR:
1468       _RemoveBlockDevLinks(instance.name, instance.disks)
1469     _Fail("Failed to accept instance: %s", err, exc=True)
1470
1471
1472 def FinalizeMigrationDst(instance, info, success):
1473   """Finalize any preparation to accept an instance.
1474
1475   @type instance: L{objects.Instance}
1476   @param instance: the instance definition
1477   @type info: string/data (opaque)
1478   @param info: migration information, from the source node
1479   @type success: boolean
1480   @param success: whether the migration was a success or a failure
1481
1482   """
1483   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1484   try:
1485     hyper.FinalizeMigrationDst(instance, info, success)
1486   except errors.HypervisorError, err:
1487     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1488
1489
1490 def MigrateInstance(instance, target, live):
1491   """Migrates an instance to another node.
1492
1493   @type instance: L{objects.Instance}
1494   @param instance: the instance definition
1495   @type target: string
1496   @param target: the target node name
1497   @type live: boolean
1498   @param live: whether the migration should be done live or not (the
1499       interpretation of this parameter is left to the hypervisor)
1500   @raise RPCFail: if migration fails for some reason
1501
1502   """
1503   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1504
1505   try:
1506     hyper.MigrateInstance(instance, target, live)
1507   except errors.HypervisorError, err:
1508     _Fail("Failed to migrate instance: %s", err, exc=True)
1509
1510
1511 def FinalizeMigrationSource(instance, success, live):
1512   """Finalize the instance migration on the source node.
1513
1514   @type instance: L{objects.Instance}
1515   @param instance: the instance definition of the migrated instance
1516   @type success: bool
1517   @param success: whether the migration succeeded or not
1518   @type live: bool
1519   @param live: whether the user requested a live migration or not
1520   @raise RPCFail: If the execution fails for some reason
1521
1522   """
1523   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1524
1525   try:
1526     hyper.FinalizeMigrationSource(instance, success, live)
1527   except Exception, err:  # pylint: disable=W0703
1528     _Fail("Failed to finalize the migration on the source node: %s", err,
1529           exc=True)
1530
1531
1532 def GetMigrationStatus(instance):
1533   """Get the migration status
1534
1535   @type instance: L{objects.Instance}
1536   @param instance: the instance that is being migrated
1537   @rtype: L{objects.MigrationStatus}
1538   @return: the status of the current migration (one of
1539            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1540            progress info that can be retrieved from the hypervisor
1541   @raise RPCFail: If the migration status cannot be retrieved
1542
1543   """
1544   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1545   try:
1546     return hyper.GetMigrationStatus(instance)
1547   except Exception, err:  # pylint: disable=W0703
1548     _Fail("Failed to get migration status: %s", err, exc=True)
1549
1550
1551 def BlockdevCreate(disk, size, owner, on_primary, info):
1552   """Creates a block device for an instance.
1553
1554   @type disk: L{objects.Disk}
1555   @param disk: the object describing the disk we should create
1556   @type size: int
1557   @param size: the size of the physical underlying device, in MiB
1558   @type owner: str
1559   @param owner: the name of the instance for which disk is created,
1560       used for device cache data
1561   @type on_primary: boolean
1562   @param on_primary:  indicates if it is the primary node or not
1563   @type info: string
1564   @param info: string that will be sent to the physical device
1565       creation, used for example to set (LVM) tags on LVs
1566
1567   @return: the new unique_id of the device (this can sometime be
1568       computed only after creation), or None. On secondary nodes,
1569       it's not required to return anything.
1570
1571   """
1572   # TODO: remove the obsolete "size" argument
1573   # pylint: disable=W0613
1574   clist = []
1575   if disk.children:
1576     for child in disk.children:
1577       try:
1578         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1579       except errors.BlockDeviceError, err:
1580         _Fail("Can't assemble device %s: %s", child, err)
1581       if on_primary or disk.AssembleOnSecondary():
1582         # we need the children open in case the device itself has to
1583         # be assembled
1584         try:
1585           # pylint: disable=E1103
1586           crdev.Open()
1587         except errors.BlockDeviceError, err:
1588           _Fail("Can't make child '%s' read-write: %s", child, err)
1589       clist.append(crdev)
1590
1591   try:
1592     device = bdev.Create(disk, clist)
1593   except errors.BlockDeviceError, err:
1594     _Fail("Can't create block device: %s", err)
1595
1596   if on_primary or disk.AssembleOnSecondary():
1597     try:
1598       device.Assemble()
1599     except errors.BlockDeviceError, err:
1600       _Fail("Can't assemble device after creation, unusual event: %s", err)
1601     if on_primary or disk.OpenOnSecondary():
1602       try:
1603         device.Open(force=True)
1604       except errors.BlockDeviceError, err:
1605         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1606     DevCacheManager.UpdateCache(device.dev_path, owner,
1607                                 on_primary, disk.iv_name)
1608
1609   device.SetInfo(info)
1610
1611   return device.unique_id
1612
1613
1614 def _WipeDevice(path, offset, size):
1615   """This function actually wipes the device.
1616
1617   @param path: The path to the device to wipe
1618   @param offset: The offset in MiB in the file
1619   @param size: The size in MiB to write
1620
1621   """
1622   # Internal sizes are always in Mebibytes; if the following "dd" command
1623   # should use a different block size the offset and size given to this
1624   # function must be adjusted accordingly before being passed to "dd".
1625   block_size = 1024 * 1024
1626
1627   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1628          "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1629          "count=%d" % size]
1630   result = utils.RunCmd(cmd)
1631
1632   if result.failed:
1633     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1634           result.fail_reason, result.output)
1635
1636
1637 def BlockdevWipe(disk, offset, size):
1638   """Wipes a block device.
1639
1640   @type disk: L{objects.Disk}
1641   @param disk: the disk object we want to wipe
1642   @type offset: int
1643   @param offset: The offset in MiB in the file
1644   @type size: int
1645   @param size: The size in MiB to write
1646
1647   """
1648   try:
1649     rdev = _RecursiveFindBD(disk)
1650   except errors.BlockDeviceError:
1651     rdev = None
1652
1653   if not rdev:
1654     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1655
1656   # Do cross verify some of the parameters
1657   if offset < 0:
1658     _Fail("Negative offset")
1659   if size < 0:
1660     _Fail("Negative size")
1661   if offset > rdev.size:
1662     _Fail("Offset is bigger than device size")
1663   if (offset + size) > rdev.size:
1664     _Fail("The provided offset and size to wipe is bigger than device size")
1665
1666   _WipeDevice(rdev.dev_path, offset, size)
1667
1668
1669 def BlockdevPauseResumeSync(disks, pause):
1670   """Pause or resume the sync of the block device.
1671
1672   @type disks: list of L{objects.Disk}
1673   @param disks: the disks object we want to pause/resume
1674   @type pause: bool
1675   @param pause: Wheater to pause or resume
1676
1677   """
1678   success = []
1679   for disk in disks:
1680     try:
1681       rdev = _RecursiveFindBD(disk)
1682     except errors.BlockDeviceError:
1683       rdev = None
1684
1685     if not rdev:
1686       success.append((False, ("Cannot change sync for device %s:"
1687                               " device not found" % disk.iv_name)))
1688       continue
1689
1690     result = rdev.PauseResumeSync(pause)
1691
1692     if result:
1693       success.append((result, None))
1694     else:
1695       if pause:
1696         msg = "Pause"
1697       else:
1698         msg = "Resume"
1699       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1700
1701   return success
1702
1703
1704 def BlockdevRemove(disk):
1705   """Remove a block device.
1706
1707   @note: This is intended to be called recursively.
1708
1709   @type disk: L{objects.Disk}
1710   @param disk: the disk object we should remove
1711   @rtype: boolean
1712   @return: the success of the operation
1713
1714   """
1715   msgs = []
1716   try:
1717     rdev = _RecursiveFindBD(disk)
1718   except errors.BlockDeviceError, err:
1719     # probably can't attach
1720     logging.info("Can't attach to device %s in remove", disk)
1721     rdev = None
1722   if rdev is not None:
1723     r_path = rdev.dev_path
1724     try:
1725       rdev.Remove()
1726     except errors.BlockDeviceError, err:
1727       msgs.append(str(err))
1728     if not msgs:
1729       DevCacheManager.RemoveCache(r_path)
1730
1731   if disk.children:
1732     for child in disk.children:
1733       try:
1734         BlockdevRemove(child)
1735       except RPCFail, err:
1736         msgs.append(str(err))
1737
1738   if msgs:
1739     _Fail("; ".join(msgs))
1740
1741
1742 def _RecursiveAssembleBD(disk, owner, as_primary):
1743   """Activate a block device for an instance.
1744
1745   This is run on the primary and secondary nodes for an instance.
1746
1747   @note: this function is called recursively.
1748
1749   @type disk: L{objects.Disk}
1750   @param disk: the disk we try to assemble
1751   @type owner: str
1752   @param owner: the name of the instance which owns the disk
1753   @type as_primary: boolean
1754   @param as_primary: if we should make the block device
1755       read/write
1756
1757   @return: the assembled device or None (in case no device
1758       was assembled)
1759   @raise errors.BlockDeviceError: in case there is an error
1760       during the activation of the children or the device
1761       itself
1762
1763   """
1764   children = []
1765   if disk.children:
1766     mcn = disk.ChildrenNeeded()
1767     if mcn == -1:
1768       mcn = 0 # max number of Nones allowed
1769     else:
1770       mcn = len(disk.children) - mcn # max number of Nones
1771     for chld_disk in disk.children:
1772       try:
1773         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1774       except errors.BlockDeviceError, err:
1775         if children.count(None) >= mcn:
1776           raise
1777         cdev = None
1778         logging.error("Error in child activation (but continuing): %s",
1779                       str(err))
1780       children.append(cdev)
1781
1782   if as_primary or disk.AssembleOnSecondary():
1783     r_dev = bdev.Assemble(disk, children)
1784     result = r_dev
1785     if as_primary or disk.OpenOnSecondary():
1786       r_dev.Open()
1787     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1788                                 as_primary, disk.iv_name)
1789
1790   else:
1791     result = True
1792   return result
1793
1794
1795 def BlockdevAssemble(disk, owner, as_primary, idx):
1796   """Activate a block device for an instance.
1797
1798   This is a wrapper over _RecursiveAssembleBD.
1799
1800   @rtype: str or boolean
1801   @return: a C{/dev/...} path for primary nodes, and
1802       C{True} for secondary nodes
1803
1804   """
1805   try:
1806     result = _RecursiveAssembleBD(disk, owner, as_primary)
1807     if isinstance(result, bdev.BlockDev):
1808       # pylint: disable=E1103
1809       result = result.dev_path
1810       if as_primary:
1811         _SymlinkBlockDev(owner, result, idx)
1812   except errors.BlockDeviceError, err:
1813     _Fail("Error while assembling disk: %s", err, exc=True)
1814   except OSError, err:
1815     _Fail("Error while symlinking disk: %s", err, exc=True)
1816
1817   return result
1818
1819
1820 def BlockdevShutdown(disk):
1821   """Shut down a block device.
1822
1823   First, if the device is assembled (Attach() is successful), then
1824   the device is shutdown. Then the children of the device are
1825   shutdown.
1826
1827   This function is called recursively. Note that we don't cache the
1828   children or such, as oppossed to assemble, shutdown of different
1829   devices doesn't require that the upper device was active.
1830
1831   @type disk: L{objects.Disk}
1832   @param disk: the description of the disk we should
1833       shutdown
1834   @rtype: None
1835
1836   """
1837   msgs = []
1838   r_dev = _RecursiveFindBD(disk)
1839   if r_dev is not None:
1840     r_path = r_dev.dev_path
1841     try:
1842       r_dev.Shutdown()
1843       DevCacheManager.RemoveCache(r_path)
1844     except errors.BlockDeviceError, err:
1845       msgs.append(str(err))
1846
1847   if disk.children:
1848     for child in disk.children:
1849       try:
1850         BlockdevShutdown(child)
1851       except RPCFail, err:
1852         msgs.append(str(err))
1853
1854   if msgs:
1855     _Fail("; ".join(msgs))
1856
1857
1858 def BlockdevAddchildren(parent_cdev, new_cdevs):
1859   """Extend a mirrored block device.
1860
1861   @type parent_cdev: L{objects.Disk}
1862   @param parent_cdev: the disk to which we should add children
1863   @type new_cdevs: list of L{objects.Disk}
1864   @param new_cdevs: the list of children which we should add
1865   @rtype: None
1866
1867   """
1868   parent_bdev = _RecursiveFindBD(parent_cdev)
1869   if parent_bdev is None:
1870     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1871   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1872   if new_bdevs.count(None) > 0:
1873     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1874   parent_bdev.AddChildren(new_bdevs)
1875
1876
1877 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1878   """Shrink a mirrored block device.
1879
1880   @type parent_cdev: L{objects.Disk}
1881   @param parent_cdev: the disk from which we should remove children
1882   @type new_cdevs: list of L{objects.Disk}
1883   @param new_cdevs: the list of children which we should remove
1884   @rtype: None
1885
1886   """
1887   parent_bdev = _RecursiveFindBD(parent_cdev)
1888   if parent_bdev is None:
1889     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1890   devs = []
1891   for disk in new_cdevs:
1892     rpath = disk.StaticDevPath()
1893     if rpath is None:
1894       bd = _RecursiveFindBD(disk)
1895       if bd is None:
1896         _Fail("Can't find device %s while removing children", disk)
1897       else:
1898         devs.append(bd.dev_path)
1899     else:
1900       if not utils.IsNormAbsPath(rpath):
1901         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1902       devs.append(rpath)
1903   parent_bdev.RemoveChildren(devs)
1904
1905
1906 def BlockdevGetmirrorstatus(disks):
1907   """Get the mirroring status of a list of devices.
1908
1909   @type disks: list of L{objects.Disk}
1910   @param disks: the list of disks which we should query
1911   @rtype: disk
1912   @return: List of L{objects.BlockDevStatus}, one for each disk
1913   @raise errors.BlockDeviceError: if any of the disks cannot be
1914       found
1915
1916   """
1917   stats = []
1918   for dsk in disks:
1919     rbd = _RecursiveFindBD(dsk)
1920     if rbd is None:
1921       _Fail("Can't find device %s", dsk)
1922
1923     stats.append(rbd.CombinedSyncStatus())
1924
1925   return stats
1926
1927
1928 def BlockdevGetmirrorstatusMulti(disks):
1929   """Get the mirroring status of a list of devices.
1930
1931   @type disks: list of L{objects.Disk}
1932   @param disks: the list of disks which we should query
1933   @rtype: disk
1934   @return: List of tuples, (bool, status), one for each disk; bool denotes
1935     success/failure, status is L{objects.BlockDevStatus} on success, string
1936     otherwise
1937
1938   """
1939   result = []
1940   for disk in disks:
1941     try:
1942       rbd = _RecursiveFindBD(disk)
1943       if rbd is None:
1944         result.append((False, "Can't find device %s" % disk))
1945         continue
1946
1947       status = rbd.CombinedSyncStatus()
1948     except errors.BlockDeviceError, err:
1949       logging.exception("Error while getting disk status")
1950       result.append((False, str(err)))
1951     else:
1952       result.append((True, status))
1953
1954   assert len(disks) == len(result)
1955
1956   return result
1957
1958
1959 def _RecursiveFindBD(disk):
1960   """Check if a device is activated.
1961
1962   If so, return information about the real device.
1963
1964   @type disk: L{objects.Disk}
1965   @param disk: the disk object we need to find
1966
1967   @return: None if the device can't be found,
1968       otherwise the device instance
1969
1970   """
1971   children = []
1972   if disk.children:
1973     for chdisk in disk.children:
1974       children.append(_RecursiveFindBD(chdisk))
1975
1976   return bdev.FindDevice(disk, children)
1977
1978
1979 def _OpenRealBD(disk):
1980   """Opens the underlying block device of a disk.
1981
1982   @type disk: L{objects.Disk}
1983   @param disk: the disk object we want to open
1984
1985   """
1986   real_disk = _RecursiveFindBD(disk)
1987   if real_disk is None:
1988     _Fail("Block device '%s' is not set up", disk)
1989
1990   real_disk.Open()
1991
1992   return real_disk
1993
1994
1995 def BlockdevFind(disk):
1996   """Check if a device is activated.
1997
1998   If it is, return information about the real device.
1999
2000   @type disk: L{objects.Disk}
2001   @param disk: the disk to find
2002   @rtype: None or objects.BlockDevStatus
2003   @return: None if the disk cannot be found, otherwise a the current
2004            information
2005
2006   """
2007   try:
2008     rbd = _RecursiveFindBD(disk)
2009   except errors.BlockDeviceError, err:
2010     _Fail("Failed to find device: %s", err, exc=True)
2011
2012   if rbd is None:
2013     return None
2014
2015   return rbd.GetSyncStatus()
2016
2017
2018 def BlockdevGetsize(disks):
2019   """Computes the size of the given disks.
2020
2021   If a disk is not found, returns None instead.
2022
2023   @type disks: list of L{objects.Disk}
2024   @param disks: the list of disk to compute the size for
2025   @rtype: list
2026   @return: list with elements None if the disk cannot be found,
2027       otherwise the size
2028
2029   """
2030   result = []
2031   for cf in disks:
2032     try:
2033       rbd = _RecursiveFindBD(cf)
2034     except errors.BlockDeviceError:
2035       result.append(None)
2036       continue
2037     if rbd is None:
2038       result.append(None)
2039     else:
2040       result.append(rbd.GetActualSize())
2041   return result
2042
2043
2044 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2045   """Export a block device to a remote node.
2046
2047   @type disk: L{objects.Disk}
2048   @param disk: the description of the disk to export
2049   @type dest_node: str
2050   @param dest_node: the destination node to export to
2051   @type dest_path: str
2052   @param dest_path: the destination path on the target node
2053   @type cluster_name: str
2054   @param cluster_name: the cluster name, needed for SSH hostalias
2055   @rtype: None
2056
2057   """
2058   real_disk = _OpenRealBD(disk)
2059
2060   # the block size on the read dd is 1MiB to match our units
2061   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2062                                "dd if=%s bs=1048576 count=%s",
2063                                real_disk.dev_path, str(disk.size))
2064
2065   # we set here a smaller block size as, due to ssh buffering, more
2066   # than 64-128k will mostly ignored; we use nocreat to fail if the
2067   # device is not already there or we pass a wrong path; we use
2068   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2069   # to not buffer too much memory; this means that at best, we flush
2070   # every 64k, which will not be very fast
2071   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2072                                 " oflag=dsync", dest_path)
2073
2074   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2075                                                    constants.SSH_LOGIN_USER,
2076                                                    destcmd)
2077
2078   # all commands have been checked, so we're safe to combine them
2079   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2080
2081   result = utils.RunCmd(["bash", "-c", command])
2082
2083   if result.failed:
2084     _Fail("Disk copy command '%s' returned error: %s"
2085           " output: %s", command, result.fail_reason, result.output)
2086
2087
2088 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2089   """Write a file to the filesystem.
2090
2091   This allows the master to overwrite(!) a file. It will only perform
2092   the operation if the file belongs to a list of configuration files.
2093
2094   @type file_name: str
2095   @param file_name: the target file name
2096   @type data: str
2097   @param data: the new contents of the file
2098   @type mode: int
2099   @param mode: the mode to give the file (can be None)
2100   @type uid: string
2101   @param uid: the owner of the file
2102   @type gid: string
2103   @param gid: the group of the file
2104   @type atime: float
2105   @param atime: the atime to set on the file (can be None)
2106   @type mtime: float
2107   @param mtime: the mtime to set on the file (can be None)
2108   @rtype: None
2109
2110   """
2111   file_name = vcluster.LocalizeVirtualPath(file_name)
2112
2113   if not os.path.isabs(file_name):
2114     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2115
2116   if file_name not in _ALLOWED_UPLOAD_FILES:
2117     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2118           file_name)
2119
2120   raw_data = _Decompress(data)
2121
2122   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2123     _Fail("Invalid username/groupname type")
2124
2125   getents = runtime.GetEnts()
2126   uid = getents.LookupUser(uid)
2127   gid = getents.LookupGroup(gid)
2128
2129   utils.SafeWriteFile(file_name, None,
2130                       data=raw_data, mode=mode, uid=uid, gid=gid,
2131                       atime=atime, mtime=mtime)
2132
2133
2134 def RunOob(oob_program, command, node, timeout):
2135   """Executes oob_program with given command on given node.
2136
2137   @param oob_program: The path to the executable oob_program
2138   @param command: The command to invoke on oob_program
2139   @param node: The node given as an argument to the program
2140   @param timeout: Timeout after which we kill the oob program
2141
2142   @return: stdout
2143   @raise RPCFail: If execution fails for some reason
2144
2145   """
2146   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2147
2148   if result.failed:
2149     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2150           result.fail_reason, result.output)
2151
2152   return result.stdout
2153
2154
2155 def _OSOndiskAPIVersion(os_dir):
2156   """Compute and return the API version of a given OS.
2157
2158   This function will try to read the API version of the OS residing in
2159   the 'os_dir' directory.
2160
2161   @type os_dir: str
2162   @param os_dir: the directory in which we should look for the OS
2163   @rtype: tuple
2164   @return: tuple (status, data) with status denoting the validity and
2165       data holding either the vaid versions or an error message
2166
2167   """
2168   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2169
2170   try:
2171     st = os.stat(api_file)
2172   except EnvironmentError, err:
2173     return False, ("Required file '%s' not found under path %s: %s" %
2174                    (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2175
2176   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2177     return False, ("File '%s' in %s is not a regular file" %
2178                    (constants.OS_API_FILE, os_dir))
2179
2180   try:
2181     api_versions = utils.ReadFile(api_file).splitlines()
2182   except EnvironmentError, err:
2183     return False, ("Error while reading the API version file at %s: %s" %
2184                    (api_file, utils.ErrnoOrStr(err)))
2185
2186   try:
2187     api_versions = [int(version.strip()) for version in api_versions]
2188   except (TypeError, ValueError), err:
2189     return False, ("API version(s) can't be converted to integer: %s" %
2190                    str(err))
2191
2192   return True, api_versions
2193
2194
2195 def DiagnoseOS(top_dirs=None):
2196   """Compute the validity for all OSes.
2197
2198   @type top_dirs: list
2199   @param top_dirs: the list of directories in which to
2200       search (if not given defaults to
2201       L{pathutils.OS_SEARCH_PATH})
2202   @rtype: list of L{objects.OS}
2203   @return: a list of tuples (name, path, status, diagnose, variants,
2204       parameters, api_version) for all (potential) OSes under all
2205       search paths, where:
2206           - name is the (potential) OS name
2207           - path is the full path to the OS
2208           - status True/False is the validity of the OS
2209           - diagnose is the error message for an invalid OS, otherwise empty
2210           - variants is a list of supported OS variants, if any
2211           - parameters is a list of (name, help) parameters, if any
2212           - api_version is a list of support OS API versions
2213
2214   """
2215   if top_dirs is None:
2216     top_dirs = pathutils.OS_SEARCH_PATH
2217
2218   result = []
2219   for dir_name in top_dirs:
2220     if os.path.isdir(dir_name):
2221       try:
2222         f_names = utils.ListVisibleFiles(dir_name)
2223       except EnvironmentError, err:
2224         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2225         break
2226       for name in f_names:
2227         os_path = utils.PathJoin(dir_name, name)
2228         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2229         if status:
2230           diagnose = ""
2231           variants = os_inst.supported_variants
2232           parameters = os_inst.supported_parameters
2233           api_versions = os_inst.api_versions
2234         else:
2235           diagnose = os_inst
2236           variants = parameters = api_versions = []
2237         result.append((name, os_path, status, diagnose, variants,
2238                        parameters, api_versions))
2239
2240   return result
2241
2242
2243 def _TryOSFromDisk(name, base_dir=None):
2244   """Create an OS instance from disk.
2245
2246   This function will return an OS instance if the given name is a
2247   valid OS name.
2248
2249   @type base_dir: string
2250   @keyword base_dir: Base directory containing OS installations.
2251                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2252   @rtype: tuple
2253   @return: success and either the OS instance if we find a valid one,
2254       or error message
2255
2256   """
2257   if base_dir is None:
2258     os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2259   else:
2260     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2261
2262   if os_dir is None:
2263     return False, "Directory for OS %s not found in search path" % name
2264
2265   status, api_versions = _OSOndiskAPIVersion(os_dir)
2266   if not status:
2267     # push the error up
2268     return status, api_versions
2269
2270   if not constants.OS_API_VERSIONS.intersection(api_versions):
2271     return False, ("API version mismatch for path '%s': found %s, want %s." %
2272                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2273
2274   # OS Files dictionary, we will populate it with the absolute path
2275   # names; if the value is True, then it is a required file, otherwise
2276   # an optional one
2277   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2278
2279   if max(api_versions) >= constants.OS_API_V15:
2280     os_files[constants.OS_VARIANTS_FILE] = False
2281
2282   if max(api_versions) >= constants.OS_API_V20:
2283     os_files[constants.OS_PARAMETERS_FILE] = True
2284   else:
2285     del os_files[constants.OS_SCRIPT_VERIFY]
2286
2287   for (filename, required) in os_files.items():
2288     os_files[filename] = utils.PathJoin(os_dir, filename)
2289
2290     try:
2291       st = os.stat(os_files[filename])
2292     except EnvironmentError, err:
2293       if err.errno == errno.ENOENT and not required:
2294         del os_files[filename]
2295         continue
2296       return False, ("File '%s' under path '%s' is missing (%s)" %
2297                      (filename, os_dir, utils.ErrnoOrStr(err)))
2298
2299     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2300       return False, ("File '%s' under path '%s' is not a regular file" %
2301                      (filename, os_dir))
2302
2303     if filename in constants.OS_SCRIPTS:
2304       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2305         return False, ("File '%s' under path '%s' is not executable" %
2306                        (filename, os_dir))
2307
2308   variants = []
2309   if constants.OS_VARIANTS_FILE in os_files:
2310     variants_file = os_files[constants.OS_VARIANTS_FILE]
2311     try:
2312       variants = \
2313         utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2314     except EnvironmentError, err:
2315       # we accept missing files, but not other errors
2316       if err.errno != errno.ENOENT:
2317         return False, ("Error while reading the OS variants file at %s: %s" %
2318                        (variants_file, utils.ErrnoOrStr(err)))
2319
2320   parameters = []
2321   if constants.OS_PARAMETERS_FILE in os_files:
2322     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2323     try:
2324       parameters = utils.ReadFile(parameters_file).splitlines()
2325     except EnvironmentError, err:
2326       return False, ("Error while reading the OS parameters file at %s: %s" %
2327                      (parameters_file, utils.ErrnoOrStr(err)))
2328     parameters = [v.split(None, 1) for v in parameters]
2329
2330   os_obj = objects.OS(name=name, path=os_dir,
2331                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2332                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2333                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2334                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2335                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2336                                                  None),
2337                       supported_variants=variants,
2338                       supported_parameters=parameters,
2339                       api_versions=api_versions)
2340   return True, os_obj
2341
2342
2343 def OSFromDisk(name, base_dir=None):
2344   """Create an OS instance from disk.
2345
2346   This function will return an OS instance if the given name is a
2347   valid OS name. Otherwise, it will raise an appropriate
2348   L{RPCFail} exception, detailing why this is not a valid OS.
2349
2350   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2351   an exception but returns true/false status data.
2352
2353   @type base_dir: string
2354   @keyword base_dir: Base directory containing OS installations.
2355                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2356   @rtype: L{objects.OS}
2357   @return: the OS instance if we find a valid one
2358   @raise RPCFail: if we don't find a valid OS
2359
2360   """
2361   name_only = objects.OS.GetName(name)
2362   status, payload = _TryOSFromDisk(name_only, base_dir)
2363
2364   if not status:
2365     _Fail(payload)
2366
2367   return payload
2368
2369
2370 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2371   """Calculate the basic environment for an os script.
2372
2373   @type os_name: str
2374   @param os_name: full operating system name (including variant)
2375   @type inst_os: L{objects.OS}
2376   @param inst_os: operating system for which the environment is being built
2377   @type os_params: dict
2378   @param os_params: the OS parameters
2379   @type debug: integer
2380   @param debug: debug level (0 or 1, for OS Api 10)
2381   @rtype: dict
2382   @return: dict of environment variables
2383   @raise errors.BlockDeviceError: if the block device
2384       cannot be found
2385
2386   """
2387   result = {}
2388   api_version = \
2389     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2390   result["OS_API_VERSION"] = "%d" % api_version
2391   result["OS_NAME"] = inst_os.name
2392   result["DEBUG_LEVEL"] = "%d" % debug
2393
2394   # OS variants
2395   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2396     variant = objects.OS.GetVariant(os_name)
2397     if not variant:
2398       variant = inst_os.supported_variants[0]
2399   else:
2400     variant = ""
2401   result["OS_VARIANT"] = variant
2402
2403   # OS params
2404   for pname, pvalue in os_params.items():
2405     result["OSP_%s" % pname.upper()] = pvalue
2406
2407   # Set a default path otherwise programs called by OS scripts (or
2408   # even hooks called from OS scripts) might break, and we don't want
2409   # to have each script require setting a PATH variable
2410   result["PATH"] = constants.HOOKS_PATH
2411
2412   return result
2413
2414
2415 def OSEnvironment(instance, inst_os, debug=0):
2416   """Calculate the environment for an os script.
2417
2418   @type instance: L{objects.Instance}
2419   @param instance: target instance for the os script run
2420   @type inst_os: L{objects.OS}
2421   @param inst_os: operating system for which the environment is being built
2422   @type debug: integer
2423   @param debug: debug level (0 or 1, for OS Api 10)
2424   @rtype: dict
2425   @return: dict of environment variables
2426   @raise errors.BlockDeviceError: if the block device
2427       cannot be found
2428
2429   """
2430   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2431
2432   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2433     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2434
2435   result["HYPERVISOR"] = instance.hypervisor
2436   result["DISK_COUNT"] = "%d" % len(instance.disks)
2437   result["NIC_COUNT"] = "%d" % len(instance.nics)
2438   result["INSTANCE_SECONDARY_NODES"] = \
2439       ("%s" % " ".join(instance.secondary_nodes))
2440
2441   # Disks
2442   for idx, disk in enumerate(instance.disks):
2443     real_disk = _OpenRealBD(disk)
2444     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2445     result["DISK_%d_ACCESS" % idx] = disk.mode
2446     if constants.HV_DISK_TYPE in instance.hvparams:
2447       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2448         instance.hvparams[constants.HV_DISK_TYPE]
2449     if disk.dev_type in constants.LDS_BLOCK:
2450       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2451     elif disk.dev_type == constants.LD_FILE:
2452       result["DISK_%d_BACKEND_TYPE" % idx] = \
2453         "file:%s" % disk.physical_id[0]
2454
2455   # NICs
2456   for idx, nic in enumerate(instance.nics):
2457     result["NIC_%d_MAC" % idx] = nic.mac
2458     if nic.ip:
2459       result["NIC_%d_IP" % idx] = nic.ip
2460     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2461     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2462       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2463     if nic.nicparams[constants.NIC_LINK]:
2464       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2465     if nic.network:
2466       result["NIC_%d_NETWORK" % idx] = nic.network
2467     if constants.HV_NIC_TYPE in instance.hvparams:
2468       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2469         instance.hvparams[constants.HV_NIC_TYPE]
2470
2471   # HV/BE params
2472   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2473     for key, value in source.items():
2474       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2475
2476   return result
2477
2478
2479 def BlockdevGrow(disk, amount, dryrun, backingstore):
2480   """Grow a stack of block devices.
2481
2482   This function is called recursively, with the childrens being the
2483   first ones to resize.
2484
2485   @type disk: L{objects.Disk}
2486   @param disk: the disk to be grown
2487   @type amount: integer
2488   @param amount: the amount (in mebibytes) to grow with
2489   @type dryrun: boolean
2490   @param dryrun: whether to execute the operation in simulation mode
2491       only, without actually increasing the size
2492   @param backingstore: whether to execute the operation on backing storage
2493       only, or on "logical" storage only; e.g. DRBD is logical storage,
2494       whereas LVM, file, RBD are backing storage
2495   @rtype: (status, result)
2496   @return: a tuple with the status of the operation (True/False), and
2497       the errors message if status is False
2498
2499   """
2500   r_dev = _RecursiveFindBD(disk)
2501   if r_dev is None:
2502     _Fail("Cannot find block device %s", disk)
2503
2504   try:
2505     r_dev.Grow(amount, dryrun, backingstore)
2506   except errors.BlockDeviceError, err:
2507     _Fail("Failed to grow block device: %s", err, exc=True)
2508
2509
2510 def BlockdevSnapshot(disk):
2511   """Create a snapshot copy of a block device.
2512
2513   This function is called recursively, and the snapshot is actually created
2514   just for the leaf lvm backend device.
2515
2516   @type disk: L{objects.Disk}
2517   @param disk: the disk to be snapshotted
2518   @rtype: string
2519   @return: snapshot disk ID as (vg, lv)
2520
2521   """
2522   if disk.dev_type == constants.LD_DRBD8:
2523     if not disk.children:
2524       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2525             disk.unique_id)
2526     return BlockdevSnapshot(disk.children[0])
2527   elif disk.dev_type == constants.LD_LV:
2528     r_dev = _RecursiveFindBD(disk)
2529     if r_dev is not None:
2530       # FIXME: choose a saner value for the snapshot size
2531       # let's stay on the safe side and ask for the full size, for now
2532       return r_dev.Snapshot(disk.size)
2533     else:
2534       _Fail("Cannot find block device %s", disk)
2535   else:
2536     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2537           disk.unique_id, disk.dev_type)
2538
2539
2540 def BlockdevSetInfo(disk, info):
2541   """Sets 'metadata' information on block devices.
2542
2543   This function sets 'info' metadata on block devices. Initial
2544   information is set at device creation; this function should be used
2545   for example after renames.
2546
2547   @type disk: L{objects.Disk}
2548   @param disk: the disk to be grown
2549   @type info: string
2550   @param info: new 'info' metadata
2551   @rtype: (status, result)
2552   @return: a tuple with the status of the operation (True/False), and
2553       the errors message if status is False
2554
2555   """
2556   r_dev = _RecursiveFindBD(disk)
2557   if r_dev is None:
2558     _Fail("Cannot find block device %s", disk)
2559
2560   try:
2561     r_dev.SetInfo(info)
2562   except errors.BlockDeviceError, err:
2563     _Fail("Failed to set information on block device: %s", err, exc=True)
2564
2565
2566 def FinalizeExport(instance, snap_disks):
2567   """Write out the export configuration information.
2568
2569   @type instance: L{objects.Instance}
2570   @param instance: the instance which we export, used for
2571       saving configuration
2572   @type snap_disks: list of L{objects.Disk}
2573   @param snap_disks: list of snapshot block devices, which
2574       will be used to get the actual name of the dump file
2575
2576   @rtype: None
2577
2578   """
2579   destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
2580   finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
2581
2582   config = objects.SerializableConfigParser()
2583
2584   config.add_section(constants.INISECT_EXP)
2585   config.set(constants.INISECT_EXP, "version", "0")
2586   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2587   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2588   config.set(constants.INISECT_EXP, "os", instance.os)
2589   config.set(constants.INISECT_EXP, "compression", "none")
2590
2591   config.add_section(constants.INISECT_INS)
2592   config.set(constants.INISECT_INS, "name", instance.name)
2593   config.set(constants.INISECT_INS, "maxmem", "%d" %
2594              instance.beparams[constants.BE_MAXMEM])
2595   config.set(constants.INISECT_INS, "minmem", "%d" %
2596              instance.beparams[constants.BE_MINMEM])
2597   # "memory" is deprecated, but useful for exporting to old ganeti versions
2598   config.set(constants.INISECT_INS, "memory", "%d" %
2599              instance.beparams[constants.BE_MAXMEM])
2600   config.set(constants.INISECT_INS, "vcpus", "%d" %
2601              instance.beparams[constants.BE_VCPUS])
2602   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2603   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2604   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2605
2606   nic_total = 0
2607   for nic_count, nic in enumerate(instance.nics):
2608     nic_total += 1
2609     config.set(constants.INISECT_INS, "nic%d_mac" %
2610                nic_count, "%s" % nic.mac)
2611     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2612     config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
2613                "%s" % nic.network)
2614     for param in constants.NICS_PARAMETER_TYPES:
2615       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2616                  "%s" % nic.nicparams.get(param, None))
2617   # TODO: redundant: on load can read nics until it doesn't exist
2618   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2619
2620   disk_total = 0
2621   for disk_count, disk in enumerate(snap_disks):
2622     if disk:
2623       disk_total += 1
2624       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2625                  ("%s" % disk.iv_name))
2626       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2627                  ("%s" % disk.physical_id[1]))
2628       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2629                  ("%d" % disk.size))
2630
2631   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2632
2633   # New-style hypervisor/backend parameters
2634
2635   config.add_section(constants.INISECT_HYP)
2636   for name, value in instance.hvparams.items():
2637     if name not in constants.HVC_GLOBALS:
2638       config.set(constants.INISECT_HYP, name, str(value))
2639
2640   config.add_section(constants.INISECT_BEP)
2641   for name, value in instance.beparams.items():
2642     config.set(constants.INISECT_BEP, name, str(value))
2643
2644   config.add_section(constants.INISECT_OSP)
2645   for name, value in instance.osparams.items():
2646     config.set(constants.INISECT_OSP, name, str(value))
2647
2648   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2649                   data=config.Dumps())
2650   shutil.rmtree(finaldestdir, ignore_errors=True)
2651   shutil.move(destdir, finaldestdir)
2652
2653
2654 def ExportInfo(dest):
2655   """Get export configuration information.
2656
2657   @type dest: str
2658   @param dest: directory containing the export
2659
2660   @rtype: L{objects.SerializableConfigParser}
2661   @return: a serializable config file containing the
2662       export info
2663
2664   """
2665   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2666
2667   config = objects.SerializableConfigParser()
2668   config.read(cff)
2669
2670   if (not config.has_section(constants.INISECT_EXP) or
2671       not config.has_section(constants.INISECT_INS)):
2672     _Fail("Export info file doesn't have the required fields")
2673
2674   return config.Dumps()
2675
2676
2677 def ListExports():
2678   """Return a list of exports currently available on this machine.
2679
2680   @rtype: list
2681   @return: list of the exports
2682
2683   """
2684   if os.path.isdir(pathutils.EXPORT_DIR):
2685     return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
2686   else:
2687     _Fail("No exports directory")
2688
2689
2690 def RemoveExport(export):
2691   """Remove an existing export from the node.
2692
2693   @type export: str
2694   @param export: the name of the export to remove
2695   @rtype: None
2696
2697   """
2698   target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2699
2700   try:
2701     shutil.rmtree(target)
2702   except EnvironmentError, err:
2703     _Fail("Error while removing the export: %s", err, exc=True)
2704
2705
2706 def BlockdevRename(devlist):
2707   """Rename a list of block devices.
2708
2709   @type devlist: list of tuples
2710   @param devlist: list of tuples of the form  (disk,
2711       new_logical_id, new_physical_id); disk is an
2712       L{objects.Disk} object describing the current disk,
2713       and new logical_id/physical_id is the name we
2714       rename it to
2715   @rtype: boolean
2716   @return: True if all renames succeeded, False otherwise
2717
2718   """
2719   msgs = []
2720   result = True
2721   for disk, unique_id in devlist:
2722     dev = _RecursiveFindBD(disk)
2723     if dev is None:
2724       msgs.append("Can't find device %s in rename" % str(disk))
2725       result = False
2726       continue
2727     try:
2728       old_rpath = dev.dev_path
2729       dev.Rename(unique_id)
2730       new_rpath = dev.dev_path
2731       if old_rpath != new_rpath:
2732         DevCacheManager.RemoveCache(old_rpath)
2733         # FIXME: we should add the new cache information here, like:
2734         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2735         # but we don't have the owner here - maybe parse from existing
2736         # cache? for now, we only lose lvm data when we rename, which
2737         # is less critical than DRBD or MD
2738     except errors.BlockDeviceError, err:
2739       msgs.append("Can't rename device '%s' to '%s': %s" %
2740                   (dev, unique_id, err))
2741       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2742       result = False
2743   if not result:
2744     _Fail("; ".join(msgs))
2745
2746
2747 def _TransformFileStorageDir(fs_dir):
2748   """Checks whether given file_storage_dir is valid.
2749
2750   Checks wheter the given fs_dir is within the cluster-wide default
2751   file_storage_dir or the shared_file_storage_dir, which are stored in
2752   SimpleStore. Only paths under those directories are allowed.
2753
2754   @type fs_dir: str
2755   @param fs_dir: the path to check
2756
2757   @return: the normalized path if valid, None otherwise
2758
2759   """
2760   if not (constants.ENABLE_FILE_STORAGE or
2761           constants.ENABLE_SHARED_FILE_STORAGE):
2762     _Fail("File storage disabled at configure time")
2763
2764   bdev.CheckFileStoragePath(fs_dir)
2765
2766   return os.path.normpath(fs_dir)
2767
2768
2769 def CreateFileStorageDir(file_storage_dir):
2770   """Create file storage directory.
2771
2772   @type file_storage_dir: str
2773   @param file_storage_dir: directory to create
2774
2775   @rtype: tuple
2776   @return: tuple with first element a boolean indicating wheter dir
2777       creation was successful or not
2778
2779   """
2780   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2781   if os.path.exists(file_storage_dir):
2782     if not os.path.isdir(file_storage_dir):
2783       _Fail("Specified storage dir '%s' is not a directory",
2784             file_storage_dir)
2785   else:
2786     try:
2787       os.makedirs(file_storage_dir, 0750)
2788     except OSError, err:
2789       _Fail("Cannot create file storage directory '%s': %s",
2790             file_storage_dir, err, exc=True)
2791
2792
2793 def RemoveFileStorageDir(file_storage_dir):
2794   """Remove file storage directory.
2795
2796   Remove it only if it's empty. If not log an error and return.
2797
2798   @type file_storage_dir: str
2799   @param file_storage_dir: the directory we should cleanup
2800   @rtype: tuple (success,)
2801   @return: tuple of one element, C{success}, denoting
2802       whether the operation was successful
2803
2804   """
2805   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2806   if os.path.exists(file_storage_dir):
2807     if not os.path.isdir(file_storage_dir):
2808       _Fail("Specified Storage directory '%s' is not a directory",
2809             file_storage_dir)
2810     # deletes dir only if empty, otherwise we want to fail the rpc call
2811     try:
2812       os.rmdir(file_storage_dir)
2813     except OSError, err:
2814       _Fail("Cannot remove file storage directory '%s': %s",
2815             file_storage_dir, err)
2816
2817
2818 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2819   """Rename the file storage directory.
2820
2821   @type old_file_storage_dir: str
2822   @param old_file_storage_dir: the current path
2823   @type new_file_storage_dir: str
2824   @param new_file_storage_dir: the name we should rename to
2825   @rtype: tuple (success,)
2826   @return: tuple of one element, C{success}, denoting
2827       whether the operation was successful
2828
2829   """
2830   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2831   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2832   if not os.path.exists(new_file_storage_dir):
2833     if os.path.isdir(old_file_storage_dir):
2834       try:
2835         os.rename(old_file_storage_dir, new_file_storage_dir)
2836       except OSError, err:
2837         _Fail("Cannot rename '%s' to '%s': %s",
2838               old_file_storage_dir, new_file_storage_dir, err)
2839     else:
2840       _Fail("Specified storage dir '%s' is not a directory",
2841             old_file_storage_dir)
2842   else:
2843     if os.path.exists(old_file_storage_dir):
2844       _Fail("Cannot rename '%s' to '%s': both locations exist",
2845             old_file_storage_dir, new_file_storage_dir)
2846
2847
2848 def _EnsureJobQueueFile(file_name):
2849   """Checks whether the given filename is in the queue directory.
2850
2851   @type file_name: str
2852   @param file_name: the file name we should check
2853   @rtype: None
2854   @raises RPCFail: if the file is not valid
2855
2856   """
2857   if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
2858     _Fail("Passed job queue file '%s' does not belong to"
2859           " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
2860
2861
2862 def JobQueueUpdate(file_name, content):
2863   """Updates a file in the queue directory.
2864
2865   This is just a wrapper over L{utils.io.WriteFile}, with proper
2866   checking.
2867
2868   @type file_name: str
2869   @param file_name: the job file name
2870   @type content: str
2871   @param content: the new job contents
2872   @rtype: boolean
2873   @return: the success of the operation
2874
2875   """
2876   file_name = vcluster.LocalizeVirtualPath(file_name)
2877
2878   _EnsureJobQueueFile(file_name)
2879   getents = runtime.GetEnts()
2880
2881   # Write and replace the file atomically
2882   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2883                   gid=getents.masterd_gid)
2884
2885
2886 def JobQueueRename(old, new):
2887   """Renames a job queue file.
2888
2889   This is just a wrapper over os.rename with proper checking.
2890
2891   @type old: str
2892   @param old: the old (actual) file name
2893   @type new: str
2894   @param new: the desired file name
2895   @rtype: tuple
2896   @return: the success of the operation and payload
2897
2898   """
2899   old = vcluster.LocalizeVirtualPath(old)
2900   new = vcluster.LocalizeVirtualPath(new)
2901
2902   _EnsureJobQueueFile(old)
2903   _EnsureJobQueueFile(new)
2904
2905   getents = runtime.GetEnts()
2906
2907   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2908                    dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2909
2910
2911 def BlockdevClose(instance_name, disks):
2912   """Closes the given block devices.
2913
2914   This means they will be switched to secondary mode (in case of
2915   DRBD).
2916
2917   @param instance_name: if the argument is not empty, the symlinks
2918       of this instance will be removed
2919   @type disks: list of L{objects.Disk}
2920   @param disks: the list of disks to be closed
2921   @rtype: tuple (success, message)
2922   @return: a tuple of success and message, where success
2923       indicates the succes of the operation, and message
2924       which will contain the error details in case we
2925       failed
2926
2927   """
2928   bdevs = []
2929   for cf in disks:
2930     rd = _RecursiveFindBD(cf)
2931     if rd is None:
2932       _Fail("Can't find device %s", cf)
2933     bdevs.append(rd)
2934
2935   msg = []
2936   for rd in bdevs:
2937     try:
2938       rd.Close()
2939     except errors.BlockDeviceError, err:
2940       msg.append(str(err))
2941   if msg:
2942     _Fail("Can't make devices secondary: %s", ",".join(msg))
2943   else:
2944     if instance_name:
2945       _RemoveBlockDevLinks(instance_name, disks)
2946
2947
2948 def ValidateHVParams(hvname, hvparams):
2949   """Validates the given hypervisor parameters.
2950
2951   @type hvname: string
2952   @param hvname: the hypervisor name
2953   @type hvparams: dict
2954   @param hvparams: the hypervisor parameters to be validated
2955   @rtype: None
2956
2957   """
2958   try:
2959     hv_type = hypervisor.GetHypervisor(hvname)
2960     hv_type.ValidateParameters(hvparams)
2961   except errors.HypervisorError, err:
2962     _Fail(str(err), log=False)
2963
2964
2965 def _CheckOSPList(os_obj, parameters):
2966   """Check whether a list of parameters is supported by the OS.
2967
2968   @type os_obj: L{objects.OS}
2969   @param os_obj: OS object to check
2970   @type parameters: list
2971   @param parameters: the list of parameters to check
2972
2973   """
2974   supported = [v[0] for v in os_obj.supported_parameters]
2975   delta = frozenset(parameters).difference(supported)
2976   if delta:
2977     _Fail("The following parameters are not supported"
2978           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2979
2980
2981 def ValidateOS(required, osname, checks, osparams):
2982   """Validate the given OS' parameters.
2983
2984   @type required: boolean
2985   @param required: whether absence of the OS should translate into
2986       failure or not
2987   @type osname: string
2988   @param osname: the OS to be validated
2989   @type checks: list
2990   @param checks: list of the checks to run (currently only 'parameters')
2991   @type osparams: dict
2992   @param osparams: dictionary with OS parameters
2993   @rtype: boolean
2994   @return: True if the validation passed, or False if the OS was not
2995       found and L{required} was false
2996
2997   """
2998   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2999     _Fail("Unknown checks required for OS %s: %s", osname,
3000           set(checks).difference(constants.OS_VALIDATE_CALLS))
3001
3002   name_only = objects.OS.GetName(osname)
3003   status, tbv = _TryOSFromDisk(name_only, None)
3004
3005   if not status:
3006     if required:
3007       _Fail(tbv)
3008     else:
3009       return False
3010
3011   if max(tbv.api_versions) < constants.OS_API_V20:
3012     return True
3013
3014   if constants.OS_VALIDATE_PARAMETERS in checks:
3015     _CheckOSPList(tbv, osparams.keys())
3016
3017   validate_env = OSCoreEnv(osname, tbv, osparams)
3018   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
3019                         cwd=tbv.path, reset_env=True)
3020   if result.failed:
3021     logging.error("os validate command '%s' returned error: %s output: %s",
3022                   result.cmd, result.fail_reason, result.output)
3023     _Fail("OS validation script failed (%s), output: %s",
3024           result.fail_reason, result.output, log=False)
3025
3026   return True
3027
3028
3029 def DemoteFromMC():
3030   """Demotes the current node from master candidate role.
3031
3032   """
3033   # try to ensure we're not the master by mistake
3034   master, myself = ssconf.GetMasterAndMyself()
3035   if master == myself:
3036     _Fail("ssconf status shows I'm the master node, will not demote")
3037
3038   result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
3039   if not result.failed:
3040     _Fail("The master daemon is running, will not demote")
3041
3042   try:
3043     if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3044       utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3045   except EnvironmentError, err:
3046     if err.errno != errno.ENOENT:
3047       _Fail("Error while backing up cluster file: %s", err, exc=True)
3048
3049   utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3050
3051
3052 def _GetX509Filenames(cryptodir, name):
3053   """Returns the full paths for the private key and certificate.
3054
3055   """
3056   return (utils.PathJoin(cryptodir, name),
3057           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3058           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3059
3060
3061 def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3062   """Creates a new X509 certificate for SSL/TLS.
3063
3064   @type validity: int
3065   @param validity: Validity in seconds
3066   @rtype: tuple; (string, string)
3067   @return: Certificate name and public part
3068
3069   """
3070   (key_pem, cert_pem) = \
3071     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3072                                      min(validity, _MAX_SSL_CERT_VALIDITY))
3073
3074   cert_dir = tempfile.mkdtemp(dir=cryptodir,
3075                               prefix="x509-%s-" % utils.TimestampForFilename())
3076   try:
3077     name = os.path.basename(cert_dir)
3078     assert len(name) > 5
3079
3080     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3081
3082     utils.WriteFile(key_file, mode=0400, data=key_pem)
3083     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3084
3085     # Never return private key as it shouldn't leave the node
3086     return (name, cert_pem)
3087   except Exception:
3088     shutil.rmtree(cert_dir, ignore_errors=True)
3089     raise
3090
3091
3092 def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3093   """Removes a X509 certificate.
3094
3095   @type name: string
3096   @param name: Certificate name
3097
3098   """
3099   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3100
3101   utils.RemoveFile(key_file)
3102   utils.RemoveFile(cert_file)
3103
3104   try:
3105     os.rmdir(cert_dir)
3106   except EnvironmentError, err:
3107     _Fail("Cannot remove certificate directory '%s': %s",
3108           cert_dir, err)
3109
3110
3111 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3112   """Returns the command for the requested input/output.
3113
3114   @type instance: L{objects.Instance}
3115   @param instance: The instance object
3116   @param mode: Import/export mode
3117   @param ieio: Input/output type
3118   @param ieargs: Input/output arguments
3119
3120   """
3121   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3122
3123   env = None
3124   prefix = None
3125   suffix = None
3126   exp_size = None
3127
3128   if ieio == constants.IEIO_FILE:
3129     (filename, ) = ieargs
3130
3131     if not utils.IsNormAbsPath(filename):
3132       _Fail("Path '%s' is not normalized or absolute", filename)
3133
3134     real_filename = os.path.realpath(filename)
3135     directory = os.path.dirname(real_filename)
3136
3137     if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3138       _Fail("File '%s' is not under exports directory '%s': %s",
3139             filename, pathutils.EXPORT_DIR, real_filename)
3140
3141     # Create directory
3142     utils.Makedirs(directory, mode=0750)
3143
3144     quoted_filename = utils.ShellQuote(filename)
3145
3146     if mode == constants.IEM_IMPORT:
3147       suffix = "> %s" % quoted_filename
3148     elif mode == constants.IEM_EXPORT:
3149       suffix = "< %s" % quoted_filename
3150
3151       # Retrieve file size
3152       try:
3153         st = os.stat(filename)
3154       except EnvironmentError, err:
3155         logging.error("Can't stat(2) %s: %s", filename, err)
3156       else:
3157         exp_size = utils.BytesToMebibyte(st.st_size)
3158
3159   elif ieio == constants.IEIO_RAW_DISK:
3160     (disk, ) = ieargs
3161
3162     real_disk = _OpenRealBD(disk)
3163
3164     if mode == constants.IEM_IMPORT:
3165       # we set here a smaller block size as, due to transport buffering, more
3166       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3167       # is not already there or we pass a wrong path; we use notrunc to no
3168       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3169       # much memory; this means that at best, we flush every 64k, which will
3170       # not be very fast
3171       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3172                                     " bs=%s oflag=dsync"),
3173                                     real_disk.dev_path,
3174                                     str(64 * 1024))
3175
3176     elif mode == constants.IEM_EXPORT:
3177       # the block size on the read dd is 1MiB to match our units
3178       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3179                                    real_disk.dev_path,
3180                                    str(1024 * 1024), # 1 MB
3181                                    str(disk.size))
3182       exp_size = disk.size
3183
3184   elif ieio == constants.IEIO_SCRIPT:
3185     (disk, disk_index, ) = ieargs
3186
3187     assert isinstance(disk_index, (int, long))
3188
3189     real_disk = _OpenRealBD(disk)
3190
3191     inst_os = OSFromDisk(instance.os)
3192     env = OSEnvironment(instance, inst_os)
3193
3194     if mode == constants.IEM_IMPORT:
3195       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3196       env["IMPORT_INDEX"] = str(disk_index)
3197       script = inst_os.import_script
3198
3199     elif mode == constants.IEM_EXPORT:
3200       env["EXPORT_DEVICE"] = real_disk.dev_path
3201       env["EXPORT_INDEX"] = str(disk_index)
3202       script = inst_os.export_script
3203
3204     # TODO: Pass special environment only to script
3205     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3206
3207     if mode == constants.IEM_IMPORT:
3208       suffix = "| %s" % script_cmd
3209
3210     elif mode == constants.IEM_EXPORT:
3211       prefix = "%s |" % script_cmd
3212
3213     # Let script predict size
3214     exp_size = constants.IE_CUSTOM_SIZE
3215
3216   else:
3217     _Fail("Invalid %s I/O mode %r", mode, ieio)
3218
3219   return (env, prefix, suffix, exp_size)
3220
3221
3222 def _CreateImportExportStatusDir(prefix):
3223   """Creates status directory for import/export.
3224
3225   """
3226   return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3227                           prefix=("%s-%s-" %
3228                                   (prefix, utils.TimestampForFilename())))
3229
3230
3231 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3232                             ieio, ieioargs):
3233   """Starts an import or export daemon.
3234
3235   @param mode: Import/output mode
3236   @type opts: L{objects.ImportExportOptions}
3237   @param opts: Daemon options
3238   @type host: string
3239   @param host: Remote host for export (None for import)
3240   @type port: int
3241   @param port: Remote port for export (None for import)
3242   @type instance: L{objects.Instance}
3243   @param instance: Instance object
3244   @type component: string
3245   @param component: which part of the instance is transferred now,
3246       e.g. 'disk/0'
3247   @param ieio: Input/output type
3248   @param ieioargs: Input/output arguments
3249
3250   """
3251   if mode == constants.IEM_IMPORT:
3252     prefix = "import"
3253
3254     if not (host is None and port is None):
3255       _Fail("Can not specify host or port on import")
3256
3257   elif mode == constants.IEM_EXPORT:
3258     prefix = "export"
3259
3260     if host is None or port is None:
3261       _Fail("Host and port must be specified for an export")
3262
3263   else:
3264     _Fail("Invalid mode %r", mode)
3265
3266   if (opts.key_name is None) ^ (opts.ca_pem is None):
3267     _Fail("Cluster certificate can only be used for both key and CA")
3268
3269   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3270     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3271
3272   if opts.key_name is None:
3273     # Use server.pem
3274     key_path = pathutils.NODED_CERT_FILE
3275     cert_path = pathutils.NODED_CERT_FILE
3276     assert opts.ca_pem is None
3277   else:
3278     (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3279                                                  opts.key_name)
3280     assert opts.ca_pem is not None
3281
3282   for i in [key_path, cert_path]:
3283     if not os.path.exists(i):
3284       _Fail("File '%s' does not exist" % i)
3285
3286   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3287   try:
3288     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3289     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3290     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3291
3292     if opts.ca_pem is None:
3293       # Use server.pem
3294       ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3295     else:
3296       ca = opts.ca_pem
3297
3298     # Write CA file
3299     utils.WriteFile(ca_file, data=ca, mode=0400)
3300
3301     cmd = [
3302       pathutils.IMPORT_EXPORT_DAEMON,
3303       status_file, mode,
3304       "--key=%s" % key_path,
3305       "--cert=%s" % cert_path,
3306       "--ca=%s" % ca_file,
3307       ]
3308
3309     if host:
3310       cmd.append("--host=%s" % host)
3311
3312     if port:
3313       cmd.append("--port=%s" % port)
3314
3315     if opts.ipv6:
3316       cmd.append("--ipv6")
3317     else:
3318       cmd.append("--ipv4")
3319
3320     if opts.compress:
3321       cmd.append("--compress=%s" % opts.compress)
3322
3323     if opts.magic:
3324       cmd.append("--magic=%s" % opts.magic)
3325
3326     if exp_size is not None:
3327       cmd.append("--expected-size=%s" % exp_size)
3328
3329     if cmd_prefix:
3330       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3331
3332     if cmd_suffix:
3333       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3334
3335     if mode == constants.IEM_EXPORT:
3336       # Retry connection a few times when connecting to remote peer
3337       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3338       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3339     elif opts.connect_timeout is not None:
3340       assert mode == constants.IEM_IMPORT
3341       # Overall timeout for establishing connection while listening
3342       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3343
3344     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3345
3346     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3347     # support for receiving a file descriptor for output
3348     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3349                       output=logfile)
3350
3351     # The import/export name is simply the status directory name
3352     return os.path.basename(status_dir)
3353
3354   except Exception:
3355     shutil.rmtree(status_dir, ignore_errors=True)
3356     raise
3357
3358
3359 def GetImportExportStatus(names):
3360   """Returns import/export daemon status.
3361
3362   @type names: sequence
3363   @param names: List of names
3364   @rtype: List of dicts
3365   @return: Returns a list of the state of each named import/export or None if a
3366            status couldn't be read
3367
3368   """
3369   result = []
3370
3371   for name in names:
3372     status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3373                                  _IES_STATUS_FILE)
3374
3375     try:
3376       data = utils.ReadFile(status_file)
3377     except EnvironmentError, err:
3378       if err.errno != errno.ENOENT:
3379         raise
3380       data = None
3381
3382     if not data:
3383       result.append(None)
3384       continue
3385
3386     result.append(serializer.LoadJson(data))
3387
3388   return result
3389
3390
3391 def AbortImportExport(name):
3392   """Sends SIGTERM to a running import/export daemon.
3393
3394   """
3395   logging.info("Abort import/export %s", name)
3396
3397   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3398   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3399
3400   if pid:
3401     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3402                  name, pid)
3403     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3404
3405
3406 def CleanupImportExport(name):
3407   """Cleanup after an import or export.
3408
3409   If the import/export daemon is still running it's killed. Afterwards the
3410   whole status directory is removed.
3411
3412   """
3413   logging.info("Finalizing import/export %s", name)
3414
3415   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3416
3417   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3418
3419   if pid:
3420     logging.info("Import/export %s is still running with PID %s",
3421                  name, pid)
3422     utils.KillProcess(pid, waitpid=False)
3423
3424   shutil.rmtree(status_dir, ignore_errors=True)
3425
3426
3427 def _FindDisks(nodes_ip, disks):
3428   """Sets the physical ID on disks and returns the block devices.
3429
3430   """
3431   # set the correct physical ID
3432   my_name = netutils.Hostname.GetSysName()
3433   for cf in disks:
3434     cf.SetPhysicalID(my_name, nodes_ip)
3435
3436   bdevs = []
3437
3438   for cf in disks:
3439     rd = _RecursiveFindBD(cf)
3440     if rd is None:
3441       _Fail("Can't find device %s", cf)
3442     bdevs.append(rd)
3443   return bdevs
3444
3445
3446 def DrbdDisconnectNet(nodes_ip, disks):
3447   """Disconnects the network on a list of drbd devices.
3448
3449   """
3450   bdevs = _FindDisks(nodes_ip, disks)
3451
3452   # disconnect disks
3453   for rd in bdevs:
3454     try:
3455       rd.DisconnectNet()
3456     except errors.BlockDeviceError, err:
3457       _Fail("Can't change network configuration to standalone mode: %s",
3458             err, exc=True)
3459
3460
3461 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3462   """Attaches the network on a list of drbd devices.
3463
3464   """
3465   bdevs = _FindDisks(nodes_ip, disks)
3466
3467   if multimaster:
3468     for idx, rd in enumerate(bdevs):
3469       try:
3470         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3471       except EnvironmentError, err:
3472         _Fail("Can't create symlink: %s", err)
3473   # reconnect disks, switch to new master configuration and if
3474   # needed primary mode
3475   for rd in bdevs:
3476     try:
3477       rd.AttachNet(multimaster)
3478     except errors.BlockDeviceError, err:
3479       _Fail("Can't change network configuration: %s", err)
3480
3481   # wait until the disks are connected; we need to retry the re-attach
3482   # if the device becomes standalone, as this might happen if the one
3483   # node disconnects and reconnects in a different mode before the
3484   # other node reconnects; in this case, one or both of the nodes will
3485   # decide it has wrong configuration and switch to standalone
3486
3487   def _Attach():
3488     all_connected = True
3489
3490     for rd in bdevs:
3491       stats = rd.GetProcStatus()
3492
3493       all_connected = (all_connected and
3494                        (stats.is_connected or stats.is_in_resync))
3495
3496       if stats.is_standalone:
3497         # peer had different config info and this node became
3498         # standalone, even though this should not happen with the
3499         # new staged way of changing disk configs
3500         try:
3501           rd.AttachNet(multimaster)
3502         except errors.BlockDeviceError, err:
3503           _Fail("Can't change network configuration: %s", err)
3504
3505     if not all_connected:
3506       raise utils.RetryAgain()
3507
3508   try:
3509     # Start with a delay of 100 miliseconds and go up to 5 seconds
3510     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3511   except utils.RetryTimeout:
3512     _Fail("Timeout in disk reconnecting")
3513
3514   if multimaster:
3515     # change to primary mode
3516     for rd in bdevs:
3517       try:
3518         rd.Open()
3519       except errors.BlockDeviceError, err:
3520         _Fail("Can't change to primary mode: %s", err)
3521
3522
3523 def DrbdWaitSync(nodes_ip, disks):
3524   """Wait until DRBDs have synchronized.
3525
3526   """
3527   def _helper(rd):
3528     stats = rd.GetProcStatus()
3529     if not (stats.is_connected or stats.is_in_resync):
3530       raise utils.RetryAgain()
3531     return stats
3532
3533   bdevs = _FindDisks(nodes_ip, disks)
3534
3535   min_resync = 100
3536   alldone = True
3537   for rd in bdevs:
3538     try:
3539       # poll each second for 15 seconds
3540       stats = utils.Retry(_helper, 1, 15, args=[rd])
3541     except utils.RetryTimeout:
3542       stats = rd.GetProcStatus()
3543       # last check
3544       if not (stats.is_connected or stats.is_in_resync):
3545         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3546     alldone = alldone and (not stats.is_in_resync)
3547     if stats.sync_percent is not None:
3548       min_resync = min(min_resync, stats.sync_percent)
3549
3550   return (alldone, min_resync)
3551
3552
3553 def GetDrbdUsermodeHelper():
3554   """Returns DRBD usermode helper currently configured.
3555
3556   """
3557   try:
3558     return bdev.BaseDRBD.GetUsermodeHelper()
3559   except errors.BlockDeviceError, err:
3560     _Fail(str(err))
3561
3562
3563 def PowercycleNode(hypervisor_type):
3564   """Hard-powercycle the node.
3565
3566   Because we need to return first, and schedule the powercycle in the
3567   background, we won't be able to report failures nicely.
3568
3569   """
3570   hyper = hypervisor.GetHypervisor(hypervisor_type)
3571   try:
3572     pid = os.fork()
3573   except OSError:
3574     # if we can't fork, we'll pretend that we're in the child process
3575     pid = 0
3576   if pid > 0:
3577     return "Reboot scheduled in 5 seconds"
3578   # ensure the child is running on ram
3579   try:
3580     utils.Mlockall()
3581   except Exception: # pylint: disable=W0703
3582     pass
3583   time.sleep(5)
3584   hyper.PowercycleNode()
3585
3586
3587 def _VerifyRestrictedCmdName(cmd):
3588   """Verifies a remote command name.
3589
3590   @type cmd: string
3591   @param cmd: Command name
3592   @rtype: tuple; (boolean, string or None)
3593   @return: The tuple's first element is the status; if C{False}, the second
3594     element is an error message string, otherwise it's C{None}
3595
3596   """
3597   if not cmd.strip():
3598     return (False, "Missing command name")
3599
3600   if os.path.basename(cmd) != cmd:
3601     return (False, "Invalid command name")
3602
3603   if not constants.EXT_PLUGIN_MASK.match(cmd):
3604     return (False, "Command name contains forbidden characters")
3605
3606   return (True, None)
3607
3608
3609 def _CommonRestrictedCmdCheck(path, owner):
3610   """Common checks for remote command file system directories and files.
3611
3612   @type path: string
3613   @param path: Path to check
3614   @param owner: C{None} or tuple containing UID and GID
3615   @rtype: tuple; (boolean, string or C{os.stat} result)
3616   @return: The tuple's first element is the status; if C{False}, the second
3617     element is an error message string, otherwise it's the result of C{os.stat}
3618
3619   """
3620   if owner is None:
3621     # Default to root as owner
3622     owner = (0, 0)
3623
3624   try:
3625     st = os.stat(path)
3626   except EnvironmentError, err:
3627     return (False, "Can't stat(2) '%s': %s" % (path, err))
3628
3629   if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
3630     return (False, "Permissions on '%s' are too permissive" % path)
3631
3632   if (st.st_uid, st.st_gid) != owner:
3633     (owner_uid, owner_gid) = owner
3634     return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
3635
3636   return (True, st)
3637
3638
3639 def _VerifyRestrictedCmdDirectory(path, _owner=None):
3640   """Verifies remote command directory.
3641
3642   @type path: string
3643   @param path: Path to check
3644   @rtype: tuple; (boolean, string or None)
3645   @return: The tuple's first element is the status; if C{False}, the second
3646     element is an error message string, otherwise it's C{None}
3647
3648   """
3649   (status, value) = _CommonRestrictedCmdCheck(path, _owner)
3650
3651   if not status:
3652     return (False, value)
3653
3654   if not stat.S_ISDIR(value.st_mode):
3655     return (False, "Path '%s' is not a directory" % path)
3656
3657   return (True, None)
3658
3659
3660 def _VerifyRestrictedCmd(path, cmd, _owner=None):
3661   """Verifies a whole remote command and returns its executable filename.
3662
3663   @type path: string
3664   @param path: Directory containing remote commands
3665   @type cmd: string
3666   @param cmd: Command name
3667   @rtype: tuple; (boolean, string)
3668   @return: The tuple's first element is the status; if C{False}, the second
3669     element is an error message string, otherwise the second element is the
3670     absolute path to the executable
3671
3672   """
3673   executable = utils.PathJoin(path, cmd)
3674
3675   (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)
3676
3677   if not status:
3678     return (False, msg)
3679
3680   if not utils.IsExecutable(executable):
3681     return (False, "access(2) thinks '%s' can't be executed" % executable)
3682
3683   return (True, executable)
3684
3685
3686 def _PrepareRestrictedCmd(path, cmd,
3687                           _verify_dir=_VerifyRestrictedCmdDirectory,
3688                           _verify_name=_VerifyRestrictedCmdName,
3689                           _verify_cmd=_VerifyRestrictedCmd):
3690   """Performs a number of tests on a remote command.
3691
3692   @type path: string
3693   @param path: Directory containing remote commands
3694   @type cmd: string
3695   @param cmd: Command name
3696   @return: Same as L{_VerifyRestrictedCmd}
3697
3698   """
3699   # Verify the directory first
3700   (status, msg) = _verify_dir(path)
3701   if status:
3702     # Check command if everything was alright
3703     (status, msg) = _verify_name(cmd)
3704
3705   if not status:
3706     return (False, msg)
3707
3708   # Check actual executable
3709   return _verify_cmd(path, cmd)
3710
3711
3712 def RunRestrictedCmd(cmd,
3713                      _lock_timeout=_RCMD_LOCK_TIMEOUT,
3714                      _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
3715                      _path=pathutils.RESTRICTED_COMMANDS_DIR,
3716                      _sleep_fn=time.sleep,
3717                      _prepare_fn=_PrepareRestrictedCmd,
3718                      _runcmd_fn=utils.RunCmd,
3719                      _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
3720   """Executes a remote command after performing strict tests.
3721
3722   @type cmd: string
3723   @param cmd: Command name
3724   @rtype: string
3725   @return: Command output
3726   @raise RPCFail: In case of an error
3727
3728   """
3729   logging.info("Preparing to run remote command '%s'", cmd)
3730
3731   if not _enabled:
3732     _Fail("Remote commands disabled at configure time")
3733
3734   lock = None
3735   try:
3736     cmdresult = None
3737     try:
3738       lock = utils.FileLock.Open(_lock_file)
3739       lock.Exclusive(blocking=True, timeout=_lock_timeout)
3740
3741       (status, value) = _prepare_fn(_path, cmd)
3742
3743       if status:
3744         cmdresult = _runcmd_fn([value], env={}, reset_env=True,
3745                                postfork_fn=lambda _: lock.Unlock())
3746       else:
3747         logging.error(value)
3748     except Exception: # pylint: disable=W0703
3749       # Keep original error in log
3750       logging.exception("Caught exception")
3751
3752     if cmdresult is None:
3753       logging.info("Sleeping for %0.1f seconds before returning",
3754                    _RCMD_INVALID_DELAY)
3755       _sleep_fn(_RCMD_INVALID_DELAY)
3756
3757       # Do not include original error message in returned error
3758       _Fail("Executing command '%s' failed" % cmd)
3759     elif cmdresult.failed or cmdresult.fail_reason:
3760       _Fail("Remote command '%s' failed: %s; output: %s",
3761             cmd, cmdresult.fail_reason, cmdresult.output)
3762     else:
3763       return cmdresult.output
3764   finally:
3765     if lock is not None:
3766       # Release lock at last
3767       lock.Close()
3768       lock = None
3769
3770
3771 class HooksRunner(object):
3772   """Hook runner.
3773
3774   This class is instantiated on the node side (ganeti-noded) and not
3775   on the master side.
3776
3777   """
3778   def __init__(self, hooks_base_dir=None):
3779     """Constructor for hooks runner.
3780
3781     @type hooks_base_dir: str or None
3782     @param hooks_base_dir: if not None, this overrides the
3783         L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3784
3785     """
3786     if hooks_base_dir is None:
3787       hooks_base_dir = pathutils.HOOKS_BASE_DIR
3788     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3789     # constant
3790     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3791
3792   def RunLocalHooks(self, node_list, hpath, phase, env):
3793     """Check that the hooks will be run only locally and then run them.
3794
3795     """
3796     assert len(node_list) == 1
3797     node = node_list[0]
3798     _, myself = ssconf.GetMasterAndMyself()
3799     assert node == myself
3800
3801     results = self.RunHooks(hpath, phase, env)
3802
3803     # Return values in the form expected by HooksMaster
3804     return {node: (None, False, results)}
3805
3806   def RunHooks(self, hpath, phase, env):
3807     """Run the scripts in the hooks directory.
3808
3809     @type hpath: str
3810     @param hpath: the path to the hooks directory which
3811         holds the scripts
3812     @type phase: str
3813     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3814         L{constants.HOOKS_PHASE_POST}
3815     @type env: dict
3816     @param env: dictionary with the environment for the hook
3817     @rtype: list
3818     @return: list of 3-element tuples:
3819       - script path
3820       - script result, either L{constants.HKR_SUCCESS} or
3821         L{constants.HKR_FAIL}
3822       - output of the script
3823
3824     @raise errors.ProgrammerError: for invalid input
3825         parameters
3826
3827     """
3828     if phase == constants.HOOKS_PHASE_PRE:
3829       suffix = "pre"
3830     elif phase == constants.HOOKS_PHASE_POST:
3831       suffix = "post"
3832     else:
3833       _Fail("Unknown hooks phase '%s'", phase)
3834
3835     subdir = "%s-%s.d" % (hpath, suffix)
3836     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3837
3838     results = []
3839
3840     if not os.path.isdir(dir_name):
3841       # for non-existing/non-dirs, we simply exit instead of logging a
3842       # warning at every operation
3843       return results
3844
3845     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3846
3847     for (relname, relstatus, runresult) in runparts_results:
3848       if relstatus == constants.RUNPARTS_SKIP:
3849         rrval = constants.HKR_SKIP
3850         output = ""
3851       elif relstatus == constants.RUNPARTS_ERR:
3852         rrval = constants.HKR_FAIL
3853         output = "Hook script execution error: %s" % runresult
3854       elif relstatus == constants.RUNPARTS_RUN:
3855         if runresult.failed:
3856           rrval = constants.HKR_FAIL
3857         else:
3858           rrval = constants.HKR_SUCCESS
3859         output = utils.SafeEncode(runresult.output.strip())
3860       results.append(("%s/%s" % (subdir, relname), rrval, output))
3861
3862     return results
3863
3864
3865 class IAllocatorRunner(object):
3866   """IAllocator runner.
3867
3868   This class is instantiated on the node side (ganeti-noded) and not on
3869   the master side.
3870
3871   """
3872   @staticmethod
3873   def Run(name, idata):
3874     """Run an iallocator script.
3875
3876     @type name: str
3877     @param name: the iallocator script name
3878     @type idata: str
3879     @param idata: the allocator input data
3880
3881     @rtype: tuple
3882     @return: two element tuple of:
3883        - status
3884        - either error message or stdout of allocator (for success)
3885
3886     """
3887     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3888                                   os.path.isfile)
3889     if alloc_script is None:
3890       _Fail("iallocator module '%s' not found in the search path", name)
3891
3892     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3893     try:
3894       os.write(fd, idata)
3895       os.close(fd)
3896       result = utils.RunCmd([alloc_script, fin_name])
3897       if result.failed:
3898         _Fail("iallocator module '%s' failed: %s, output '%s'",
3899               name, result.fail_reason, result.output)
3900     finally:
3901       os.unlink(fin_name)
3902
3903     return result.stdout
3904
3905
3906 class DevCacheManager(object):
3907   """Simple class for managing a cache of block device information.
3908
3909   """
3910   _DEV_PREFIX = "/dev/"
3911   _ROOT_DIR = pathutils.BDEV_CACHE_DIR
3912
3913   @classmethod
3914   def _ConvertPath(cls, dev_path):
3915     """Converts a /dev/name path to the cache file name.
3916
3917     This replaces slashes with underscores and strips the /dev
3918     prefix. It then returns the full path to the cache file.
3919
3920     @type dev_path: str
3921     @param dev_path: the C{/dev/} path name
3922     @rtype: str
3923     @return: the converted path name
3924
3925     """
3926     if dev_path.startswith(cls._DEV_PREFIX):
3927       dev_path = dev_path[len(cls._DEV_PREFIX):]
3928     dev_path = dev_path.replace("/", "_")
3929     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3930     return fpath
3931
3932   @classmethod
3933   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3934     """Updates the cache information for a given device.
3935
3936     @type dev_path: str
3937     @param dev_path: the pathname of the device
3938     @type owner: str
3939     @param owner: the owner (instance name) of the device
3940     @type on_primary: bool
3941     @param on_primary: whether this is the primary
3942         node nor not
3943     @type iv_name: str
3944     @param iv_name: the instance-visible name of the
3945         device, as in objects.Disk.iv_name
3946
3947     @rtype: None
3948
3949     """
3950     if dev_path is None:
3951       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3952       return
3953     fpath = cls._ConvertPath(dev_path)
3954     if on_primary:
3955       state = "primary"
3956     else:
3957       state = "secondary"
3958     if iv_name is None:
3959       iv_name = "not_visible"
3960     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3961     try:
3962       utils.WriteFile(fpath, data=fdata)
3963     except EnvironmentError, err:
3964       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3965
3966   @classmethod
3967   def RemoveCache(cls, dev_path):
3968     """Remove data for a dev_path.
3969
3970     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3971     path name and logging.
3972
3973     @type dev_path: str
3974     @param dev_path: the pathname of the device
3975
3976     @rtype: None
3977
3978     """
3979     if dev_path is None:
3980       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3981       return
3982     fpath = cls._ConvertPath(dev_path)
3983     try:
3984       utils.RemoveFile(fpath)
3985     except EnvironmentError, err:
3986       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)