Change master IP address RPCs for external script
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65
66
67 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
68 _ALLOWED_CLEAN_DIRS = frozenset([
69   constants.DATA_DIR,
70   constants.JOB_QUEUE_ARCHIVE_DIR,
71   constants.QUEUE_DIR,
72   constants.CRYPTO_KEYS_DIR,
73   ])
74 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
75 _X509_KEY_FILE = "key"
76 _X509_CERT_FILE = "cert"
77 _IES_STATUS_FILE = "status"
78 _IES_PID_FILE = "pid"
79 _IES_CA_FILE = "ca"
80
81 #: Valid LVS output line regex
82 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
83
84
85 class RPCFail(Exception):
86   """Class denoting RPC failure.
87
88   Its argument is the error message.
89
90   """
91
92
93 def _Fail(msg, *args, **kwargs):
94   """Log an error and the raise an RPCFail exception.
95
96   This exception is then handled specially in the ganeti daemon and
97   turned into a 'failed' return type. As such, this function is a
98   useful shortcut for logging the error and returning it to the master
99   daemon.
100
101   @type msg: string
102   @param msg: the text of the exception
103   @raise RPCFail
104
105   """
106   if args:
107     msg = msg % args
108   if "log" not in kwargs or kwargs["log"]: # if we should log this error
109     if "exc" in kwargs and kwargs["exc"]:
110       logging.exception(msg)
111     else:
112       logging.error(msg)
113   raise RPCFail(msg)
114
115
116 def _GetConfig():
117   """Simple wrapper to return a SimpleStore.
118
119   @rtype: L{ssconf.SimpleStore}
120   @return: a SimpleStore instance
121
122   """
123   return ssconf.SimpleStore()
124
125
126 def _GetSshRunner(cluster_name):
127   """Simple wrapper to return an SshRunner.
128
129   @type cluster_name: str
130   @param cluster_name: the cluster name, which is needed
131       by the SshRunner constructor
132   @rtype: L{ssh.SshRunner}
133   @return: an SshRunner instance
134
135   """
136   return ssh.SshRunner(cluster_name)
137
138
139 def _Decompress(data):
140   """Unpacks data compressed by the RPC client.
141
142   @type data: list or tuple
143   @param data: Data sent by RPC client
144   @rtype: str
145   @return: Decompressed data
146
147   """
148   assert isinstance(data, (list, tuple))
149   assert len(data) == 2
150   (encoding, content) = data
151   if encoding == constants.RPC_ENCODING_NONE:
152     return content
153   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
154     return zlib.decompress(base64.b64decode(content))
155   else:
156     raise AssertionError("Unknown data encoding")
157
158
159 def _CleanDirectory(path, exclude=None):
160   """Removes all regular files in a directory.
161
162   @type path: str
163   @param path: the directory to clean
164   @type exclude: list
165   @param exclude: list of files to be excluded, defaults
166       to the empty list
167
168   """
169   if path not in _ALLOWED_CLEAN_DIRS:
170     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
171           path)
172
173   if not os.path.isdir(path):
174     return
175   if exclude is None:
176     exclude = []
177   else:
178     # Normalize excluded paths
179     exclude = [os.path.normpath(i) for i in exclude]
180
181   for rel_name in utils.ListVisibleFiles(path):
182     full_name = utils.PathJoin(path, rel_name)
183     if full_name in exclude:
184       continue
185     if os.path.isfile(full_name) and not os.path.islink(full_name):
186       utils.RemoveFile(full_name)
187
188
189 def _BuildUploadFileList():
190   """Build the list of allowed upload files.
191
192   This is abstracted so that it's built only once at module import time.
193
194   """
195   allowed_files = set([
196     constants.CLUSTER_CONF_FILE,
197     constants.ETC_HOSTS,
198     constants.SSH_KNOWN_HOSTS_FILE,
199     constants.VNC_PASSWORD_FILE,
200     constants.RAPI_CERT_FILE,
201     constants.SPICE_CERT_FILE,
202     constants.SPICE_CACERT_FILE,
203     constants.RAPI_USERS_FILE,
204     constants.CONFD_HMAC_KEY,
205     constants.CLUSTER_DOMAIN_SECRET_FILE,
206     ])
207
208   for hv_name in constants.HYPER_TYPES:
209     hv_class = hypervisor.GetHypervisorClass(hv_name)
210     allowed_files.update(hv_class.GetAncillaryFiles()[0])
211
212   return frozenset(allowed_files)
213
214
215 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
216
217
218 def JobQueuePurge():
219   """Removes job queue files and archived jobs.
220
221   @rtype: tuple
222   @return: True, None
223
224   """
225   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
226   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
227
228
229 def GetMasterInfo():
230   """Returns master information.
231
232   This is an utility function to compute master information, either
233   for consumption here or from the node daemon.
234
235   @rtype: tuple
236   @return: master_netdev, master_ip, master_name, primary_ip_family,
237     master_netmask
238   @raise RPCFail: in case of errors
239
240   """
241   try:
242     cfg = _GetConfig()
243     master_netdev = cfg.GetMasterNetdev()
244     master_ip = cfg.GetMasterIP()
245     master_netmask = cfg.GetMasterNetmask()
246     master_node = cfg.GetMasterNode()
247     primary_ip_family = cfg.GetPrimaryIPFamily()
248   except errors.ConfigurationError, err:
249     _Fail("Cluster configuration incomplete: %s", err, exc=True)
250   return (master_netdev, master_ip, master_node, primary_ip_family,
251       master_netmask)
252
253
254 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
255   """Decorator that runs hooks before and after the decorated function.
256
257   @type hook_opcode: string
258   @param hook_opcode: opcode of the hook
259   @type hooks_path: string
260   @param hooks_path: path of the hooks
261   @type env_builder_fn: function
262   @param env_builder_fn: function that returns a dictionary containing the
263     environment variables for the hooks. Will get all the parameters of the
264     decorated function.
265   @raise RPCFail: in case of pre-hook failure
266
267   """
268   def decorator(fn):
269     def wrapper(*args, **kwargs):
270       _, myself = ssconf.GetMasterAndMyself()
271       nodes = ([myself], [myself])  # these hooks run locally
272
273       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
274
275       cfg = _GetConfig()
276       hr = HooksRunner()
277       hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
278                             None, env_fn, logging.warning, cfg.GetClusterName(),
279                             cfg.GetMasterNode())
280
281       hm.RunPhase(constants.HOOKS_PHASE_PRE)
282       result = fn(*args, **kwargs)
283       hm.RunPhase(constants.HOOKS_PHASE_POST)
284
285       return result
286     return wrapper
287   return decorator
288
289
290 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
291   """Builds environment variables for master IP hooks.
292
293   @type master_params: L{objects.MasterNetworkParameters}
294   @param master_params: network parameters of the master
295   @type use_external_mip_script: boolean
296   @param use_external_mip_script: whether to use an external master IP
297     address setup script (unused, but necessary per the implementation of the
298     _RunLocalHooks decorator)
299
300   """
301   # pylint: disable=W0613
302   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
303   env = {
304     "MASTER_NETDEV": master_params.netdev,
305     "MASTER_IP": master_params.ip,
306     "MASTER_NETMASK": master_params.netmask,
307     "CLUSTER_IP_VERSION": str(ver),
308   }
309
310   return env
311
312
313 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
314                _BuildMasterIpEnv)
315 def ActivateMasterIp(master_params, use_external_mip_script):
316   """Activate the IP address of the master daemon.
317
318   @type master_params: L{objects.MasterNetworkParameters}
319   @param master_params: network parameters of the master
320   @type use_external_mip_script: boolean
321   @param use_external_mip_script: whether to use an external master IP
322     address setup script
323
324   """
325   # pylint: disable=W0613
326   err_msg = None
327   if netutils.TcpPing(master_params.ip, constants.DEFAULT_NODED_PORT):
328     if netutils.IPAddress.Own(master_params.ip):
329       # we already have the ip:
330       logging.debug("Master IP already configured, doing nothing")
331     else:
332       err_msg = "Someone else has the master ip, not activating"
333       logging.error(err_msg)
334   else:
335     ipcls = netutils.IPAddress.GetClassFromIpFamily(master_params.ip_family)
336
337     result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
338                            "%s/%s" % (master_params.ip, master_params.netmask),
339                            "dev", master_params.netdev, "label",
340                            "%s:0" % master_params.netdev])
341     if result.failed:
342       err_msg = "Can't activate master IP: %s" % result.output
343       logging.error(err_msg)
344
345     else:
346       # we ignore the exit code of the following cmds
347       if ipcls == netutils.IP4Address:
348         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_params.netdev,
349                       "-s", master_params.ip, master_params.ip])
350       elif ipcls == netutils.IP6Address:
351         try:
352           utils.RunCmd(["ndisc6", "-q", "-r 3", master_params.ip,
353                         master_params.netdev])
354         except errors.OpExecError:
355           # TODO: Better error reporting
356           logging.warning("Can't execute ndisc6, please install if missing")
357
358   if err_msg:
359     _Fail(err_msg)
360
361
362 def StartMasterDaemons(no_voting):
363   """Activate local node as master node.
364
365   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
366
367   @type no_voting: boolean
368   @param no_voting: whether to start ganeti-masterd without a node vote
369       but still non-interactively
370   @rtype: None
371
372   """
373
374   if no_voting:
375     masterd_args = "--no-voting --yes-do-it"
376   else:
377     masterd_args = ""
378
379   env = {
380     "EXTRA_MASTERD_ARGS": masterd_args,
381     }
382
383   result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
384   if result.failed:
385     msg = "Can't start Ganeti master: %s" % result.output
386     logging.error(msg)
387     _Fail(msg)
388
389
390 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
391                _BuildMasterIpEnv)
392 def DeactivateMasterIp(master_params, use_external_mip_script):
393   """Deactivate the master IP on this node.
394
395   @type master_params: L{objects.MasterNetworkParameters}
396   @param master_params: network parameters of the master
397   @type use_external_mip_script: boolean
398   @param use_external_mip_script: whether to use an external master IP
399     address setup script
400
401   """
402   # pylint: disable=W0613
403   # TODO: log and report back to the caller the error failures; we
404   # need to decide in which case we fail the RPC for this
405
406   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
407                          "%s/%s" % (master_params.ip, master_params.netmask),
408                          "dev", master_params.netdev])
409   if result.failed:
410     logging.error("Can't remove the master IP, error: %s", result.output)
411     # but otherwise ignore the failure
412
413
414 def StopMasterDaemons():
415   """Stop the master daemons on this node.
416
417   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
418
419   @rtype: None
420
421   """
422   # TODO: log and report back to the caller the error failures; we
423   # need to decide in which case we fail the RPC for this
424
425   result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
426   if result.failed:
427     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
428                   " and error %s",
429                   result.cmd, result.exit_code, result.output)
430
431
432 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
433   """Change the netmask of the master IP.
434
435   @param old_netmask: the old value of the netmask
436   @param netmask: the new value of the netmask
437   @param master_ip: the master IP
438   @param master_netdev: the master network device
439
440   """
441   if old_netmask == netmask:
442     return
443
444   if not netutils.IPAddress.Own(master_ip):
445     _Fail("The master IP address is not up, not attempting to change its"
446           " netmask")
447
448   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
449                          "%s/%s" % (master_ip, netmask),
450                          "dev", master_netdev, "label",
451                          "%s:0" % master_netdev])
452   if result.failed:
453     _Fail("Could not set the new netmask on the master IP address")
454
455   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
456                          "%s/%s" % (master_ip, old_netmask),
457                          "dev", master_netdev, "label",
458                          "%s:0" % master_netdev])
459   if result.failed:
460     _Fail("Could not bring down the master IP address with the old netmask")
461
462
463 def EtcHostsModify(mode, host, ip):
464   """Modify a host entry in /etc/hosts.
465
466   @param mode: The mode to operate. Either add or remove entry
467   @param host: The host to operate on
468   @param ip: The ip associated with the entry
469
470   """
471   if mode == constants.ETC_HOSTS_ADD:
472     if not ip:
473       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
474               " present")
475     utils.AddHostToEtcHosts(host, ip)
476   elif mode == constants.ETC_HOSTS_REMOVE:
477     if ip:
478       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
479               " parameter is present")
480     utils.RemoveHostFromEtcHosts(host)
481   else:
482     RPCFail("Mode not supported")
483
484
485 def LeaveCluster(modify_ssh_setup):
486   """Cleans up and remove the current node.
487
488   This function cleans up and prepares the current node to be removed
489   from the cluster.
490
491   If processing is successful, then it raises an
492   L{errors.QuitGanetiException} which is used as a special case to
493   shutdown the node daemon.
494
495   @param modify_ssh_setup: boolean
496
497   """
498   _CleanDirectory(constants.DATA_DIR)
499   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
500   JobQueuePurge()
501
502   if modify_ssh_setup:
503     try:
504       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
505
506       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
507
508       utils.RemoveFile(priv_key)
509       utils.RemoveFile(pub_key)
510     except errors.OpExecError:
511       logging.exception("Error while processing ssh files")
512
513   try:
514     utils.RemoveFile(constants.CONFD_HMAC_KEY)
515     utils.RemoveFile(constants.RAPI_CERT_FILE)
516     utils.RemoveFile(constants.SPICE_CERT_FILE)
517     utils.RemoveFile(constants.SPICE_CACERT_FILE)
518     utils.RemoveFile(constants.NODED_CERT_FILE)
519   except: # pylint: disable=W0702
520     logging.exception("Error while removing cluster secrets")
521
522   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
523   if result.failed:
524     logging.error("Command %s failed with exitcode %s and error %s",
525                   result.cmd, result.exit_code, result.output)
526
527   # Raise a custom exception (handled in ganeti-noded)
528   raise errors.QuitGanetiException(True, "Shutdown scheduled")
529
530
531 def GetNodeInfo(vgname, hypervisor_type):
532   """Gives back a hash with different information about the node.
533
534   @type vgname: C{string}
535   @param vgname: the name of the volume group to ask for disk space information
536   @type hypervisor_type: C{str}
537   @param hypervisor_type: the name of the hypervisor to ask for
538       memory information
539   @rtype: C{dict}
540   @return: dictionary with the following keys:
541       - vg_size is the size of the configured volume group in MiB
542       - vg_free is the free size of the volume group in MiB
543       - memory_dom0 is the memory allocated for domain0 in MiB
544       - memory_free is the currently available (free) ram in MiB
545       - memory_total is the total number of ram in MiB
546       - hv_version: the hypervisor version, if available
547
548   """
549   outputarray = {}
550
551   if vgname is not None:
552     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
553     vg_free = vg_size = None
554     if vginfo:
555       vg_free = int(round(vginfo[0][0], 0))
556       vg_size = int(round(vginfo[0][1], 0))
557     outputarray["vg_size"] = vg_size
558     outputarray["vg_free"] = vg_free
559
560   if hypervisor_type is not None:
561     hyper = hypervisor.GetHypervisor(hypervisor_type)
562     hyp_info = hyper.GetNodeInfo()
563     if hyp_info is not None:
564       outputarray.update(hyp_info)
565
566   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
567
568   return outputarray
569
570
571 def VerifyNode(what, cluster_name):
572   """Verify the status of the local node.
573
574   Based on the input L{what} parameter, various checks are done on the
575   local node.
576
577   If the I{filelist} key is present, this list of
578   files is checksummed and the file/checksum pairs are returned.
579
580   If the I{nodelist} key is present, we check that we have
581   connectivity via ssh with the target nodes (and check the hostname
582   report).
583
584   If the I{node-net-test} key is present, we check that we have
585   connectivity to the given nodes via both primary IP and, if
586   applicable, secondary IPs.
587
588   @type what: C{dict}
589   @param what: a dictionary of things to check:
590       - filelist: list of files for which to compute checksums
591       - nodelist: list of nodes we should check ssh communication with
592       - node-net-test: list of nodes we should check node daemon port
593         connectivity with
594       - hypervisor: list with hypervisors to run the verify for
595   @rtype: dict
596   @return: a dictionary with the same keys as the input dict, and
597       values representing the result of the checks
598
599   """
600   result = {}
601   my_name = netutils.Hostname.GetSysName()
602   port = netutils.GetDaemonPort(constants.NODED)
603   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
604
605   if constants.NV_HYPERVISOR in what and vm_capable:
606     result[constants.NV_HYPERVISOR] = tmp = {}
607     for hv_name in what[constants.NV_HYPERVISOR]:
608       try:
609         val = hypervisor.GetHypervisor(hv_name).Verify()
610       except errors.HypervisorError, err:
611         val = "Error while checking hypervisor: %s" % str(err)
612       tmp[hv_name] = val
613
614   if constants.NV_HVPARAMS in what and vm_capable:
615     result[constants.NV_HVPARAMS] = tmp = []
616     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
617       try:
618         logging.info("Validating hv %s, %s", hv_name, hvparms)
619         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
620       except errors.HypervisorError, err:
621         tmp.append((source, hv_name, str(err)))
622
623   if constants.NV_FILELIST in what:
624     result[constants.NV_FILELIST] = utils.FingerprintFiles(
625       what[constants.NV_FILELIST])
626
627   if constants.NV_NODELIST in what:
628     (nodes, bynode) = what[constants.NV_NODELIST]
629
630     # Add nodes from other groups (different for each node)
631     try:
632       nodes.extend(bynode[my_name])
633     except KeyError:
634       pass
635
636     # Use a random order
637     random.shuffle(nodes)
638
639     # Try to contact all nodes
640     val = {}
641     for node in nodes:
642       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
643       if not success:
644         val[node] = message
645
646     result[constants.NV_NODELIST] = val
647
648   if constants.NV_NODENETTEST in what:
649     result[constants.NV_NODENETTEST] = tmp = {}
650     my_pip = my_sip = None
651     for name, pip, sip in what[constants.NV_NODENETTEST]:
652       if name == my_name:
653         my_pip = pip
654         my_sip = sip
655         break
656     if not my_pip:
657       tmp[my_name] = ("Can't find my own primary/secondary IP"
658                       " in the node list")
659     else:
660       for name, pip, sip in what[constants.NV_NODENETTEST]:
661         fail = []
662         if not netutils.TcpPing(pip, port, source=my_pip):
663           fail.append("primary")
664         if sip != pip:
665           if not netutils.TcpPing(sip, port, source=my_sip):
666             fail.append("secondary")
667         if fail:
668           tmp[name] = ("failure using the %s interface(s)" %
669                        " and ".join(fail))
670
671   if constants.NV_MASTERIP in what:
672     # FIXME: add checks on incoming data structures (here and in the
673     # rest of the function)
674     master_name, master_ip = what[constants.NV_MASTERIP]
675     if master_name == my_name:
676       source = constants.IP4_ADDRESS_LOCALHOST
677     else:
678       source = None
679     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
680                                                   source=source)
681
682   if constants.NV_USERSCRIPTS in what:
683     result[constants.NV_USERSCRIPTS] = \
684       [script for script in what[constants.NV_USERSCRIPTS]
685        if not (os.path.exists(script) and os.access(script, os.X_OK))]
686
687   if constants.NV_OOB_PATHS in what:
688     result[constants.NV_OOB_PATHS] = tmp = []
689     for path in what[constants.NV_OOB_PATHS]:
690       try:
691         st = os.stat(path)
692       except OSError, err:
693         tmp.append("error stating out of band helper: %s" % err)
694       else:
695         if stat.S_ISREG(st.st_mode):
696           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
697             tmp.append(None)
698           else:
699             tmp.append("out of band helper %s is not executable" % path)
700         else:
701           tmp.append("out of band helper %s is not a file" % path)
702
703   if constants.NV_LVLIST in what and vm_capable:
704     try:
705       val = GetVolumeList(utils.ListVolumeGroups().keys())
706     except RPCFail, err:
707       val = str(err)
708     result[constants.NV_LVLIST] = val
709
710   if constants.NV_INSTANCELIST in what and vm_capable:
711     # GetInstanceList can fail
712     try:
713       val = GetInstanceList(what[constants.NV_INSTANCELIST])
714     except RPCFail, err:
715       val = str(err)
716     result[constants.NV_INSTANCELIST] = val
717
718   if constants.NV_VGLIST in what and vm_capable:
719     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
720
721   if constants.NV_PVLIST in what and vm_capable:
722     result[constants.NV_PVLIST] = \
723       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
724                                    filter_allocatable=False)
725
726   if constants.NV_VERSION in what:
727     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
728                                     constants.RELEASE_VERSION)
729
730   if constants.NV_HVINFO in what and vm_capable:
731     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
732     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
733
734   if constants.NV_DRBDLIST in what and vm_capable:
735     try:
736       used_minors = bdev.DRBD8.GetUsedDevs().keys()
737     except errors.BlockDeviceError, err:
738       logging.warning("Can't get used minors list", exc_info=True)
739       used_minors = str(err)
740     result[constants.NV_DRBDLIST] = used_minors
741
742   if constants.NV_DRBDHELPER in what and vm_capable:
743     status = True
744     try:
745       payload = bdev.BaseDRBD.GetUsermodeHelper()
746     except errors.BlockDeviceError, err:
747       logging.error("Can't get DRBD usermode helper: %s", str(err))
748       status = False
749       payload = str(err)
750     result[constants.NV_DRBDHELPER] = (status, payload)
751
752   if constants.NV_NODESETUP in what:
753     result[constants.NV_NODESETUP] = tmpr = []
754     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
755       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
756                   " under /sys, missing required directories /sys/block"
757                   " and /sys/class/net")
758     if (not os.path.isdir("/proc/sys") or
759         not os.path.isfile("/proc/sysrq-trigger")):
760       tmpr.append("The procfs filesystem doesn't seem to be mounted"
761                   " under /proc, missing required directory /proc/sys and"
762                   " the file /proc/sysrq-trigger")
763
764   if constants.NV_TIME in what:
765     result[constants.NV_TIME] = utils.SplitTime(time.time())
766
767   if constants.NV_OSLIST in what and vm_capable:
768     result[constants.NV_OSLIST] = DiagnoseOS()
769
770   if constants.NV_BRIDGES in what and vm_capable:
771     result[constants.NV_BRIDGES] = [bridge
772                                     for bridge in what[constants.NV_BRIDGES]
773                                     if not utils.BridgeExists(bridge)]
774   return result
775
776
777 def GetBlockDevSizes(devices):
778   """Return the size of the given block devices
779
780   @type devices: list
781   @param devices: list of block device nodes to query
782   @rtype: dict
783   @return:
784     dictionary of all block devices under /dev (key). The value is their
785     size in MiB.
786
787     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
788
789   """
790   DEV_PREFIX = "/dev/"
791   blockdevs = {}
792
793   for devpath in devices:
794     if not utils.IsBelowDir(DEV_PREFIX, devpath):
795       continue
796
797     try:
798       st = os.stat(devpath)
799     except EnvironmentError, err:
800       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
801       continue
802
803     if stat.S_ISBLK(st.st_mode):
804       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
805       if result.failed:
806         # We don't want to fail, just do not list this device as available
807         logging.warning("Cannot get size for block device %s", devpath)
808         continue
809
810       size = int(result.stdout) / (1024 * 1024)
811       blockdevs[devpath] = size
812   return blockdevs
813
814
815 def GetVolumeList(vg_names):
816   """Compute list of logical volumes and their size.
817
818   @type vg_names: list
819   @param vg_names: the volume groups whose LVs we should list, or
820       empty for all volume groups
821   @rtype: dict
822   @return:
823       dictionary of all partions (key) with value being a tuple of
824       their size (in MiB), inactive and online status::
825
826         {'xenvg/test1': ('20.06', True, True)}
827
828       in case of errors, a string is returned with the error
829       details.
830
831   """
832   lvs = {}
833   sep = "|"
834   if not vg_names:
835     vg_names = []
836   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
837                          "--separator=%s" % sep,
838                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
839   if result.failed:
840     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
841
842   for line in result.stdout.splitlines():
843     line = line.strip()
844     match = _LVSLINE_REGEX.match(line)
845     if not match:
846       logging.error("Invalid line returned from lvs output: '%s'", line)
847       continue
848     vg_name, name, size, attr = match.groups()
849     inactive = attr[4] == "-"
850     online = attr[5] == "o"
851     virtual = attr[0] == "v"
852     if virtual:
853       # we don't want to report such volumes as existing, since they
854       # don't really hold data
855       continue
856     lvs[vg_name + "/" + name] = (size, inactive, online)
857
858   return lvs
859
860
861 def ListVolumeGroups():
862   """List the volume groups and their size.
863
864   @rtype: dict
865   @return: dictionary with keys volume name and values the
866       size of the volume
867
868   """
869   return utils.ListVolumeGroups()
870
871
872 def NodeVolumes():
873   """List all volumes on this node.
874
875   @rtype: list
876   @return:
877     A list of dictionaries, each having four keys:
878       - name: the logical volume name,
879       - size: the size of the logical volume
880       - dev: the physical device on which the LV lives
881       - vg: the volume group to which it belongs
882
883     In case of errors, we return an empty list and log the
884     error.
885
886     Note that since a logical volume can live on multiple physical
887     volumes, the resulting list might include a logical volume
888     multiple times.
889
890   """
891   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
892                          "--separator=|",
893                          "--options=lv_name,lv_size,devices,vg_name"])
894   if result.failed:
895     _Fail("Failed to list logical volumes, lvs output: %s",
896           result.output)
897
898   def parse_dev(dev):
899     return dev.split("(")[0]
900
901   def handle_dev(dev):
902     return [parse_dev(x) for x in dev.split(",")]
903
904   def map_line(line):
905     line = [v.strip() for v in line]
906     return [{"name": line[0], "size": line[1],
907              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
908
909   all_devs = []
910   for line in result.stdout.splitlines():
911     if line.count("|") >= 3:
912       all_devs.extend(map_line(line.split("|")))
913     else:
914       logging.warning("Strange line in the output from lvs: '%s'", line)
915   return all_devs
916
917
918 def BridgesExist(bridges_list):
919   """Check if a list of bridges exist on the current node.
920
921   @rtype: boolean
922   @return: C{True} if all of them exist, C{False} otherwise
923
924   """
925   missing = []
926   for bridge in bridges_list:
927     if not utils.BridgeExists(bridge):
928       missing.append(bridge)
929
930   if missing:
931     _Fail("Missing bridges %s", utils.CommaJoin(missing))
932
933
934 def GetInstanceList(hypervisor_list):
935   """Provides a list of instances.
936
937   @type hypervisor_list: list
938   @param hypervisor_list: the list of hypervisors to query information
939
940   @rtype: list
941   @return: a list of all running instances on the current node
942     - instance1.example.com
943     - instance2.example.com
944
945   """
946   results = []
947   for hname in hypervisor_list:
948     try:
949       names = hypervisor.GetHypervisor(hname).ListInstances()
950       results.extend(names)
951     except errors.HypervisorError, err:
952       _Fail("Error enumerating instances (hypervisor %s): %s",
953             hname, err, exc=True)
954
955   return results
956
957
958 def GetInstanceInfo(instance, hname):
959   """Gives back the information about an instance as a dictionary.
960
961   @type instance: string
962   @param instance: the instance name
963   @type hname: string
964   @param hname: the hypervisor type of the instance
965
966   @rtype: dict
967   @return: dictionary with the following keys:
968       - memory: memory size of instance (int)
969       - state: xen state of instance (string)
970       - time: cpu time of instance (float)
971
972   """
973   output = {}
974
975   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
976   if iinfo is not None:
977     output["memory"] = iinfo[2]
978     output["state"] = iinfo[4]
979     output["time"] = iinfo[5]
980
981   return output
982
983
984 def GetInstanceMigratable(instance):
985   """Gives whether an instance can be migrated.
986
987   @type instance: L{objects.Instance}
988   @param instance: object representing the instance to be checked.
989
990   @rtype: tuple
991   @return: tuple of (result, description) where:
992       - result: whether the instance can be migrated or not
993       - description: a description of the issue, if relevant
994
995   """
996   hyper = hypervisor.GetHypervisor(instance.hypervisor)
997   iname = instance.name
998   if iname not in hyper.ListInstances():
999     _Fail("Instance %s is not running", iname)
1000
1001   for idx in range(len(instance.disks)):
1002     link_name = _GetBlockDevSymlinkPath(iname, idx)
1003     if not os.path.islink(link_name):
1004       logging.warning("Instance %s is missing symlink %s for disk %d",
1005                       iname, link_name, idx)
1006
1007
1008 def GetAllInstancesInfo(hypervisor_list):
1009   """Gather data about all instances.
1010
1011   This is the equivalent of L{GetInstanceInfo}, except that it
1012   computes data for all instances at once, thus being faster if one
1013   needs data about more than one instance.
1014
1015   @type hypervisor_list: list
1016   @param hypervisor_list: list of hypervisors to query for instance data
1017
1018   @rtype: dict
1019   @return: dictionary of instance: data, with data having the following keys:
1020       - memory: memory size of instance (int)
1021       - state: xen state of instance (string)
1022       - time: cpu time of instance (float)
1023       - vcpus: the number of vcpus
1024
1025   """
1026   output = {}
1027
1028   for hname in hypervisor_list:
1029     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1030     if iinfo:
1031       for name, _, memory, vcpus, state, times in iinfo:
1032         value = {
1033           "memory": memory,
1034           "vcpus": vcpus,
1035           "state": state,
1036           "time": times,
1037           }
1038         if name in output:
1039           # we only check static parameters, like memory and vcpus,
1040           # and not state and time which can change between the
1041           # invocations of the different hypervisors
1042           for key in "memory", "vcpus":
1043             if value[key] != output[name][key]:
1044               _Fail("Instance %s is running twice"
1045                     " with different parameters", name)
1046         output[name] = value
1047
1048   return output
1049
1050
1051 def _InstanceLogName(kind, os_name, instance, component):
1052   """Compute the OS log filename for a given instance and operation.
1053
1054   The instance name and os name are passed in as strings since not all
1055   operations have these as part of an instance object.
1056
1057   @type kind: string
1058   @param kind: the operation type (e.g. add, import, etc.)
1059   @type os_name: string
1060   @param os_name: the os name
1061   @type instance: string
1062   @param instance: the name of the instance being imported/added/etc.
1063   @type component: string or None
1064   @param component: the name of the component of the instance being
1065       transferred
1066
1067   """
1068   # TODO: Use tempfile.mkstemp to create unique filename
1069   if component:
1070     assert "/" not in component
1071     c_msg = "-%s" % component
1072   else:
1073     c_msg = ""
1074   base = ("%s-%s-%s%s-%s.log" %
1075           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1076   return utils.PathJoin(constants.LOG_OS_DIR, base)
1077
1078
1079 def InstanceOsAdd(instance, reinstall, debug):
1080   """Add an OS to an instance.
1081
1082   @type instance: L{objects.Instance}
1083   @param instance: Instance whose OS is to be installed
1084   @type reinstall: boolean
1085   @param reinstall: whether this is an instance reinstall
1086   @type debug: integer
1087   @param debug: debug level, passed to the OS scripts
1088   @rtype: None
1089
1090   """
1091   inst_os = OSFromDisk(instance.os)
1092
1093   create_env = OSEnvironment(instance, inst_os, debug)
1094   if reinstall:
1095     create_env["INSTANCE_REINSTALL"] = "1"
1096
1097   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1098
1099   result = utils.RunCmd([inst_os.create_script], env=create_env,
1100                         cwd=inst_os.path, output=logfile, reset_env=True)
1101   if result.failed:
1102     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1103                   " output: %s", result.cmd, result.fail_reason, logfile,
1104                   result.output)
1105     lines = [utils.SafeEncode(val)
1106              for val in utils.TailFile(logfile, lines=20)]
1107     _Fail("OS create script failed (%s), last lines in the"
1108           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1109
1110
1111 def RunRenameInstance(instance, old_name, debug):
1112   """Run the OS rename script for an instance.
1113
1114   @type instance: L{objects.Instance}
1115   @param instance: Instance whose OS is to be installed
1116   @type old_name: string
1117   @param old_name: previous instance name
1118   @type debug: integer
1119   @param debug: debug level, passed to the OS scripts
1120   @rtype: boolean
1121   @return: the success of the operation
1122
1123   """
1124   inst_os = OSFromDisk(instance.os)
1125
1126   rename_env = OSEnvironment(instance, inst_os, debug)
1127   rename_env["OLD_INSTANCE_NAME"] = old_name
1128
1129   logfile = _InstanceLogName("rename", instance.os,
1130                              "%s-%s" % (old_name, instance.name), None)
1131
1132   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1133                         cwd=inst_os.path, output=logfile, reset_env=True)
1134
1135   if result.failed:
1136     logging.error("os create command '%s' returned error: %s output: %s",
1137                   result.cmd, result.fail_reason, result.output)
1138     lines = [utils.SafeEncode(val)
1139              for val in utils.TailFile(logfile, lines=20)]
1140     _Fail("OS rename script failed (%s), last lines in the"
1141           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1142
1143
1144 def _GetBlockDevSymlinkPath(instance_name, idx):
1145   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
1146                         (instance_name, constants.DISK_SEPARATOR, idx))
1147
1148
1149 def _SymlinkBlockDev(instance_name, device_path, idx):
1150   """Set up symlinks to a instance's block device.
1151
1152   This is an auxiliary function run when an instance is start (on the primary
1153   node) or when an instance is migrated (on the target node).
1154
1155
1156   @param instance_name: the name of the target instance
1157   @param device_path: path of the physical block device, on the node
1158   @param idx: the disk index
1159   @return: absolute path to the disk's symlink
1160
1161   """
1162   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1163   try:
1164     os.symlink(device_path, link_name)
1165   except OSError, err:
1166     if err.errno == errno.EEXIST:
1167       if (not os.path.islink(link_name) or
1168           os.readlink(link_name) != device_path):
1169         os.remove(link_name)
1170         os.symlink(device_path, link_name)
1171     else:
1172       raise
1173
1174   return link_name
1175
1176
1177 def _RemoveBlockDevLinks(instance_name, disks):
1178   """Remove the block device symlinks belonging to the given instance.
1179
1180   """
1181   for idx, _ in enumerate(disks):
1182     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1183     if os.path.islink(link_name):
1184       try:
1185         os.remove(link_name)
1186       except OSError:
1187         logging.exception("Can't remove symlink '%s'", link_name)
1188
1189
1190 def _GatherAndLinkBlockDevs(instance):
1191   """Set up an instance's block device(s).
1192
1193   This is run on the primary node at instance startup. The block
1194   devices must be already assembled.
1195
1196   @type instance: L{objects.Instance}
1197   @param instance: the instance whose disks we shoul assemble
1198   @rtype: list
1199   @return: list of (disk_object, device_path)
1200
1201   """
1202   block_devices = []
1203   for idx, disk in enumerate(instance.disks):
1204     device = _RecursiveFindBD(disk)
1205     if device is None:
1206       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1207                                     str(disk))
1208     device.Open()
1209     try:
1210       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1211     except OSError, e:
1212       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1213                                     e.strerror)
1214
1215     block_devices.append((disk, link_name))
1216
1217   return block_devices
1218
1219
1220 def StartInstance(instance, startup_paused):
1221   """Start an instance.
1222
1223   @type instance: L{objects.Instance}
1224   @param instance: the instance object
1225   @type startup_paused: bool
1226   @param instance: pause instance at startup?
1227   @rtype: None
1228
1229   """
1230   running_instances = GetInstanceList([instance.hypervisor])
1231
1232   if instance.name in running_instances:
1233     logging.info("Instance %s already running, not starting", instance.name)
1234     return
1235
1236   try:
1237     block_devices = _GatherAndLinkBlockDevs(instance)
1238     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1239     hyper.StartInstance(instance, block_devices, startup_paused)
1240   except errors.BlockDeviceError, err:
1241     _Fail("Block device error: %s", err, exc=True)
1242   except errors.HypervisorError, err:
1243     _RemoveBlockDevLinks(instance.name, instance.disks)
1244     _Fail("Hypervisor error: %s", err, exc=True)
1245
1246
1247 def InstanceShutdown(instance, timeout):
1248   """Shut an instance down.
1249
1250   @note: this functions uses polling with a hardcoded timeout.
1251
1252   @type instance: L{objects.Instance}
1253   @param instance: the instance object
1254   @type timeout: integer
1255   @param timeout: maximum timeout for soft shutdown
1256   @rtype: None
1257
1258   """
1259   hv_name = instance.hypervisor
1260   hyper = hypervisor.GetHypervisor(hv_name)
1261   iname = instance.name
1262
1263   if instance.name not in hyper.ListInstances():
1264     logging.info("Instance %s not running, doing nothing", iname)
1265     return
1266
1267   class _TryShutdown:
1268     def __init__(self):
1269       self.tried_once = False
1270
1271     def __call__(self):
1272       if iname not in hyper.ListInstances():
1273         return
1274
1275       try:
1276         hyper.StopInstance(instance, retry=self.tried_once)
1277       except errors.HypervisorError, err:
1278         if iname not in hyper.ListInstances():
1279           # if the instance is no longer existing, consider this a
1280           # success and go to cleanup
1281           return
1282
1283         _Fail("Failed to stop instance %s: %s", iname, err)
1284
1285       self.tried_once = True
1286
1287       raise utils.RetryAgain()
1288
1289   try:
1290     utils.Retry(_TryShutdown(), 5, timeout)
1291   except utils.RetryTimeout:
1292     # the shutdown did not succeed
1293     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1294
1295     try:
1296       hyper.StopInstance(instance, force=True)
1297     except errors.HypervisorError, err:
1298       if iname in hyper.ListInstances():
1299         # only raise an error if the instance still exists, otherwise
1300         # the error could simply be "instance ... unknown"!
1301         _Fail("Failed to force stop instance %s: %s", iname, err)
1302
1303     time.sleep(1)
1304
1305     if iname in hyper.ListInstances():
1306       _Fail("Could not shutdown instance %s even by destroy", iname)
1307
1308   try:
1309     hyper.CleanupInstance(instance.name)
1310   except errors.HypervisorError, err:
1311     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1312
1313   _RemoveBlockDevLinks(iname, instance.disks)
1314
1315
1316 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1317   """Reboot an instance.
1318
1319   @type instance: L{objects.Instance}
1320   @param instance: the instance object to reboot
1321   @type reboot_type: str
1322   @param reboot_type: the type of reboot, one the following
1323     constants:
1324       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1325         instance OS, do not recreate the VM
1326       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1327         restart the VM (at the hypervisor level)
1328       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1329         not accepted here, since that mode is handled differently, in
1330         cmdlib, and translates into full stop and start of the
1331         instance (instead of a call_instance_reboot RPC)
1332   @type shutdown_timeout: integer
1333   @param shutdown_timeout: maximum timeout for soft shutdown
1334   @rtype: None
1335
1336   """
1337   running_instances = GetInstanceList([instance.hypervisor])
1338
1339   if instance.name not in running_instances:
1340     _Fail("Cannot reboot instance %s that is not running", instance.name)
1341
1342   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1343   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1344     try:
1345       hyper.RebootInstance(instance)
1346     except errors.HypervisorError, err:
1347       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1348   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1349     try:
1350       InstanceShutdown(instance, shutdown_timeout)
1351       return StartInstance(instance, False)
1352     except errors.HypervisorError, err:
1353       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1354   else:
1355     _Fail("Invalid reboot_type received: %s", reboot_type)
1356
1357
1358 def MigrationInfo(instance):
1359   """Gather information about an instance to be migrated.
1360
1361   @type instance: L{objects.Instance}
1362   @param instance: the instance definition
1363
1364   """
1365   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1366   try:
1367     info = hyper.MigrationInfo(instance)
1368   except errors.HypervisorError, err:
1369     _Fail("Failed to fetch migration information: %s", err, exc=True)
1370   return info
1371
1372
1373 def AcceptInstance(instance, info, target):
1374   """Prepare the node to accept an instance.
1375
1376   @type instance: L{objects.Instance}
1377   @param instance: the instance definition
1378   @type info: string/data (opaque)
1379   @param info: migration information, from the source node
1380   @type target: string
1381   @param target: target host (usually ip), on this node
1382
1383   """
1384   # TODO: why is this required only for DTS_EXT_MIRROR?
1385   if instance.disk_template in constants.DTS_EXT_MIRROR:
1386     # Create the symlinks, as the disks are not active
1387     # in any way
1388     try:
1389       _GatherAndLinkBlockDevs(instance)
1390     except errors.BlockDeviceError, err:
1391       _Fail("Block device error: %s", err, exc=True)
1392
1393   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1394   try:
1395     hyper.AcceptInstance(instance, info, target)
1396   except errors.HypervisorError, err:
1397     if instance.disk_template in constants.DTS_EXT_MIRROR:
1398       _RemoveBlockDevLinks(instance.name, instance.disks)
1399     _Fail("Failed to accept instance: %s", err, exc=True)
1400
1401
1402 def FinalizeMigrationDst(instance, info, success):
1403   """Finalize any preparation to accept an instance.
1404
1405   @type instance: L{objects.Instance}
1406   @param instance: the instance definition
1407   @type info: string/data (opaque)
1408   @param info: migration information, from the source node
1409   @type success: boolean
1410   @param success: whether the migration was a success or a failure
1411
1412   """
1413   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1414   try:
1415     hyper.FinalizeMigrationDst(instance, info, success)
1416   except errors.HypervisorError, err:
1417     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1418
1419
1420 def MigrateInstance(instance, target, live):
1421   """Migrates an instance to another node.
1422
1423   @type instance: L{objects.Instance}
1424   @param instance: the instance definition
1425   @type target: string
1426   @param target: the target node name
1427   @type live: boolean
1428   @param live: whether the migration should be done live or not (the
1429       interpretation of this parameter is left to the hypervisor)
1430   @raise RPCFail: if migration fails for some reason
1431
1432   """
1433   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1434
1435   try:
1436     hyper.MigrateInstance(instance, target, live)
1437   except errors.HypervisorError, err:
1438     _Fail("Failed to migrate instance: %s", err, exc=True)
1439
1440
1441 def FinalizeMigrationSource(instance, success, live):
1442   """Finalize the instance migration on the source node.
1443
1444   @type instance: L{objects.Instance}
1445   @param instance: the instance definition of the migrated instance
1446   @type success: bool
1447   @param success: whether the migration succeeded or not
1448   @type live: bool
1449   @param live: whether the user requested a live migration or not
1450   @raise RPCFail: If the execution fails for some reason
1451
1452   """
1453   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1454
1455   try:
1456     hyper.FinalizeMigrationSource(instance, success, live)
1457   except Exception, err:  # pylint: disable=W0703
1458     _Fail("Failed to finalize the migration on the source node: %s", err,
1459           exc=True)
1460
1461
1462 def GetMigrationStatus(instance):
1463   """Get the migration status
1464
1465   @type instance: L{objects.Instance}
1466   @param instance: the instance that is being migrated
1467   @rtype: L{objects.MigrationStatus}
1468   @return: the status of the current migration (one of
1469            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1470            progress info that can be retrieved from the hypervisor
1471   @raise RPCFail: If the migration status cannot be retrieved
1472
1473   """
1474   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1475   try:
1476     return hyper.GetMigrationStatus(instance)
1477   except Exception, err:  # pylint: disable=W0703
1478     _Fail("Failed to get migration status: %s", err, exc=True)
1479
1480
1481 def BlockdevCreate(disk, size, owner, on_primary, info):
1482   """Creates a block device for an instance.
1483
1484   @type disk: L{objects.Disk}
1485   @param disk: the object describing the disk we should create
1486   @type size: int
1487   @param size: the size of the physical underlying device, in MiB
1488   @type owner: str
1489   @param owner: the name of the instance for which disk is created,
1490       used for device cache data
1491   @type on_primary: boolean
1492   @param on_primary:  indicates if it is the primary node or not
1493   @type info: string
1494   @param info: string that will be sent to the physical device
1495       creation, used for example to set (LVM) tags on LVs
1496
1497   @return: the new unique_id of the device (this can sometime be
1498       computed only after creation), or None. On secondary nodes,
1499       it's not required to return anything.
1500
1501   """
1502   # TODO: remove the obsolete "size" argument
1503   # pylint: disable=W0613
1504   clist = []
1505   if disk.children:
1506     for child in disk.children:
1507       try:
1508         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1509       except errors.BlockDeviceError, err:
1510         _Fail("Can't assemble device %s: %s", child, err)
1511       if on_primary or disk.AssembleOnSecondary():
1512         # we need the children open in case the device itself has to
1513         # be assembled
1514         try:
1515           # pylint: disable=E1103
1516           crdev.Open()
1517         except errors.BlockDeviceError, err:
1518           _Fail("Can't make child '%s' read-write: %s", child, err)
1519       clist.append(crdev)
1520
1521   try:
1522     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1523   except errors.BlockDeviceError, err:
1524     _Fail("Can't create block device: %s", err)
1525
1526   if on_primary or disk.AssembleOnSecondary():
1527     try:
1528       device.Assemble()
1529     except errors.BlockDeviceError, err:
1530       _Fail("Can't assemble device after creation, unusual event: %s", err)
1531     device.SetSyncSpeed(constants.SYNC_SPEED)
1532     if on_primary or disk.OpenOnSecondary():
1533       try:
1534         device.Open(force=True)
1535       except errors.BlockDeviceError, err:
1536         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1537     DevCacheManager.UpdateCache(device.dev_path, owner,
1538                                 on_primary, disk.iv_name)
1539
1540   device.SetInfo(info)
1541
1542   return device.unique_id
1543
1544
1545 def _WipeDevice(path, offset, size):
1546   """This function actually wipes the device.
1547
1548   @param path: The path to the device to wipe
1549   @param offset: The offset in MiB in the file
1550   @param size: The size in MiB to write
1551
1552   """
1553   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1554          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1555          "count=%d" % size]
1556   result = utils.RunCmd(cmd)
1557
1558   if result.failed:
1559     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1560           result.fail_reason, result.output)
1561
1562
1563 def BlockdevWipe(disk, offset, size):
1564   """Wipes a block device.
1565
1566   @type disk: L{objects.Disk}
1567   @param disk: the disk object we want to wipe
1568   @type offset: int
1569   @param offset: The offset in MiB in the file
1570   @type size: int
1571   @param size: The size in MiB to write
1572
1573   """
1574   try:
1575     rdev = _RecursiveFindBD(disk)
1576   except errors.BlockDeviceError:
1577     rdev = None
1578
1579   if not rdev:
1580     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1581
1582   # Do cross verify some of the parameters
1583   if offset > rdev.size:
1584     _Fail("Offset is bigger than device size")
1585   if (offset + size) > rdev.size:
1586     _Fail("The provided offset and size to wipe is bigger than device size")
1587
1588   _WipeDevice(rdev.dev_path, offset, size)
1589
1590
1591 def BlockdevPauseResumeSync(disks, pause):
1592   """Pause or resume the sync of the block device.
1593
1594   @type disks: list of L{objects.Disk}
1595   @param disks: the disks object we want to pause/resume
1596   @type pause: bool
1597   @param pause: Wheater to pause or resume
1598
1599   """
1600   success = []
1601   for disk in disks:
1602     try:
1603       rdev = _RecursiveFindBD(disk)
1604     except errors.BlockDeviceError:
1605       rdev = None
1606
1607     if not rdev:
1608       success.append((False, ("Cannot change sync for device %s:"
1609                               " device not found" % disk.iv_name)))
1610       continue
1611
1612     result = rdev.PauseResumeSync(pause)
1613
1614     if result:
1615       success.append((result, None))
1616     else:
1617       if pause:
1618         msg = "Pause"
1619       else:
1620         msg = "Resume"
1621       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1622
1623   return success
1624
1625
1626 def BlockdevRemove(disk):
1627   """Remove a block device.
1628
1629   @note: This is intended to be called recursively.
1630
1631   @type disk: L{objects.Disk}
1632   @param disk: the disk object we should remove
1633   @rtype: boolean
1634   @return: the success of the operation
1635
1636   """
1637   msgs = []
1638   try:
1639     rdev = _RecursiveFindBD(disk)
1640   except errors.BlockDeviceError, err:
1641     # probably can't attach
1642     logging.info("Can't attach to device %s in remove", disk)
1643     rdev = None
1644   if rdev is not None:
1645     r_path = rdev.dev_path
1646     try:
1647       rdev.Remove()
1648     except errors.BlockDeviceError, err:
1649       msgs.append(str(err))
1650     if not msgs:
1651       DevCacheManager.RemoveCache(r_path)
1652
1653   if disk.children:
1654     for child in disk.children:
1655       try:
1656         BlockdevRemove(child)
1657       except RPCFail, err:
1658         msgs.append(str(err))
1659
1660   if msgs:
1661     _Fail("; ".join(msgs))
1662
1663
1664 def _RecursiveAssembleBD(disk, owner, as_primary):
1665   """Activate a block device for an instance.
1666
1667   This is run on the primary and secondary nodes for an instance.
1668
1669   @note: this function is called recursively.
1670
1671   @type disk: L{objects.Disk}
1672   @param disk: the disk we try to assemble
1673   @type owner: str
1674   @param owner: the name of the instance which owns the disk
1675   @type as_primary: boolean
1676   @param as_primary: if we should make the block device
1677       read/write
1678
1679   @return: the assembled device or None (in case no device
1680       was assembled)
1681   @raise errors.BlockDeviceError: in case there is an error
1682       during the activation of the children or the device
1683       itself
1684
1685   """
1686   children = []
1687   if disk.children:
1688     mcn = disk.ChildrenNeeded()
1689     if mcn == -1:
1690       mcn = 0 # max number of Nones allowed
1691     else:
1692       mcn = len(disk.children) - mcn # max number of Nones
1693     for chld_disk in disk.children:
1694       try:
1695         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1696       except errors.BlockDeviceError, err:
1697         if children.count(None) >= mcn:
1698           raise
1699         cdev = None
1700         logging.error("Error in child activation (but continuing): %s",
1701                       str(err))
1702       children.append(cdev)
1703
1704   if as_primary or disk.AssembleOnSecondary():
1705     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1706     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1707     result = r_dev
1708     if as_primary or disk.OpenOnSecondary():
1709       r_dev.Open()
1710     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1711                                 as_primary, disk.iv_name)
1712
1713   else:
1714     result = True
1715   return result
1716
1717
1718 def BlockdevAssemble(disk, owner, as_primary, idx):
1719   """Activate a block device for an instance.
1720
1721   This is a wrapper over _RecursiveAssembleBD.
1722
1723   @rtype: str or boolean
1724   @return: a C{/dev/...} path for primary nodes, and
1725       C{True} for secondary nodes
1726
1727   """
1728   try:
1729     result = _RecursiveAssembleBD(disk, owner, as_primary)
1730     if isinstance(result, bdev.BlockDev):
1731       # pylint: disable=E1103
1732       result = result.dev_path
1733       if as_primary:
1734         _SymlinkBlockDev(owner, result, idx)
1735   except errors.BlockDeviceError, err:
1736     _Fail("Error while assembling disk: %s", err, exc=True)
1737   except OSError, err:
1738     _Fail("Error while symlinking disk: %s", err, exc=True)
1739
1740   return result
1741
1742
1743 def BlockdevShutdown(disk):
1744   """Shut down a block device.
1745
1746   First, if the device is assembled (Attach() is successful), then
1747   the device is shutdown. Then the children of the device are
1748   shutdown.
1749
1750   This function is called recursively. Note that we don't cache the
1751   children or such, as oppossed to assemble, shutdown of different
1752   devices doesn't require that the upper device was active.
1753
1754   @type disk: L{objects.Disk}
1755   @param disk: the description of the disk we should
1756       shutdown
1757   @rtype: None
1758
1759   """
1760   msgs = []
1761   r_dev = _RecursiveFindBD(disk)
1762   if r_dev is not None:
1763     r_path = r_dev.dev_path
1764     try:
1765       r_dev.Shutdown()
1766       DevCacheManager.RemoveCache(r_path)
1767     except errors.BlockDeviceError, err:
1768       msgs.append(str(err))
1769
1770   if disk.children:
1771     for child in disk.children:
1772       try:
1773         BlockdevShutdown(child)
1774       except RPCFail, err:
1775         msgs.append(str(err))
1776
1777   if msgs:
1778     _Fail("; ".join(msgs))
1779
1780
1781 def BlockdevAddchildren(parent_cdev, new_cdevs):
1782   """Extend a mirrored block device.
1783
1784   @type parent_cdev: L{objects.Disk}
1785   @param parent_cdev: the disk to which we should add children
1786   @type new_cdevs: list of L{objects.Disk}
1787   @param new_cdevs: the list of children which we should add
1788   @rtype: None
1789
1790   """
1791   parent_bdev = _RecursiveFindBD(parent_cdev)
1792   if parent_bdev is None:
1793     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1794   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1795   if new_bdevs.count(None) > 0:
1796     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1797   parent_bdev.AddChildren(new_bdevs)
1798
1799
1800 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1801   """Shrink a mirrored block device.
1802
1803   @type parent_cdev: L{objects.Disk}
1804   @param parent_cdev: the disk from which we should remove children
1805   @type new_cdevs: list of L{objects.Disk}
1806   @param new_cdevs: the list of children which we should remove
1807   @rtype: None
1808
1809   """
1810   parent_bdev = _RecursiveFindBD(parent_cdev)
1811   if parent_bdev is None:
1812     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1813   devs = []
1814   for disk in new_cdevs:
1815     rpath = disk.StaticDevPath()
1816     if rpath is None:
1817       bd = _RecursiveFindBD(disk)
1818       if bd is None:
1819         _Fail("Can't find device %s while removing children", disk)
1820       else:
1821         devs.append(bd.dev_path)
1822     else:
1823       if not utils.IsNormAbsPath(rpath):
1824         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1825       devs.append(rpath)
1826   parent_bdev.RemoveChildren(devs)
1827
1828
1829 def BlockdevGetmirrorstatus(disks):
1830   """Get the mirroring status of a list of devices.
1831
1832   @type disks: list of L{objects.Disk}
1833   @param disks: the list of disks which we should query
1834   @rtype: disk
1835   @return: List of L{objects.BlockDevStatus}, one for each disk
1836   @raise errors.BlockDeviceError: if any of the disks cannot be
1837       found
1838
1839   """
1840   stats = []
1841   for dsk in disks:
1842     rbd = _RecursiveFindBD(dsk)
1843     if rbd is None:
1844       _Fail("Can't find device %s", dsk)
1845
1846     stats.append(rbd.CombinedSyncStatus())
1847
1848   return stats
1849
1850
1851 def BlockdevGetmirrorstatusMulti(disks):
1852   """Get the mirroring status of a list of devices.
1853
1854   @type disks: list of L{objects.Disk}
1855   @param disks: the list of disks which we should query
1856   @rtype: disk
1857   @return: List of tuples, (bool, status), one for each disk; bool denotes
1858     success/failure, status is L{objects.BlockDevStatus} on success, string
1859     otherwise
1860
1861   """
1862   result = []
1863   for disk in disks:
1864     try:
1865       rbd = _RecursiveFindBD(disk)
1866       if rbd is None:
1867         result.append((False, "Can't find device %s" % disk))
1868         continue
1869
1870       status = rbd.CombinedSyncStatus()
1871     except errors.BlockDeviceError, err:
1872       logging.exception("Error while getting disk status")
1873       result.append((False, str(err)))
1874     else:
1875       result.append((True, status))
1876
1877   assert len(disks) == len(result)
1878
1879   return result
1880
1881
1882 def _RecursiveFindBD(disk):
1883   """Check if a device is activated.
1884
1885   If so, return information about the real device.
1886
1887   @type disk: L{objects.Disk}
1888   @param disk: the disk object we need to find
1889
1890   @return: None if the device can't be found,
1891       otherwise the device instance
1892
1893   """
1894   children = []
1895   if disk.children:
1896     for chdisk in disk.children:
1897       children.append(_RecursiveFindBD(chdisk))
1898
1899   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1900
1901
1902 def _OpenRealBD(disk):
1903   """Opens the underlying block device of a disk.
1904
1905   @type disk: L{objects.Disk}
1906   @param disk: the disk object we want to open
1907
1908   """
1909   real_disk = _RecursiveFindBD(disk)
1910   if real_disk is None:
1911     _Fail("Block device '%s' is not set up", disk)
1912
1913   real_disk.Open()
1914
1915   return real_disk
1916
1917
1918 def BlockdevFind(disk):
1919   """Check if a device is activated.
1920
1921   If it is, return information about the real device.
1922
1923   @type disk: L{objects.Disk}
1924   @param disk: the disk to find
1925   @rtype: None or objects.BlockDevStatus
1926   @return: None if the disk cannot be found, otherwise a the current
1927            information
1928
1929   """
1930   try:
1931     rbd = _RecursiveFindBD(disk)
1932   except errors.BlockDeviceError, err:
1933     _Fail("Failed to find device: %s", err, exc=True)
1934
1935   if rbd is None:
1936     return None
1937
1938   return rbd.GetSyncStatus()
1939
1940
1941 def BlockdevGetsize(disks):
1942   """Computes the size of the given disks.
1943
1944   If a disk is not found, returns None instead.
1945
1946   @type disks: list of L{objects.Disk}
1947   @param disks: the list of disk to compute the size for
1948   @rtype: list
1949   @return: list with elements None if the disk cannot be found,
1950       otherwise the size
1951
1952   """
1953   result = []
1954   for cf in disks:
1955     try:
1956       rbd = _RecursiveFindBD(cf)
1957     except errors.BlockDeviceError:
1958       result.append(None)
1959       continue
1960     if rbd is None:
1961       result.append(None)
1962     else:
1963       result.append(rbd.GetActualSize())
1964   return result
1965
1966
1967 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1968   """Export a block device to a remote node.
1969
1970   @type disk: L{objects.Disk}
1971   @param disk: the description of the disk to export
1972   @type dest_node: str
1973   @param dest_node: the destination node to export to
1974   @type dest_path: str
1975   @param dest_path: the destination path on the target node
1976   @type cluster_name: str
1977   @param cluster_name: the cluster name, needed for SSH hostalias
1978   @rtype: None
1979
1980   """
1981   real_disk = _OpenRealBD(disk)
1982
1983   # the block size on the read dd is 1MiB to match our units
1984   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1985                                "dd if=%s bs=1048576 count=%s",
1986                                real_disk.dev_path, str(disk.size))
1987
1988   # we set here a smaller block size as, due to ssh buffering, more
1989   # than 64-128k will mostly ignored; we use nocreat to fail if the
1990   # device is not already there or we pass a wrong path; we use
1991   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1992   # to not buffer too much memory; this means that at best, we flush
1993   # every 64k, which will not be very fast
1994   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1995                                 " oflag=dsync", dest_path)
1996
1997   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1998                                                    constants.GANETI_RUNAS,
1999                                                    destcmd)
2000
2001   # all commands have been checked, so we're safe to combine them
2002   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2003
2004   result = utils.RunCmd(["bash", "-c", command])
2005
2006   if result.failed:
2007     _Fail("Disk copy command '%s' returned error: %s"
2008           " output: %s", command, result.fail_reason, result.output)
2009
2010
2011 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2012   """Write a file to the filesystem.
2013
2014   This allows the master to overwrite(!) a file. It will only perform
2015   the operation if the file belongs to a list of configuration files.
2016
2017   @type file_name: str
2018   @param file_name: the target file name
2019   @type data: str
2020   @param data: the new contents of the file
2021   @type mode: int
2022   @param mode: the mode to give the file (can be None)
2023   @type uid: string
2024   @param uid: the owner of the file
2025   @type gid: string
2026   @param gid: the group of the file
2027   @type atime: float
2028   @param atime: the atime to set on the file (can be None)
2029   @type mtime: float
2030   @param mtime: the mtime to set on the file (can be None)
2031   @rtype: None
2032
2033   """
2034   if not os.path.isabs(file_name):
2035     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2036
2037   if file_name not in _ALLOWED_UPLOAD_FILES:
2038     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2039           file_name)
2040
2041   raw_data = _Decompress(data)
2042
2043   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2044     _Fail("Invalid username/groupname type")
2045
2046   getents = runtime.GetEnts()
2047   uid = getents.LookupUser(uid)
2048   gid = getents.LookupGroup(gid)
2049
2050   utils.SafeWriteFile(file_name, None,
2051                       data=raw_data, mode=mode, uid=uid, gid=gid,
2052                       atime=atime, mtime=mtime)
2053
2054
2055 def RunOob(oob_program, command, node, timeout):
2056   """Executes oob_program with given command on given node.
2057
2058   @param oob_program: The path to the executable oob_program
2059   @param command: The command to invoke on oob_program
2060   @param node: The node given as an argument to the program
2061   @param timeout: Timeout after which we kill the oob program
2062
2063   @return: stdout
2064   @raise RPCFail: If execution fails for some reason
2065
2066   """
2067   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2068
2069   if result.failed:
2070     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2071           result.fail_reason, result.output)
2072
2073   return result.stdout
2074
2075
2076 def WriteSsconfFiles(values):
2077   """Update all ssconf files.
2078
2079   Wrapper around the SimpleStore.WriteFiles.
2080
2081   """
2082   ssconf.SimpleStore().WriteFiles(values)
2083
2084
2085 def _ErrnoOrStr(err):
2086   """Format an EnvironmentError exception.
2087
2088   If the L{err} argument has an errno attribute, it will be looked up
2089   and converted into a textual C{E...} description. Otherwise the
2090   string representation of the error will be returned.
2091
2092   @type err: L{EnvironmentError}
2093   @param err: the exception to format
2094
2095   """
2096   if hasattr(err, "errno"):
2097     detail = errno.errorcode[err.errno]
2098   else:
2099     detail = str(err)
2100   return detail
2101
2102
2103 def _OSOndiskAPIVersion(os_dir):
2104   """Compute and return the API version of a given OS.
2105
2106   This function will try to read the API version of the OS residing in
2107   the 'os_dir' directory.
2108
2109   @type os_dir: str
2110   @param os_dir: the directory in which we should look for the OS
2111   @rtype: tuple
2112   @return: tuple (status, data) with status denoting the validity and
2113       data holding either the vaid versions or an error message
2114
2115   """
2116   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2117
2118   try:
2119     st = os.stat(api_file)
2120   except EnvironmentError, err:
2121     return False, ("Required file '%s' not found under path %s: %s" %
2122                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
2123
2124   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2125     return False, ("File '%s' in %s is not a regular file" %
2126                    (constants.OS_API_FILE, os_dir))
2127
2128   try:
2129     api_versions = utils.ReadFile(api_file).splitlines()
2130   except EnvironmentError, err:
2131     return False, ("Error while reading the API version file at %s: %s" %
2132                    (api_file, _ErrnoOrStr(err)))
2133
2134   try:
2135     api_versions = [int(version.strip()) for version in api_versions]
2136   except (TypeError, ValueError), err:
2137     return False, ("API version(s) can't be converted to integer: %s" %
2138                    str(err))
2139
2140   return True, api_versions
2141
2142
2143 def DiagnoseOS(top_dirs=None):
2144   """Compute the validity for all OSes.
2145
2146   @type top_dirs: list
2147   @param top_dirs: the list of directories in which to
2148       search (if not given defaults to
2149       L{constants.OS_SEARCH_PATH})
2150   @rtype: list of L{objects.OS}
2151   @return: a list of tuples (name, path, status, diagnose, variants,
2152       parameters, api_version) for all (potential) OSes under all
2153       search paths, where:
2154           - name is the (potential) OS name
2155           - path is the full path to the OS
2156           - status True/False is the validity of the OS
2157           - diagnose is the error message for an invalid OS, otherwise empty
2158           - variants is a list of supported OS variants, if any
2159           - parameters is a list of (name, help) parameters, if any
2160           - api_version is a list of support OS API versions
2161
2162   """
2163   if top_dirs is None:
2164     top_dirs = constants.OS_SEARCH_PATH
2165
2166   result = []
2167   for dir_name in top_dirs:
2168     if os.path.isdir(dir_name):
2169       try:
2170         f_names = utils.ListVisibleFiles(dir_name)
2171       except EnvironmentError, err:
2172         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2173         break
2174       for name in f_names:
2175         os_path = utils.PathJoin(dir_name, name)
2176         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2177         if status:
2178           diagnose = ""
2179           variants = os_inst.supported_variants
2180           parameters = os_inst.supported_parameters
2181           api_versions = os_inst.api_versions
2182         else:
2183           diagnose = os_inst
2184           variants = parameters = api_versions = []
2185         result.append((name, os_path, status, diagnose, variants,
2186                        parameters, api_versions))
2187
2188   return result
2189
2190
2191 def _TryOSFromDisk(name, base_dir=None):
2192   """Create an OS instance from disk.
2193
2194   This function will return an OS instance if the given name is a
2195   valid OS name.
2196
2197   @type base_dir: string
2198   @keyword base_dir: Base directory containing OS installations.
2199                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2200   @rtype: tuple
2201   @return: success and either the OS instance if we find a valid one,
2202       or error message
2203
2204   """
2205   if base_dir is None:
2206     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
2207   else:
2208     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2209
2210   if os_dir is None:
2211     return False, "Directory for OS %s not found in search path" % name
2212
2213   status, api_versions = _OSOndiskAPIVersion(os_dir)
2214   if not status:
2215     # push the error up
2216     return status, api_versions
2217
2218   if not constants.OS_API_VERSIONS.intersection(api_versions):
2219     return False, ("API version mismatch for path '%s': found %s, want %s." %
2220                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2221
2222   # OS Files dictionary, we will populate it with the absolute path
2223   # names; if the value is True, then it is a required file, otherwise
2224   # an optional one
2225   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2226
2227   if max(api_versions) >= constants.OS_API_V15:
2228     os_files[constants.OS_VARIANTS_FILE] = False
2229
2230   if max(api_versions) >= constants.OS_API_V20:
2231     os_files[constants.OS_PARAMETERS_FILE] = True
2232   else:
2233     del os_files[constants.OS_SCRIPT_VERIFY]
2234
2235   for (filename, required) in os_files.items():
2236     os_files[filename] = utils.PathJoin(os_dir, filename)
2237
2238     try:
2239       st = os.stat(os_files[filename])
2240     except EnvironmentError, err:
2241       if err.errno == errno.ENOENT and not required:
2242         del os_files[filename]
2243         continue
2244       return False, ("File '%s' under path '%s' is missing (%s)" %
2245                      (filename, os_dir, _ErrnoOrStr(err)))
2246
2247     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2248       return False, ("File '%s' under path '%s' is not a regular file" %
2249                      (filename, os_dir))
2250
2251     if filename in constants.OS_SCRIPTS:
2252       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2253         return False, ("File '%s' under path '%s' is not executable" %
2254                        (filename, os_dir))
2255
2256   variants = []
2257   if constants.OS_VARIANTS_FILE in os_files:
2258     variants_file = os_files[constants.OS_VARIANTS_FILE]
2259     try:
2260       variants = utils.ReadFile(variants_file).splitlines()
2261     except EnvironmentError, err:
2262       # we accept missing files, but not other errors
2263       if err.errno != errno.ENOENT:
2264         return False, ("Error while reading the OS variants file at %s: %s" %
2265                        (variants_file, _ErrnoOrStr(err)))
2266
2267   parameters = []
2268   if constants.OS_PARAMETERS_FILE in os_files:
2269     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2270     try:
2271       parameters = utils.ReadFile(parameters_file).splitlines()
2272     except EnvironmentError, err:
2273       return False, ("Error while reading the OS parameters file at %s: %s" %
2274                      (parameters_file, _ErrnoOrStr(err)))
2275     parameters = [v.split(None, 1) for v in parameters]
2276
2277   os_obj = objects.OS(name=name, path=os_dir,
2278                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2279                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2280                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2281                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2282                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2283                                                  None),
2284                       supported_variants=variants,
2285                       supported_parameters=parameters,
2286                       api_versions=api_versions)
2287   return True, os_obj
2288
2289
2290 def OSFromDisk(name, base_dir=None):
2291   """Create an OS instance from disk.
2292
2293   This function will return an OS instance if the given name is a
2294   valid OS name. Otherwise, it will raise an appropriate
2295   L{RPCFail} exception, detailing why this is not a valid OS.
2296
2297   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2298   an exception but returns true/false status data.
2299
2300   @type base_dir: string
2301   @keyword base_dir: Base directory containing OS installations.
2302                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2303   @rtype: L{objects.OS}
2304   @return: the OS instance if we find a valid one
2305   @raise RPCFail: if we don't find a valid OS
2306
2307   """
2308   name_only = objects.OS.GetName(name)
2309   status, payload = _TryOSFromDisk(name_only, base_dir)
2310
2311   if not status:
2312     _Fail(payload)
2313
2314   return payload
2315
2316
2317 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2318   """Calculate the basic environment for an os script.
2319
2320   @type os_name: str
2321   @param os_name: full operating system name (including variant)
2322   @type inst_os: L{objects.OS}
2323   @param inst_os: operating system for which the environment is being built
2324   @type os_params: dict
2325   @param os_params: the OS parameters
2326   @type debug: integer
2327   @param debug: debug level (0 or 1, for OS Api 10)
2328   @rtype: dict
2329   @return: dict of environment variables
2330   @raise errors.BlockDeviceError: if the block device
2331       cannot be found
2332
2333   """
2334   result = {}
2335   api_version = \
2336     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2337   result["OS_API_VERSION"] = "%d" % api_version
2338   result["OS_NAME"] = inst_os.name
2339   result["DEBUG_LEVEL"] = "%d" % debug
2340
2341   # OS variants
2342   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2343     variant = objects.OS.GetVariant(os_name)
2344     if not variant:
2345       variant = inst_os.supported_variants[0]
2346   else:
2347     variant = ""
2348   result["OS_VARIANT"] = variant
2349
2350   # OS params
2351   for pname, pvalue in os_params.items():
2352     result["OSP_%s" % pname.upper()] = pvalue
2353
2354   return result
2355
2356
2357 def OSEnvironment(instance, inst_os, debug=0):
2358   """Calculate the environment for an os script.
2359
2360   @type instance: L{objects.Instance}
2361   @param instance: target instance for the os script run
2362   @type inst_os: L{objects.OS}
2363   @param inst_os: operating system for which the environment is being built
2364   @type debug: integer
2365   @param debug: debug level (0 or 1, for OS Api 10)
2366   @rtype: dict
2367   @return: dict of environment variables
2368   @raise errors.BlockDeviceError: if the block device
2369       cannot be found
2370
2371   """
2372   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2373
2374   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2375     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2376
2377   result["HYPERVISOR"] = instance.hypervisor
2378   result["DISK_COUNT"] = "%d" % len(instance.disks)
2379   result["NIC_COUNT"] = "%d" % len(instance.nics)
2380   result["INSTANCE_SECONDARY_NODES"] = \
2381       ("%s" % " ".join(instance.secondary_nodes))
2382
2383   # Disks
2384   for idx, disk in enumerate(instance.disks):
2385     real_disk = _OpenRealBD(disk)
2386     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2387     result["DISK_%d_ACCESS" % idx] = disk.mode
2388     if constants.HV_DISK_TYPE in instance.hvparams:
2389       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2390         instance.hvparams[constants.HV_DISK_TYPE]
2391     if disk.dev_type in constants.LDS_BLOCK:
2392       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2393     elif disk.dev_type == constants.LD_FILE:
2394       result["DISK_%d_BACKEND_TYPE" % idx] = \
2395         "file:%s" % disk.physical_id[0]
2396
2397   # NICs
2398   for idx, nic in enumerate(instance.nics):
2399     result["NIC_%d_MAC" % idx] = nic.mac
2400     if nic.ip:
2401       result["NIC_%d_IP" % idx] = nic.ip
2402     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2403     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2404       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2405     if nic.nicparams[constants.NIC_LINK]:
2406       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2407     if constants.HV_NIC_TYPE in instance.hvparams:
2408       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2409         instance.hvparams[constants.HV_NIC_TYPE]
2410
2411   # HV/BE params
2412   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2413     for key, value in source.items():
2414       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2415
2416   return result
2417
2418
2419 def BlockdevGrow(disk, amount, dryrun):
2420   """Grow a stack of block devices.
2421
2422   This function is called recursively, with the childrens being the
2423   first ones to resize.
2424
2425   @type disk: L{objects.Disk}
2426   @param disk: the disk to be grown
2427   @type amount: integer
2428   @param amount: the amount (in mebibytes) to grow with
2429   @type dryrun: boolean
2430   @param dryrun: whether to execute the operation in simulation mode
2431       only, without actually increasing the size
2432   @rtype: (status, result)
2433   @return: a tuple with the status of the operation (True/False), and
2434       the errors message if status is False
2435
2436   """
2437   r_dev = _RecursiveFindBD(disk)
2438   if r_dev is None:
2439     _Fail("Cannot find block device %s", disk)
2440
2441   try:
2442     r_dev.Grow(amount, dryrun)
2443   except errors.BlockDeviceError, err:
2444     _Fail("Failed to grow block device: %s", err, exc=True)
2445
2446
2447 def BlockdevSnapshot(disk):
2448   """Create a snapshot copy of a block device.
2449
2450   This function is called recursively, and the snapshot is actually created
2451   just for the leaf lvm backend device.
2452
2453   @type disk: L{objects.Disk}
2454   @param disk: the disk to be snapshotted
2455   @rtype: string
2456   @return: snapshot disk ID as (vg, lv)
2457
2458   """
2459   if disk.dev_type == constants.LD_DRBD8:
2460     if not disk.children:
2461       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2462             disk.unique_id)
2463     return BlockdevSnapshot(disk.children[0])
2464   elif disk.dev_type == constants.LD_LV:
2465     r_dev = _RecursiveFindBD(disk)
2466     if r_dev is not None:
2467       # FIXME: choose a saner value for the snapshot size
2468       # let's stay on the safe side and ask for the full size, for now
2469       return r_dev.Snapshot(disk.size)
2470     else:
2471       _Fail("Cannot find block device %s", disk)
2472   else:
2473     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2474           disk.unique_id, disk.dev_type)
2475
2476
2477 def FinalizeExport(instance, snap_disks):
2478   """Write out the export configuration information.
2479
2480   @type instance: L{objects.Instance}
2481   @param instance: the instance which we export, used for
2482       saving configuration
2483   @type snap_disks: list of L{objects.Disk}
2484   @param snap_disks: list of snapshot block devices, which
2485       will be used to get the actual name of the dump file
2486
2487   @rtype: None
2488
2489   """
2490   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2491   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2492
2493   config = objects.SerializableConfigParser()
2494
2495   config.add_section(constants.INISECT_EXP)
2496   config.set(constants.INISECT_EXP, "version", "0")
2497   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2498   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2499   config.set(constants.INISECT_EXP, "os", instance.os)
2500   config.set(constants.INISECT_EXP, "compression", "none")
2501
2502   config.add_section(constants.INISECT_INS)
2503   config.set(constants.INISECT_INS, "name", instance.name)
2504   config.set(constants.INISECT_INS, "memory", "%d" %
2505              instance.beparams[constants.BE_MEMORY])
2506   config.set(constants.INISECT_INS, "vcpus", "%d" %
2507              instance.beparams[constants.BE_VCPUS])
2508   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2509   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2510   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2511
2512   nic_total = 0
2513   for nic_count, nic in enumerate(instance.nics):
2514     nic_total += 1
2515     config.set(constants.INISECT_INS, "nic%d_mac" %
2516                nic_count, "%s" % nic.mac)
2517     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2518     for param in constants.NICS_PARAMETER_TYPES:
2519       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2520                  "%s" % nic.nicparams.get(param, None))
2521   # TODO: redundant: on load can read nics until it doesn't exist
2522   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2523
2524   disk_total = 0
2525   for disk_count, disk in enumerate(snap_disks):
2526     if disk:
2527       disk_total += 1
2528       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2529                  ("%s" % disk.iv_name))
2530       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2531                  ("%s" % disk.physical_id[1]))
2532       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2533                  ("%d" % disk.size))
2534
2535   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2536
2537   # New-style hypervisor/backend parameters
2538
2539   config.add_section(constants.INISECT_HYP)
2540   for name, value in instance.hvparams.items():
2541     if name not in constants.HVC_GLOBALS:
2542       config.set(constants.INISECT_HYP, name, str(value))
2543
2544   config.add_section(constants.INISECT_BEP)
2545   for name, value in instance.beparams.items():
2546     config.set(constants.INISECT_BEP, name, str(value))
2547
2548   config.add_section(constants.INISECT_OSP)
2549   for name, value in instance.osparams.items():
2550     config.set(constants.INISECT_OSP, name, str(value))
2551
2552   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2553                   data=config.Dumps())
2554   shutil.rmtree(finaldestdir, ignore_errors=True)
2555   shutil.move(destdir, finaldestdir)
2556
2557
2558 def ExportInfo(dest):
2559   """Get export configuration information.
2560
2561   @type dest: str
2562   @param dest: directory containing the export
2563
2564   @rtype: L{objects.SerializableConfigParser}
2565   @return: a serializable config file containing the
2566       export info
2567
2568   """
2569   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2570
2571   config = objects.SerializableConfigParser()
2572   config.read(cff)
2573
2574   if (not config.has_section(constants.INISECT_EXP) or
2575       not config.has_section(constants.INISECT_INS)):
2576     _Fail("Export info file doesn't have the required fields")
2577
2578   return config.Dumps()
2579
2580
2581 def ListExports():
2582   """Return a list of exports currently available on this machine.
2583
2584   @rtype: list
2585   @return: list of the exports
2586
2587   """
2588   if os.path.isdir(constants.EXPORT_DIR):
2589     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2590   else:
2591     _Fail("No exports directory")
2592
2593
2594 def RemoveExport(export):
2595   """Remove an existing export from the node.
2596
2597   @type export: str
2598   @param export: the name of the export to remove
2599   @rtype: None
2600
2601   """
2602   target = utils.PathJoin(constants.EXPORT_DIR, export)
2603
2604   try:
2605     shutil.rmtree(target)
2606   except EnvironmentError, err:
2607     _Fail("Error while removing the export: %s", err, exc=True)
2608
2609
2610 def BlockdevRename(devlist):
2611   """Rename a list of block devices.
2612
2613   @type devlist: list of tuples
2614   @param devlist: list of tuples of the form  (disk,
2615       new_logical_id, new_physical_id); disk is an
2616       L{objects.Disk} object describing the current disk,
2617       and new logical_id/physical_id is the name we
2618       rename it to
2619   @rtype: boolean
2620   @return: True if all renames succeeded, False otherwise
2621
2622   """
2623   msgs = []
2624   result = True
2625   for disk, unique_id in devlist:
2626     dev = _RecursiveFindBD(disk)
2627     if dev is None:
2628       msgs.append("Can't find device %s in rename" % str(disk))
2629       result = False
2630       continue
2631     try:
2632       old_rpath = dev.dev_path
2633       dev.Rename(unique_id)
2634       new_rpath = dev.dev_path
2635       if old_rpath != new_rpath:
2636         DevCacheManager.RemoveCache(old_rpath)
2637         # FIXME: we should add the new cache information here, like:
2638         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2639         # but we don't have the owner here - maybe parse from existing
2640         # cache? for now, we only lose lvm data when we rename, which
2641         # is less critical than DRBD or MD
2642     except errors.BlockDeviceError, err:
2643       msgs.append("Can't rename device '%s' to '%s': %s" %
2644                   (dev, unique_id, err))
2645       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2646       result = False
2647   if not result:
2648     _Fail("; ".join(msgs))
2649
2650
2651 def _TransformFileStorageDir(fs_dir):
2652   """Checks whether given file_storage_dir is valid.
2653
2654   Checks wheter the given fs_dir is within the cluster-wide default
2655   file_storage_dir or the shared_file_storage_dir, which are stored in
2656   SimpleStore. Only paths under those directories are allowed.
2657
2658   @type fs_dir: str
2659   @param fs_dir: the path to check
2660
2661   @return: the normalized path if valid, None otherwise
2662
2663   """
2664   if not constants.ENABLE_FILE_STORAGE:
2665     _Fail("File storage disabled at configure time")
2666   cfg = _GetConfig()
2667   fs_dir = os.path.normpath(fs_dir)
2668   base_fstore = cfg.GetFileStorageDir()
2669   base_shared = cfg.GetSharedFileStorageDir()
2670   if not (utils.IsBelowDir(base_fstore, fs_dir) or
2671           utils.IsBelowDir(base_shared, fs_dir)):
2672     _Fail("File storage directory '%s' is not under base file"
2673           " storage directory '%s' or shared storage directory '%s'",
2674           fs_dir, base_fstore, base_shared)
2675   return fs_dir
2676
2677
2678 def CreateFileStorageDir(file_storage_dir):
2679   """Create file storage directory.
2680
2681   @type file_storage_dir: str
2682   @param file_storage_dir: directory to create
2683
2684   @rtype: tuple
2685   @return: tuple with first element a boolean indicating wheter dir
2686       creation was successful or not
2687
2688   """
2689   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2690   if os.path.exists(file_storage_dir):
2691     if not os.path.isdir(file_storage_dir):
2692       _Fail("Specified storage dir '%s' is not a directory",
2693             file_storage_dir)
2694   else:
2695     try:
2696       os.makedirs(file_storage_dir, 0750)
2697     except OSError, err:
2698       _Fail("Cannot create file storage directory '%s': %s",
2699             file_storage_dir, err, exc=True)
2700
2701
2702 def RemoveFileStorageDir(file_storage_dir):
2703   """Remove file storage directory.
2704
2705   Remove it only if it's empty. If not log an error and return.
2706
2707   @type file_storage_dir: str
2708   @param file_storage_dir: the directory we should cleanup
2709   @rtype: tuple (success,)
2710   @return: tuple of one element, C{success}, denoting
2711       whether the operation was successful
2712
2713   """
2714   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2715   if os.path.exists(file_storage_dir):
2716     if not os.path.isdir(file_storage_dir):
2717       _Fail("Specified Storage directory '%s' is not a directory",
2718             file_storage_dir)
2719     # deletes dir only if empty, otherwise we want to fail the rpc call
2720     try:
2721       os.rmdir(file_storage_dir)
2722     except OSError, err:
2723       _Fail("Cannot remove file storage directory '%s': %s",
2724             file_storage_dir, err)
2725
2726
2727 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2728   """Rename the file storage directory.
2729
2730   @type old_file_storage_dir: str
2731   @param old_file_storage_dir: the current path
2732   @type new_file_storage_dir: str
2733   @param new_file_storage_dir: the name we should rename to
2734   @rtype: tuple (success,)
2735   @return: tuple of one element, C{success}, denoting
2736       whether the operation was successful
2737
2738   """
2739   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2740   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2741   if not os.path.exists(new_file_storage_dir):
2742     if os.path.isdir(old_file_storage_dir):
2743       try:
2744         os.rename(old_file_storage_dir, new_file_storage_dir)
2745       except OSError, err:
2746         _Fail("Cannot rename '%s' to '%s': %s",
2747               old_file_storage_dir, new_file_storage_dir, err)
2748     else:
2749       _Fail("Specified storage dir '%s' is not a directory",
2750             old_file_storage_dir)
2751   else:
2752     if os.path.exists(old_file_storage_dir):
2753       _Fail("Cannot rename '%s' to '%s': both locations exist",
2754             old_file_storage_dir, new_file_storage_dir)
2755
2756
2757 def _EnsureJobQueueFile(file_name):
2758   """Checks whether the given filename is in the queue directory.
2759
2760   @type file_name: str
2761   @param file_name: the file name we should check
2762   @rtype: None
2763   @raises RPCFail: if the file is not valid
2764
2765   """
2766   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2767   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2768
2769   if not result:
2770     _Fail("Passed job queue file '%s' does not belong to"
2771           " the queue directory '%s'", file_name, queue_dir)
2772
2773
2774 def JobQueueUpdate(file_name, content):
2775   """Updates a file in the queue directory.
2776
2777   This is just a wrapper over L{utils.io.WriteFile}, with proper
2778   checking.
2779
2780   @type file_name: str
2781   @param file_name: the job file name
2782   @type content: str
2783   @param content: the new job contents
2784   @rtype: boolean
2785   @return: the success of the operation
2786
2787   """
2788   _EnsureJobQueueFile(file_name)
2789   getents = runtime.GetEnts()
2790
2791   # Write and replace the file atomically
2792   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2793                   gid=getents.masterd_gid)
2794
2795
2796 def JobQueueRename(old, new):
2797   """Renames a job queue file.
2798
2799   This is just a wrapper over os.rename with proper checking.
2800
2801   @type old: str
2802   @param old: the old (actual) file name
2803   @type new: str
2804   @param new: the desired file name
2805   @rtype: tuple
2806   @return: the success of the operation and payload
2807
2808   """
2809   _EnsureJobQueueFile(old)
2810   _EnsureJobQueueFile(new)
2811
2812   getents = runtime.GetEnts()
2813
2814   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2815                    dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2816
2817
2818 def BlockdevClose(instance_name, disks):
2819   """Closes the given block devices.
2820
2821   This means they will be switched to secondary mode (in case of
2822   DRBD).
2823
2824   @param instance_name: if the argument is not empty, the symlinks
2825       of this instance will be removed
2826   @type disks: list of L{objects.Disk}
2827   @param disks: the list of disks to be closed
2828   @rtype: tuple (success, message)
2829   @return: a tuple of success and message, where success
2830       indicates the succes of the operation, and message
2831       which will contain the error details in case we
2832       failed
2833
2834   """
2835   bdevs = []
2836   for cf in disks:
2837     rd = _RecursiveFindBD(cf)
2838     if rd is None:
2839       _Fail("Can't find device %s", cf)
2840     bdevs.append(rd)
2841
2842   msg = []
2843   for rd in bdevs:
2844     try:
2845       rd.Close()
2846     except errors.BlockDeviceError, err:
2847       msg.append(str(err))
2848   if msg:
2849     _Fail("Can't make devices secondary: %s", ",".join(msg))
2850   else:
2851     if instance_name:
2852       _RemoveBlockDevLinks(instance_name, disks)
2853
2854
2855 def ValidateHVParams(hvname, hvparams):
2856   """Validates the given hypervisor parameters.
2857
2858   @type hvname: string
2859   @param hvname: the hypervisor name
2860   @type hvparams: dict
2861   @param hvparams: the hypervisor parameters to be validated
2862   @rtype: None
2863
2864   """
2865   try:
2866     hv_type = hypervisor.GetHypervisor(hvname)
2867     hv_type.ValidateParameters(hvparams)
2868   except errors.HypervisorError, err:
2869     _Fail(str(err), log=False)
2870
2871
2872 def _CheckOSPList(os_obj, parameters):
2873   """Check whether a list of parameters is supported by the OS.
2874
2875   @type os_obj: L{objects.OS}
2876   @param os_obj: OS object to check
2877   @type parameters: list
2878   @param parameters: the list of parameters to check
2879
2880   """
2881   supported = [v[0] for v in os_obj.supported_parameters]
2882   delta = frozenset(parameters).difference(supported)
2883   if delta:
2884     _Fail("The following parameters are not supported"
2885           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2886
2887
2888 def ValidateOS(required, osname, checks, osparams):
2889   """Validate the given OS' parameters.
2890
2891   @type required: boolean
2892   @param required: whether absence of the OS should translate into
2893       failure or not
2894   @type osname: string
2895   @param osname: the OS to be validated
2896   @type checks: list
2897   @param checks: list of the checks to run (currently only 'parameters')
2898   @type osparams: dict
2899   @param osparams: dictionary with OS parameters
2900   @rtype: boolean
2901   @return: True if the validation passed, or False if the OS was not
2902       found and L{required} was false
2903
2904   """
2905   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2906     _Fail("Unknown checks required for OS %s: %s", osname,
2907           set(checks).difference(constants.OS_VALIDATE_CALLS))
2908
2909   name_only = objects.OS.GetName(osname)
2910   status, tbv = _TryOSFromDisk(name_only, None)
2911
2912   if not status:
2913     if required:
2914       _Fail(tbv)
2915     else:
2916       return False
2917
2918   if max(tbv.api_versions) < constants.OS_API_V20:
2919     return True
2920
2921   if constants.OS_VALIDATE_PARAMETERS in checks:
2922     _CheckOSPList(tbv, osparams.keys())
2923
2924   validate_env = OSCoreEnv(osname, tbv, osparams)
2925   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2926                         cwd=tbv.path, reset_env=True)
2927   if result.failed:
2928     logging.error("os validate command '%s' returned error: %s output: %s",
2929                   result.cmd, result.fail_reason, result.output)
2930     _Fail("OS validation script failed (%s), output: %s",
2931           result.fail_reason, result.output, log=False)
2932
2933   return True
2934
2935
2936 def DemoteFromMC():
2937   """Demotes the current node from master candidate role.
2938
2939   """
2940   # try to ensure we're not the master by mistake
2941   master, myself = ssconf.GetMasterAndMyself()
2942   if master == myself:
2943     _Fail("ssconf status shows I'm the master node, will not demote")
2944
2945   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2946   if not result.failed:
2947     _Fail("The master daemon is running, will not demote")
2948
2949   try:
2950     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2951       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2952   except EnvironmentError, err:
2953     if err.errno != errno.ENOENT:
2954       _Fail("Error while backing up cluster file: %s", err, exc=True)
2955
2956   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2957
2958
2959 def _GetX509Filenames(cryptodir, name):
2960   """Returns the full paths for the private key and certificate.
2961
2962   """
2963   return (utils.PathJoin(cryptodir, name),
2964           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2965           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2966
2967
2968 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2969   """Creates a new X509 certificate for SSL/TLS.
2970
2971   @type validity: int
2972   @param validity: Validity in seconds
2973   @rtype: tuple; (string, string)
2974   @return: Certificate name and public part
2975
2976   """
2977   (key_pem, cert_pem) = \
2978     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2979                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2980
2981   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2982                               prefix="x509-%s-" % utils.TimestampForFilename())
2983   try:
2984     name = os.path.basename(cert_dir)
2985     assert len(name) > 5
2986
2987     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2988
2989     utils.WriteFile(key_file, mode=0400, data=key_pem)
2990     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2991
2992     # Never return private key as it shouldn't leave the node
2993     return (name, cert_pem)
2994   except Exception:
2995     shutil.rmtree(cert_dir, ignore_errors=True)
2996     raise
2997
2998
2999 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
3000   """Removes a X509 certificate.
3001
3002   @type name: string
3003   @param name: Certificate name
3004
3005   """
3006   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3007
3008   utils.RemoveFile(key_file)
3009   utils.RemoveFile(cert_file)
3010
3011   try:
3012     os.rmdir(cert_dir)
3013   except EnvironmentError, err:
3014     _Fail("Cannot remove certificate directory '%s': %s",
3015           cert_dir, err)
3016
3017
3018 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3019   """Returns the command for the requested input/output.
3020
3021   @type instance: L{objects.Instance}
3022   @param instance: The instance object
3023   @param mode: Import/export mode
3024   @param ieio: Input/output type
3025   @param ieargs: Input/output arguments
3026
3027   """
3028   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3029
3030   env = None
3031   prefix = None
3032   suffix = None
3033   exp_size = None
3034
3035   if ieio == constants.IEIO_FILE:
3036     (filename, ) = ieargs
3037
3038     if not utils.IsNormAbsPath(filename):
3039       _Fail("Path '%s' is not normalized or absolute", filename)
3040
3041     real_filename = os.path.realpath(filename)
3042     directory = os.path.dirname(real_filename)
3043
3044     if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
3045       _Fail("File '%s' is not under exports directory '%s': %s",
3046             filename, constants.EXPORT_DIR, real_filename)
3047
3048     # Create directory
3049     utils.Makedirs(directory, mode=0750)
3050
3051     quoted_filename = utils.ShellQuote(filename)
3052
3053     if mode == constants.IEM_IMPORT:
3054       suffix = "> %s" % quoted_filename
3055     elif mode == constants.IEM_EXPORT:
3056       suffix = "< %s" % quoted_filename
3057
3058       # Retrieve file size
3059       try:
3060         st = os.stat(filename)
3061       except EnvironmentError, err:
3062         logging.error("Can't stat(2) %s: %s", filename, err)
3063       else:
3064         exp_size = utils.BytesToMebibyte(st.st_size)
3065
3066   elif ieio == constants.IEIO_RAW_DISK:
3067     (disk, ) = ieargs
3068
3069     real_disk = _OpenRealBD(disk)
3070
3071     if mode == constants.IEM_IMPORT:
3072       # we set here a smaller block size as, due to transport buffering, more
3073       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3074       # is not already there or we pass a wrong path; we use notrunc to no
3075       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3076       # much memory; this means that at best, we flush every 64k, which will
3077       # not be very fast
3078       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3079                                     " bs=%s oflag=dsync"),
3080                                     real_disk.dev_path,
3081                                     str(64 * 1024))
3082
3083     elif mode == constants.IEM_EXPORT:
3084       # the block size on the read dd is 1MiB to match our units
3085       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3086                                    real_disk.dev_path,
3087                                    str(1024 * 1024), # 1 MB
3088                                    str(disk.size))
3089       exp_size = disk.size
3090
3091   elif ieio == constants.IEIO_SCRIPT:
3092     (disk, disk_index, ) = ieargs
3093
3094     assert isinstance(disk_index, (int, long))
3095
3096     real_disk = _OpenRealBD(disk)
3097
3098     inst_os = OSFromDisk(instance.os)
3099     env = OSEnvironment(instance, inst_os)
3100
3101     if mode == constants.IEM_IMPORT:
3102       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3103       env["IMPORT_INDEX"] = str(disk_index)
3104       script = inst_os.import_script
3105
3106     elif mode == constants.IEM_EXPORT:
3107       env["EXPORT_DEVICE"] = real_disk.dev_path
3108       env["EXPORT_INDEX"] = str(disk_index)
3109       script = inst_os.export_script
3110
3111     # TODO: Pass special environment only to script
3112     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3113
3114     if mode == constants.IEM_IMPORT:
3115       suffix = "| %s" % script_cmd
3116
3117     elif mode == constants.IEM_EXPORT:
3118       prefix = "%s |" % script_cmd
3119
3120     # Let script predict size
3121     exp_size = constants.IE_CUSTOM_SIZE
3122
3123   else:
3124     _Fail("Invalid %s I/O mode %r", mode, ieio)
3125
3126   return (env, prefix, suffix, exp_size)
3127
3128
3129 def _CreateImportExportStatusDir(prefix):
3130   """Creates status directory for import/export.
3131
3132   """
3133   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
3134                           prefix=("%s-%s-" %
3135                                   (prefix, utils.TimestampForFilename())))
3136
3137
3138 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3139                             ieio, ieioargs):
3140   """Starts an import or export daemon.
3141
3142   @param mode: Import/output mode
3143   @type opts: L{objects.ImportExportOptions}
3144   @param opts: Daemon options
3145   @type host: string
3146   @param host: Remote host for export (None for import)
3147   @type port: int
3148   @param port: Remote port for export (None for import)
3149   @type instance: L{objects.Instance}
3150   @param instance: Instance object
3151   @type component: string
3152   @param component: which part of the instance is transferred now,
3153       e.g. 'disk/0'
3154   @param ieio: Input/output type
3155   @param ieioargs: Input/output arguments
3156
3157   """
3158   if mode == constants.IEM_IMPORT:
3159     prefix = "import"
3160
3161     if not (host is None and port is None):
3162       _Fail("Can not specify host or port on import")
3163
3164   elif mode == constants.IEM_EXPORT:
3165     prefix = "export"
3166
3167     if host is None or port is None:
3168       _Fail("Host and port must be specified for an export")
3169
3170   else:
3171     _Fail("Invalid mode %r", mode)
3172
3173   if (opts.key_name is None) ^ (opts.ca_pem is None):
3174     _Fail("Cluster certificate can only be used for both key and CA")
3175
3176   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3177     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3178
3179   if opts.key_name is None:
3180     # Use server.pem
3181     key_path = constants.NODED_CERT_FILE
3182     cert_path = constants.NODED_CERT_FILE
3183     assert opts.ca_pem is None
3184   else:
3185     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3186                                                  opts.key_name)
3187     assert opts.ca_pem is not None
3188
3189   for i in [key_path, cert_path]:
3190     if not os.path.exists(i):
3191       _Fail("File '%s' does not exist" % i)
3192
3193   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3194   try:
3195     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3196     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3197     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3198
3199     if opts.ca_pem is None:
3200       # Use server.pem
3201       ca = utils.ReadFile(constants.NODED_CERT_FILE)
3202     else:
3203       ca = opts.ca_pem
3204
3205     # Write CA file
3206     utils.WriteFile(ca_file, data=ca, mode=0400)
3207
3208     cmd = [
3209       constants.IMPORT_EXPORT_DAEMON,
3210       status_file, mode,
3211       "--key=%s" % key_path,
3212       "--cert=%s" % cert_path,
3213       "--ca=%s" % ca_file,
3214       ]
3215
3216     if host:
3217       cmd.append("--host=%s" % host)
3218
3219     if port:
3220       cmd.append("--port=%s" % port)
3221
3222     if opts.ipv6:
3223       cmd.append("--ipv6")
3224     else:
3225       cmd.append("--ipv4")
3226
3227     if opts.compress:
3228       cmd.append("--compress=%s" % opts.compress)
3229
3230     if opts.magic:
3231       cmd.append("--magic=%s" % opts.magic)
3232
3233     if exp_size is not None:
3234       cmd.append("--expected-size=%s" % exp_size)
3235
3236     if cmd_prefix:
3237       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3238
3239     if cmd_suffix:
3240       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3241
3242     if mode == constants.IEM_EXPORT:
3243       # Retry connection a few times when connecting to remote peer
3244       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3245       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3246     elif opts.connect_timeout is not None:
3247       assert mode == constants.IEM_IMPORT
3248       # Overall timeout for establishing connection while listening
3249       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3250
3251     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3252
3253     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3254     # support for receiving a file descriptor for output
3255     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3256                       output=logfile)
3257
3258     # The import/export name is simply the status directory name
3259     return os.path.basename(status_dir)
3260
3261   except Exception:
3262     shutil.rmtree(status_dir, ignore_errors=True)
3263     raise
3264
3265
3266 def GetImportExportStatus(names):
3267   """Returns import/export daemon status.
3268
3269   @type names: sequence
3270   @param names: List of names
3271   @rtype: List of dicts
3272   @return: Returns a list of the state of each named import/export or None if a
3273            status couldn't be read
3274
3275   """
3276   result = []
3277
3278   for name in names:
3279     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3280                                  _IES_STATUS_FILE)
3281
3282     try:
3283       data = utils.ReadFile(status_file)
3284     except EnvironmentError, err:
3285       if err.errno != errno.ENOENT:
3286         raise
3287       data = None
3288
3289     if not data:
3290       result.append(None)
3291       continue
3292
3293     result.append(serializer.LoadJson(data))
3294
3295   return result
3296
3297
3298 def AbortImportExport(name):
3299   """Sends SIGTERM to a running import/export daemon.
3300
3301   """
3302   logging.info("Abort import/export %s", name)
3303
3304   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3305   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3306
3307   if pid:
3308     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3309                  name, pid)
3310     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3311
3312
3313 def CleanupImportExport(name):
3314   """Cleanup after an import or export.
3315
3316   If the import/export daemon is still running it's killed. Afterwards the
3317   whole status directory is removed.
3318
3319   """
3320   logging.info("Finalizing import/export %s", name)
3321
3322   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3323
3324   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3325
3326   if pid:
3327     logging.info("Import/export %s is still running with PID %s",
3328                  name, pid)
3329     utils.KillProcess(pid, waitpid=False)
3330
3331   shutil.rmtree(status_dir, ignore_errors=True)
3332
3333
3334 def _FindDisks(nodes_ip, disks):
3335   """Sets the physical ID on disks and returns the block devices.
3336
3337   """
3338   # set the correct physical ID
3339   my_name = netutils.Hostname.GetSysName()
3340   for cf in disks:
3341     cf.SetPhysicalID(my_name, nodes_ip)
3342
3343   bdevs = []
3344
3345   for cf in disks:
3346     rd = _RecursiveFindBD(cf)
3347     if rd is None:
3348       _Fail("Can't find device %s", cf)
3349     bdevs.append(rd)
3350   return bdevs
3351
3352
3353 def DrbdDisconnectNet(nodes_ip, disks):
3354   """Disconnects the network on a list of drbd devices.
3355
3356   """
3357   bdevs = _FindDisks(nodes_ip, disks)
3358
3359   # disconnect disks
3360   for rd in bdevs:
3361     try:
3362       rd.DisconnectNet()
3363     except errors.BlockDeviceError, err:
3364       _Fail("Can't change network configuration to standalone mode: %s",
3365             err, exc=True)
3366
3367
3368 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3369   """Attaches the network on a list of drbd devices.
3370
3371   """
3372   bdevs = _FindDisks(nodes_ip, disks)
3373
3374   if multimaster:
3375     for idx, rd in enumerate(bdevs):
3376       try:
3377         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3378       except EnvironmentError, err:
3379         _Fail("Can't create symlink: %s", err)
3380   # reconnect disks, switch to new master configuration and if
3381   # needed primary mode
3382   for rd in bdevs:
3383     try:
3384       rd.AttachNet(multimaster)
3385     except errors.BlockDeviceError, err:
3386       _Fail("Can't change network configuration: %s", err)
3387
3388   # wait until the disks are connected; we need to retry the re-attach
3389   # if the device becomes standalone, as this might happen if the one
3390   # node disconnects and reconnects in a different mode before the
3391   # other node reconnects; in this case, one or both of the nodes will
3392   # decide it has wrong configuration and switch to standalone
3393
3394   def _Attach():
3395     all_connected = True
3396
3397     for rd in bdevs:
3398       stats = rd.GetProcStatus()
3399
3400       all_connected = (all_connected and
3401                        (stats.is_connected or stats.is_in_resync))
3402
3403       if stats.is_standalone:
3404         # peer had different config info and this node became
3405         # standalone, even though this should not happen with the
3406         # new staged way of changing disk configs
3407         try:
3408           rd.AttachNet(multimaster)
3409         except errors.BlockDeviceError, err:
3410           _Fail("Can't change network configuration: %s", err)
3411
3412     if not all_connected:
3413       raise utils.RetryAgain()
3414
3415   try:
3416     # Start with a delay of 100 miliseconds and go up to 5 seconds
3417     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3418   except utils.RetryTimeout:
3419     _Fail("Timeout in disk reconnecting")
3420
3421   if multimaster:
3422     # change to primary mode
3423     for rd in bdevs:
3424       try:
3425         rd.Open()
3426       except errors.BlockDeviceError, err:
3427         _Fail("Can't change to primary mode: %s", err)
3428
3429
3430 def DrbdWaitSync(nodes_ip, disks):
3431   """Wait until DRBDs have synchronized.
3432
3433   """
3434   def _helper(rd):
3435     stats = rd.GetProcStatus()
3436     if not (stats.is_connected or stats.is_in_resync):
3437       raise utils.RetryAgain()
3438     return stats
3439
3440   bdevs = _FindDisks(nodes_ip, disks)
3441
3442   min_resync = 100
3443   alldone = True
3444   for rd in bdevs:
3445     try:
3446       # poll each second for 15 seconds
3447       stats = utils.Retry(_helper, 1, 15, args=[rd])
3448     except utils.RetryTimeout:
3449       stats = rd.GetProcStatus()
3450       # last check
3451       if not (stats.is_connected or stats.is_in_resync):
3452         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3453     alldone = alldone and (not stats.is_in_resync)
3454     if stats.sync_percent is not None:
3455       min_resync = min(min_resync, stats.sync_percent)
3456
3457   return (alldone, min_resync)
3458
3459
3460 def GetDrbdUsermodeHelper():
3461   """Returns DRBD usermode helper currently configured.
3462
3463   """
3464   try:
3465     return bdev.BaseDRBD.GetUsermodeHelper()
3466   except errors.BlockDeviceError, err:
3467     _Fail(str(err))
3468
3469
3470 def PowercycleNode(hypervisor_type):
3471   """Hard-powercycle the node.
3472
3473   Because we need to return first, and schedule the powercycle in the
3474   background, we won't be able to report failures nicely.
3475
3476   """
3477   hyper = hypervisor.GetHypervisor(hypervisor_type)
3478   try:
3479     pid = os.fork()
3480   except OSError:
3481     # if we can't fork, we'll pretend that we're in the child process
3482     pid = 0
3483   if pid > 0:
3484     return "Reboot scheduled in 5 seconds"
3485   # ensure the child is running on ram
3486   try:
3487     utils.Mlockall()
3488   except Exception: # pylint: disable=W0703
3489     pass
3490   time.sleep(5)
3491   hyper.PowercycleNode()
3492
3493
3494 class HooksRunner(object):
3495   """Hook runner.
3496
3497   This class is instantiated on the node side (ganeti-noded) and not
3498   on the master side.
3499
3500   """
3501   def __init__(self, hooks_base_dir=None):
3502     """Constructor for hooks runner.
3503
3504     @type hooks_base_dir: str or None
3505     @param hooks_base_dir: if not None, this overrides the
3506         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3507
3508     """
3509     if hooks_base_dir is None:
3510       hooks_base_dir = constants.HOOKS_BASE_DIR
3511     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3512     # constant
3513     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3514
3515   def RunLocalHooks(self, node_list, hpath, phase, env):
3516     """Check that the hooks will be run only locally and then run them.
3517
3518     """
3519     assert len(node_list) == 1
3520     node = node_list[0]
3521     _, myself = ssconf.GetMasterAndMyself()
3522     assert node == myself
3523
3524     results = self.RunHooks(hpath, phase, env)
3525
3526     # Return values in the form expected by HooksMaster
3527     return {node: (None, False, results)}
3528
3529   def RunHooks(self, hpath, phase, env):
3530     """Run the scripts in the hooks directory.
3531
3532     @type hpath: str
3533     @param hpath: the path to the hooks directory which
3534         holds the scripts
3535     @type phase: str
3536     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3537         L{constants.HOOKS_PHASE_POST}
3538     @type env: dict
3539     @param env: dictionary with the environment for the hook
3540     @rtype: list
3541     @return: list of 3-element tuples:
3542       - script path
3543       - script result, either L{constants.HKR_SUCCESS} or
3544         L{constants.HKR_FAIL}
3545       - output of the script
3546
3547     @raise errors.ProgrammerError: for invalid input
3548         parameters
3549
3550     """
3551     if phase == constants.HOOKS_PHASE_PRE:
3552       suffix = "pre"
3553     elif phase == constants.HOOKS_PHASE_POST:
3554       suffix = "post"
3555     else:
3556       _Fail("Unknown hooks phase '%s'", phase)
3557
3558     subdir = "%s-%s.d" % (hpath, suffix)
3559     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3560
3561     results = []
3562
3563     if not os.path.isdir(dir_name):
3564       # for non-existing/non-dirs, we simply exit instead of logging a
3565       # warning at every operation
3566       return results
3567
3568     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3569
3570     for (relname, relstatus, runresult)  in runparts_results:
3571       if relstatus == constants.RUNPARTS_SKIP:
3572         rrval = constants.HKR_SKIP
3573         output = ""
3574       elif relstatus == constants.RUNPARTS_ERR:
3575         rrval = constants.HKR_FAIL
3576         output = "Hook script execution error: %s" % runresult
3577       elif relstatus == constants.RUNPARTS_RUN:
3578         if runresult.failed:
3579           rrval = constants.HKR_FAIL
3580         else:
3581           rrval = constants.HKR_SUCCESS
3582         output = utils.SafeEncode(runresult.output.strip())
3583       results.append(("%s/%s" % (subdir, relname), rrval, output))
3584
3585     return results
3586
3587
3588 class IAllocatorRunner(object):
3589   """IAllocator runner.
3590
3591   This class is instantiated on the node side (ganeti-noded) and not on
3592   the master side.
3593
3594   """
3595   @staticmethod
3596   def Run(name, idata):
3597     """Run an iallocator script.
3598
3599     @type name: str
3600     @param name: the iallocator script name
3601     @type idata: str
3602     @param idata: the allocator input data
3603
3604     @rtype: tuple
3605     @return: two element tuple of:
3606        - status
3607        - either error message or stdout of allocator (for success)
3608
3609     """
3610     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3611                                   os.path.isfile)
3612     if alloc_script is None:
3613       _Fail("iallocator module '%s' not found in the search path", name)
3614
3615     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3616     try:
3617       os.write(fd, idata)
3618       os.close(fd)
3619       result = utils.RunCmd([alloc_script, fin_name])
3620       if result.failed:
3621         _Fail("iallocator module '%s' failed: %s, output '%s'",
3622               name, result.fail_reason, result.output)
3623     finally:
3624       os.unlink(fin_name)
3625
3626     return result.stdout
3627
3628
3629 class DevCacheManager(object):
3630   """Simple class for managing a cache of block device information.
3631
3632   """
3633   _DEV_PREFIX = "/dev/"
3634   _ROOT_DIR = constants.BDEV_CACHE_DIR
3635
3636   @classmethod
3637   def _ConvertPath(cls, dev_path):
3638     """Converts a /dev/name path to the cache file name.
3639
3640     This replaces slashes with underscores and strips the /dev
3641     prefix. It then returns the full path to the cache file.
3642
3643     @type dev_path: str
3644     @param dev_path: the C{/dev/} path name
3645     @rtype: str
3646     @return: the converted path name
3647
3648     """
3649     if dev_path.startswith(cls._DEV_PREFIX):
3650       dev_path = dev_path[len(cls._DEV_PREFIX):]
3651     dev_path = dev_path.replace("/", "_")
3652     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3653     return fpath
3654
3655   @classmethod
3656   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3657     """Updates the cache information for a given device.
3658
3659     @type dev_path: str
3660     @param dev_path: the pathname of the device
3661     @type owner: str
3662     @param owner: the owner (instance name) of the device
3663     @type on_primary: bool
3664     @param on_primary: whether this is the primary
3665         node nor not
3666     @type iv_name: str
3667     @param iv_name: the instance-visible name of the
3668         device, as in objects.Disk.iv_name
3669
3670     @rtype: None
3671
3672     """
3673     if dev_path is None:
3674       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3675       return
3676     fpath = cls._ConvertPath(dev_path)
3677     if on_primary:
3678       state = "primary"
3679     else:
3680       state = "secondary"
3681     if iv_name is None:
3682       iv_name = "not_visible"
3683     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3684     try:
3685       utils.WriteFile(fpath, data=fdata)
3686     except EnvironmentError, err:
3687       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3688
3689   @classmethod
3690   def RemoveCache(cls, dev_path):
3691     """Remove data for a dev_path.
3692
3693     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3694     path name and logging.
3695
3696     @type dev_path: str
3697     @param dev_path: the pathname of the device
3698
3699     @rtype: None
3700
3701     """
3702     if dev_path is None:
3703       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3704       return
3705     fpath = cls._ConvertPath(dev_path)
3706     try:
3707       utils.RemoveFile(fpath)
3708     except EnvironmentError, err:
3709       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)