serializer: Remove JSON indentation and dict key sorting
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65
66
67 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
68 _ALLOWED_CLEAN_DIRS = frozenset([
69   constants.DATA_DIR,
70   constants.JOB_QUEUE_ARCHIVE_DIR,
71   constants.QUEUE_DIR,
72   constants.CRYPTO_KEYS_DIR,
73   ])
74 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
75 _X509_KEY_FILE = "key"
76 _X509_CERT_FILE = "cert"
77 _IES_STATUS_FILE = "status"
78 _IES_PID_FILE = "pid"
79 _IES_CA_FILE = "ca"
80
81 #: Valid LVS output line regex
82 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
83
84 # Actions for the master setup script
85 _MASTER_START = "start"
86 _MASTER_STOP = "stop"
87
88
89 class RPCFail(Exception):
90   """Class denoting RPC failure.
91
92   Its argument is the error message.
93
94   """
95
96
97 def _Fail(msg, *args, **kwargs):
98   """Log an error and the raise an RPCFail exception.
99
100   This exception is then handled specially in the ganeti daemon and
101   turned into a 'failed' return type. As such, this function is a
102   useful shortcut for logging the error and returning it to the master
103   daemon.
104
105   @type msg: string
106   @param msg: the text of the exception
107   @raise RPCFail
108
109   """
110   if args:
111     msg = msg % args
112   if "log" not in kwargs or kwargs["log"]: # if we should log this error
113     if "exc" in kwargs and kwargs["exc"]:
114       logging.exception(msg)
115     else:
116       logging.error(msg)
117   raise RPCFail(msg)
118
119
120 def _GetConfig():
121   """Simple wrapper to return a SimpleStore.
122
123   @rtype: L{ssconf.SimpleStore}
124   @return: a SimpleStore instance
125
126   """
127   return ssconf.SimpleStore()
128
129
130 def _GetSshRunner(cluster_name):
131   """Simple wrapper to return an SshRunner.
132
133   @type cluster_name: str
134   @param cluster_name: the cluster name, which is needed
135       by the SshRunner constructor
136   @rtype: L{ssh.SshRunner}
137   @return: an SshRunner instance
138
139   """
140   return ssh.SshRunner(cluster_name)
141
142
143 def _Decompress(data):
144   """Unpacks data compressed by the RPC client.
145
146   @type data: list or tuple
147   @param data: Data sent by RPC client
148   @rtype: str
149   @return: Decompressed data
150
151   """
152   assert isinstance(data, (list, tuple))
153   assert len(data) == 2
154   (encoding, content) = data
155   if encoding == constants.RPC_ENCODING_NONE:
156     return content
157   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
158     return zlib.decompress(base64.b64decode(content))
159   else:
160     raise AssertionError("Unknown data encoding")
161
162
163 def _CleanDirectory(path, exclude=None):
164   """Removes all regular files in a directory.
165
166   @type path: str
167   @param path: the directory to clean
168   @type exclude: list
169   @param exclude: list of files to be excluded, defaults
170       to the empty list
171
172   """
173   if path not in _ALLOWED_CLEAN_DIRS:
174     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
175           path)
176
177   if not os.path.isdir(path):
178     return
179   if exclude is None:
180     exclude = []
181   else:
182     # Normalize excluded paths
183     exclude = [os.path.normpath(i) for i in exclude]
184
185   for rel_name in utils.ListVisibleFiles(path):
186     full_name = utils.PathJoin(path, rel_name)
187     if full_name in exclude:
188       continue
189     if os.path.isfile(full_name) and not os.path.islink(full_name):
190       utils.RemoveFile(full_name)
191
192
193 def _BuildUploadFileList():
194   """Build the list of allowed upload files.
195
196   This is abstracted so that it's built only once at module import time.
197
198   """
199   allowed_files = set([
200     constants.CLUSTER_CONF_FILE,
201     constants.ETC_HOSTS,
202     constants.SSH_KNOWN_HOSTS_FILE,
203     constants.VNC_PASSWORD_FILE,
204     constants.RAPI_CERT_FILE,
205     constants.SPICE_CERT_FILE,
206     constants.SPICE_CACERT_FILE,
207     constants.RAPI_USERS_FILE,
208     constants.CONFD_HMAC_KEY,
209     constants.CLUSTER_DOMAIN_SECRET_FILE,
210     ])
211
212   for hv_name in constants.HYPER_TYPES:
213     hv_class = hypervisor.GetHypervisorClass(hv_name)
214     allowed_files.update(hv_class.GetAncillaryFiles()[0])
215
216   return frozenset(allowed_files)
217
218
219 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
220
221
222 def JobQueuePurge():
223   """Removes job queue files and archived jobs.
224
225   @rtype: tuple
226   @return: True, None
227
228   """
229   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
230   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
231
232
233 def GetMasterInfo():
234   """Returns master information.
235
236   This is an utility function to compute master information, either
237   for consumption here or from the node daemon.
238
239   @rtype: tuple
240   @return: master_netdev, master_ip, master_name, primary_ip_family,
241     master_netmask
242   @raise RPCFail: in case of errors
243
244   """
245   try:
246     cfg = _GetConfig()
247     master_netdev = cfg.GetMasterNetdev()
248     master_ip = cfg.GetMasterIP()
249     master_netmask = cfg.GetMasterNetmask()
250     master_node = cfg.GetMasterNode()
251     primary_ip_family = cfg.GetPrimaryIPFamily()
252   except errors.ConfigurationError, err:
253     _Fail("Cluster configuration incomplete: %s", err, exc=True)
254   return (master_netdev, master_ip, master_node, primary_ip_family,
255       master_netmask)
256
257
258 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
259   """Decorator that runs hooks before and after the decorated function.
260
261   @type hook_opcode: string
262   @param hook_opcode: opcode of the hook
263   @type hooks_path: string
264   @param hooks_path: path of the hooks
265   @type env_builder_fn: function
266   @param env_builder_fn: function that returns a dictionary containing the
267     environment variables for the hooks. Will get all the parameters of the
268     decorated function.
269   @raise RPCFail: in case of pre-hook failure
270
271   """
272   def decorator(fn):
273     def wrapper(*args, **kwargs):
274       _, myself = ssconf.GetMasterAndMyself()
275       nodes = ([myself], [myself])  # these hooks run locally
276
277       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
278
279       cfg = _GetConfig()
280       hr = HooksRunner()
281       hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
282                             None, env_fn, logging.warning, cfg.GetClusterName(),
283                             cfg.GetMasterNode())
284
285       hm.RunPhase(constants.HOOKS_PHASE_PRE)
286       result = fn(*args, **kwargs)
287       hm.RunPhase(constants.HOOKS_PHASE_POST)
288
289       return result
290     return wrapper
291   return decorator
292
293
294 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
295   """Builds environment variables for master IP hooks.
296
297   @type master_params: L{objects.MasterNetworkParameters}
298   @param master_params: network parameters of the master
299   @type use_external_mip_script: boolean
300   @param use_external_mip_script: whether to use an external master IP
301     address setup script (unused, but necessary per the implementation of the
302     _RunLocalHooks decorator)
303
304   """
305   # pylint: disable=W0613
306   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
307   env = {
308     "MASTER_NETDEV": master_params.netdev,
309     "MASTER_IP": master_params.ip,
310     "MASTER_NETMASK": str(master_params.netmask),
311     "CLUSTER_IP_VERSION": str(ver),
312   }
313
314   return env
315
316
317 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
318   """Execute the master IP address setup script.
319
320   @type master_params: L{objects.MasterNetworkParameters}
321   @param master_params: network parameters of the master
322   @type action: string
323   @param action: action to pass to the script. Must be one of
324     L{backend._MASTER_START} or L{backend._MASTER_STOP}
325   @type use_external_mip_script: boolean
326   @param use_external_mip_script: whether to use an external master IP
327     address setup script
328   @raise backend.RPCFail: if there are errors during the execution of the
329     script
330
331   """
332   env = _BuildMasterIpEnv(master_params)
333
334   if use_external_mip_script:
335     setup_script = constants.EXTERNAL_MASTER_SETUP_SCRIPT
336   else:
337     setup_script = constants.DEFAULT_MASTER_SETUP_SCRIPT
338
339   result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
340
341   if result.failed:
342     _Fail("Failed to %s the master IP. Script return value: %s" %
343           (action, result.exit_code), log=True)
344
345
346 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
347                _BuildMasterIpEnv)
348 def ActivateMasterIp(master_params, use_external_mip_script):
349   """Activate the IP address of the master daemon.
350
351   @type master_params: L{objects.MasterNetworkParameters}
352   @param master_params: network parameters of the master
353   @type use_external_mip_script: boolean
354   @param use_external_mip_script: whether to use an external master IP
355     address setup script
356   @raise RPCFail: in case of errors during the IP startup
357
358   """
359   _RunMasterSetupScript(master_params, _MASTER_START,
360                         use_external_mip_script)
361
362
363 def StartMasterDaemons(no_voting):
364   """Activate local node as master node.
365
366   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
367
368   @type no_voting: boolean
369   @param no_voting: whether to start ganeti-masterd without a node vote
370       but still non-interactively
371   @rtype: None
372
373   """
374
375   if no_voting:
376     masterd_args = "--no-voting --yes-do-it"
377   else:
378     masterd_args = ""
379
380   env = {
381     "EXTRA_MASTERD_ARGS": masterd_args,
382     }
383
384   result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
385   if result.failed:
386     msg = "Can't start Ganeti master: %s" % result.output
387     logging.error(msg)
388     _Fail(msg)
389
390
391 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
392                _BuildMasterIpEnv)
393 def DeactivateMasterIp(master_params, use_external_mip_script):
394   """Deactivate the master IP on this node.
395
396   @type master_params: L{objects.MasterNetworkParameters}
397   @param master_params: network parameters of the master
398   @type use_external_mip_script: boolean
399   @param use_external_mip_script: whether to use an external master IP
400     address setup script
401   @raise RPCFail: in case of errors during the IP turndown
402
403   """
404   _RunMasterSetupScript(master_params, _MASTER_STOP,
405                         use_external_mip_script)
406
407
408 def StopMasterDaemons():
409   """Stop the master daemons on this node.
410
411   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
412
413   @rtype: None
414
415   """
416   # TODO: log and report back to the caller the error failures; we
417   # need to decide in which case we fail the RPC for this
418
419   result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
420   if result.failed:
421     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
422                   " and error %s",
423                   result.cmd, result.exit_code, result.output)
424
425
426 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
427   """Change the netmask of the master IP.
428
429   @param old_netmask: the old value of the netmask
430   @param netmask: the new value of the netmask
431   @param master_ip: the master IP
432   @param master_netdev: the master network device
433
434   """
435   if old_netmask == netmask:
436     return
437
438   if not netutils.IPAddress.Own(master_ip):
439     _Fail("The master IP address is not up, not attempting to change its"
440           " netmask")
441
442   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
443                          "%s/%s" % (master_ip, netmask),
444                          "dev", master_netdev, "label",
445                          "%s:0" % master_netdev])
446   if result.failed:
447     _Fail("Could not set the new netmask on the master IP address")
448
449   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
450                          "%s/%s" % (master_ip, old_netmask),
451                          "dev", master_netdev, "label",
452                          "%s:0" % master_netdev])
453   if result.failed:
454     _Fail("Could not bring down the master IP address with the old netmask")
455
456
457 def EtcHostsModify(mode, host, ip):
458   """Modify a host entry in /etc/hosts.
459
460   @param mode: The mode to operate. Either add or remove entry
461   @param host: The host to operate on
462   @param ip: The ip associated with the entry
463
464   """
465   if mode == constants.ETC_HOSTS_ADD:
466     if not ip:
467       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
468               " present")
469     utils.AddHostToEtcHosts(host, ip)
470   elif mode == constants.ETC_HOSTS_REMOVE:
471     if ip:
472       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
473               " parameter is present")
474     utils.RemoveHostFromEtcHosts(host)
475   else:
476     RPCFail("Mode not supported")
477
478
479 def LeaveCluster(modify_ssh_setup):
480   """Cleans up and remove the current node.
481
482   This function cleans up and prepares the current node to be removed
483   from the cluster.
484
485   If processing is successful, then it raises an
486   L{errors.QuitGanetiException} which is used as a special case to
487   shutdown the node daemon.
488
489   @param modify_ssh_setup: boolean
490
491   """
492   _CleanDirectory(constants.DATA_DIR)
493   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
494   JobQueuePurge()
495
496   if modify_ssh_setup:
497     try:
498       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
499
500       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
501
502       utils.RemoveFile(priv_key)
503       utils.RemoveFile(pub_key)
504     except errors.OpExecError:
505       logging.exception("Error while processing ssh files")
506
507   try:
508     utils.RemoveFile(constants.CONFD_HMAC_KEY)
509     utils.RemoveFile(constants.RAPI_CERT_FILE)
510     utils.RemoveFile(constants.SPICE_CERT_FILE)
511     utils.RemoveFile(constants.SPICE_CACERT_FILE)
512     utils.RemoveFile(constants.NODED_CERT_FILE)
513   except: # pylint: disable=W0702
514     logging.exception("Error while removing cluster secrets")
515
516   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
517   if result.failed:
518     logging.error("Command %s failed with exitcode %s and error %s",
519                   result.cmd, result.exit_code, result.output)
520
521   # Raise a custom exception (handled in ganeti-noded)
522   raise errors.QuitGanetiException(True, "Shutdown scheduled")
523
524
525 def _GetVgInfo(name):
526   """Retrieves information about a LVM volume group.
527
528   """
529   # TODO: GetVGInfo supports returning information for multiple VGs at once
530   vginfo = bdev.LogicalVolume.GetVGInfo([name])
531   if vginfo:
532     vg_free = int(round(vginfo[0][0], 0))
533     vg_size = int(round(vginfo[0][1], 0))
534   else:
535     vg_free = None
536     vg_size = None
537
538   return {
539     "name": name,
540     "vg_free": vg_free,
541     "vg_size": vg_size,
542     }
543
544
545 def _GetHvInfo(name):
546   """Retrieves node information from a hypervisor.
547
548   The information returned depends on the hypervisor. Common items:
549
550     - vg_size is the size of the configured volume group in MiB
551     - vg_free is the free size of the volume group in MiB
552     - memory_dom0 is the memory allocated for domain0 in MiB
553     - memory_free is the currently available (free) ram in MiB
554     - memory_total is the total number of ram in MiB
555     - hv_version: the hypervisor version, if available
556
557   """
558   return hypervisor.GetHypervisor(name).GetNodeInfo()
559
560
561 def _GetNamedNodeInfo(names, fn):
562   """Calls C{fn} for all names in C{names} and returns a dictionary.
563
564   @rtype: None or dict
565
566   """
567   if names is None:
568     return None
569   else:
570     return map(fn, names)
571
572
573 def GetNodeInfo(vg_names, hv_names):
574   """Gives back a hash with different information about the node.
575
576   @type vg_names: list of string
577   @param vg_names: Names of the volume groups to ask for disk space information
578   @type hv_names: list of string
579   @param hv_names: Names of the hypervisors to ask for node information
580   @rtype: tuple; (string, None/dict, None/dict)
581   @return: Tuple containing boot ID, volume group information and hypervisor
582     information
583
584   """
585   bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
586   vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
587   hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
588
589   return (bootid, vg_info, hv_info)
590
591
592 def VerifyNode(what, cluster_name):
593   """Verify the status of the local node.
594
595   Based on the input L{what} parameter, various checks are done on the
596   local node.
597
598   If the I{filelist} key is present, this list of
599   files is checksummed and the file/checksum pairs are returned.
600
601   If the I{nodelist} key is present, we check that we have
602   connectivity via ssh with the target nodes (and check the hostname
603   report).
604
605   If the I{node-net-test} key is present, we check that we have
606   connectivity to the given nodes via both primary IP and, if
607   applicable, secondary IPs.
608
609   @type what: C{dict}
610   @param what: a dictionary of things to check:
611       - filelist: list of files for which to compute checksums
612       - nodelist: list of nodes we should check ssh communication with
613       - node-net-test: list of nodes we should check node daemon port
614         connectivity with
615       - hypervisor: list with hypervisors to run the verify for
616   @rtype: dict
617   @return: a dictionary with the same keys as the input dict, and
618       values representing the result of the checks
619
620   """
621   result = {}
622   my_name = netutils.Hostname.GetSysName()
623   port = netutils.GetDaemonPort(constants.NODED)
624   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
625
626   if constants.NV_HYPERVISOR in what and vm_capable:
627     result[constants.NV_HYPERVISOR] = tmp = {}
628     for hv_name in what[constants.NV_HYPERVISOR]:
629       try:
630         val = hypervisor.GetHypervisor(hv_name).Verify()
631       except errors.HypervisorError, err:
632         val = "Error while checking hypervisor: %s" % str(err)
633       tmp[hv_name] = val
634
635   if constants.NV_HVPARAMS in what and vm_capable:
636     result[constants.NV_HVPARAMS] = tmp = []
637     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
638       try:
639         logging.info("Validating hv %s, %s", hv_name, hvparms)
640         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
641       except errors.HypervisorError, err:
642         tmp.append((source, hv_name, str(err)))
643
644   if constants.NV_FILELIST in what:
645     result[constants.NV_FILELIST] = utils.FingerprintFiles(
646       what[constants.NV_FILELIST])
647
648   if constants.NV_NODELIST in what:
649     (nodes, bynode) = what[constants.NV_NODELIST]
650
651     # Add nodes from other groups (different for each node)
652     try:
653       nodes.extend(bynode[my_name])
654     except KeyError:
655       pass
656
657     # Use a random order
658     random.shuffle(nodes)
659
660     # Try to contact all nodes
661     val = {}
662     for node in nodes:
663       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
664       if not success:
665         val[node] = message
666
667     result[constants.NV_NODELIST] = val
668
669   if constants.NV_NODENETTEST in what:
670     result[constants.NV_NODENETTEST] = tmp = {}
671     my_pip = my_sip = None
672     for name, pip, sip in what[constants.NV_NODENETTEST]:
673       if name == my_name:
674         my_pip = pip
675         my_sip = sip
676         break
677     if not my_pip:
678       tmp[my_name] = ("Can't find my own primary/secondary IP"
679                       " in the node list")
680     else:
681       for name, pip, sip in what[constants.NV_NODENETTEST]:
682         fail = []
683         if not netutils.TcpPing(pip, port, source=my_pip):
684           fail.append("primary")
685         if sip != pip:
686           if not netutils.TcpPing(sip, port, source=my_sip):
687             fail.append("secondary")
688         if fail:
689           tmp[name] = ("failure using the %s interface(s)" %
690                        " and ".join(fail))
691
692   if constants.NV_MASTERIP in what:
693     # FIXME: add checks on incoming data structures (here and in the
694     # rest of the function)
695     master_name, master_ip = what[constants.NV_MASTERIP]
696     if master_name == my_name:
697       source = constants.IP4_ADDRESS_LOCALHOST
698     else:
699       source = None
700     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
701                                                   source=source)
702
703   if constants.NV_USERSCRIPTS in what:
704     result[constants.NV_USERSCRIPTS] = \
705       [script for script in what[constants.NV_USERSCRIPTS]
706        if not (os.path.exists(script) and os.access(script, os.X_OK))]
707
708   if constants.NV_OOB_PATHS in what:
709     result[constants.NV_OOB_PATHS] = tmp = []
710     for path in what[constants.NV_OOB_PATHS]:
711       try:
712         st = os.stat(path)
713       except OSError, err:
714         tmp.append("error stating out of band helper: %s" % err)
715       else:
716         if stat.S_ISREG(st.st_mode):
717           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
718             tmp.append(None)
719           else:
720             tmp.append("out of band helper %s is not executable" % path)
721         else:
722           tmp.append("out of band helper %s is not a file" % path)
723
724   if constants.NV_LVLIST in what and vm_capable:
725     try:
726       val = GetVolumeList(utils.ListVolumeGroups().keys())
727     except RPCFail, err:
728       val = str(err)
729     result[constants.NV_LVLIST] = val
730
731   if constants.NV_INSTANCELIST in what and vm_capable:
732     # GetInstanceList can fail
733     try:
734       val = GetInstanceList(what[constants.NV_INSTANCELIST])
735     except RPCFail, err:
736       val = str(err)
737     result[constants.NV_INSTANCELIST] = val
738
739   if constants.NV_VGLIST in what and vm_capable:
740     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
741
742   if constants.NV_PVLIST in what and vm_capable:
743     result[constants.NV_PVLIST] = \
744       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
745                                    filter_allocatable=False)
746
747   if constants.NV_VERSION in what:
748     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
749                                     constants.RELEASE_VERSION)
750
751   if constants.NV_HVINFO in what and vm_capable:
752     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
753     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
754
755   if constants.NV_DRBDLIST in what and vm_capable:
756     try:
757       used_minors = bdev.DRBD8.GetUsedDevs().keys()
758     except errors.BlockDeviceError, err:
759       logging.warning("Can't get used minors list", exc_info=True)
760       used_minors = str(err)
761     result[constants.NV_DRBDLIST] = used_minors
762
763   if constants.NV_DRBDHELPER in what and vm_capable:
764     status = True
765     try:
766       payload = bdev.BaseDRBD.GetUsermodeHelper()
767     except errors.BlockDeviceError, err:
768       logging.error("Can't get DRBD usermode helper: %s", str(err))
769       status = False
770       payload = str(err)
771     result[constants.NV_DRBDHELPER] = (status, payload)
772
773   if constants.NV_NODESETUP in what:
774     result[constants.NV_NODESETUP] = tmpr = []
775     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
776       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
777                   " under /sys, missing required directories /sys/block"
778                   " and /sys/class/net")
779     if (not os.path.isdir("/proc/sys") or
780         not os.path.isfile("/proc/sysrq-trigger")):
781       tmpr.append("The procfs filesystem doesn't seem to be mounted"
782                   " under /proc, missing required directory /proc/sys and"
783                   " the file /proc/sysrq-trigger")
784
785   if constants.NV_TIME in what:
786     result[constants.NV_TIME] = utils.SplitTime(time.time())
787
788   if constants.NV_OSLIST in what and vm_capable:
789     result[constants.NV_OSLIST] = DiagnoseOS()
790
791   if constants.NV_BRIDGES in what and vm_capable:
792     result[constants.NV_BRIDGES] = [bridge
793                                     for bridge in what[constants.NV_BRIDGES]
794                                     if not utils.BridgeExists(bridge)]
795   return result
796
797
798 def GetBlockDevSizes(devices):
799   """Return the size of the given block devices
800
801   @type devices: list
802   @param devices: list of block device nodes to query
803   @rtype: dict
804   @return:
805     dictionary of all block devices under /dev (key). The value is their
806     size in MiB.
807
808     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
809
810   """
811   DEV_PREFIX = "/dev/"
812   blockdevs = {}
813
814   for devpath in devices:
815     if not utils.IsBelowDir(DEV_PREFIX, devpath):
816       continue
817
818     try:
819       st = os.stat(devpath)
820     except EnvironmentError, err:
821       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
822       continue
823
824     if stat.S_ISBLK(st.st_mode):
825       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
826       if result.failed:
827         # We don't want to fail, just do not list this device as available
828         logging.warning("Cannot get size for block device %s", devpath)
829         continue
830
831       size = int(result.stdout) / (1024 * 1024)
832       blockdevs[devpath] = size
833   return blockdevs
834
835
836 def GetVolumeList(vg_names):
837   """Compute list of logical volumes and their size.
838
839   @type vg_names: list
840   @param vg_names: the volume groups whose LVs we should list, or
841       empty for all volume groups
842   @rtype: dict
843   @return:
844       dictionary of all partions (key) with value being a tuple of
845       their size (in MiB), inactive and online status::
846
847         {'xenvg/test1': ('20.06', True, True)}
848
849       in case of errors, a string is returned with the error
850       details.
851
852   """
853   lvs = {}
854   sep = "|"
855   if not vg_names:
856     vg_names = []
857   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
858                          "--separator=%s" % sep,
859                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
860   if result.failed:
861     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
862
863   for line in result.stdout.splitlines():
864     line = line.strip()
865     match = _LVSLINE_REGEX.match(line)
866     if not match:
867       logging.error("Invalid line returned from lvs output: '%s'", line)
868       continue
869     vg_name, name, size, attr = match.groups()
870     inactive = attr[4] == "-"
871     online = attr[5] == "o"
872     virtual = attr[0] == "v"
873     if virtual:
874       # we don't want to report such volumes as existing, since they
875       # don't really hold data
876       continue
877     lvs[vg_name + "/" + name] = (size, inactive, online)
878
879   return lvs
880
881
882 def ListVolumeGroups():
883   """List the volume groups and their size.
884
885   @rtype: dict
886   @return: dictionary with keys volume name and values the
887       size of the volume
888
889   """
890   return utils.ListVolumeGroups()
891
892
893 def NodeVolumes():
894   """List all volumes on this node.
895
896   @rtype: list
897   @return:
898     A list of dictionaries, each having four keys:
899       - name: the logical volume name,
900       - size: the size of the logical volume
901       - dev: the physical device on which the LV lives
902       - vg: the volume group to which it belongs
903
904     In case of errors, we return an empty list and log the
905     error.
906
907     Note that since a logical volume can live on multiple physical
908     volumes, the resulting list might include a logical volume
909     multiple times.
910
911   """
912   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
913                          "--separator=|",
914                          "--options=lv_name,lv_size,devices,vg_name"])
915   if result.failed:
916     _Fail("Failed to list logical volumes, lvs output: %s",
917           result.output)
918
919   def parse_dev(dev):
920     return dev.split("(")[0]
921
922   def handle_dev(dev):
923     return [parse_dev(x) for x in dev.split(",")]
924
925   def map_line(line):
926     line = [v.strip() for v in line]
927     return [{"name": line[0], "size": line[1],
928              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
929
930   all_devs = []
931   for line in result.stdout.splitlines():
932     if line.count("|") >= 3:
933       all_devs.extend(map_line(line.split("|")))
934     else:
935       logging.warning("Strange line in the output from lvs: '%s'", line)
936   return all_devs
937
938
939 def BridgesExist(bridges_list):
940   """Check if a list of bridges exist on the current node.
941
942   @rtype: boolean
943   @return: C{True} if all of them exist, C{False} otherwise
944
945   """
946   missing = []
947   for bridge in bridges_list:
948     if not utils.BridgeExists(bridge):
949       missing.append(bridge)
950
951   if missing:
952     _Fail("Missing bridges %s", utils.CommaJoin(missing))
953
954
955 def GetInstanceList(hypervisor_list):
956   """Provides a list of instances.
957
958   @type hypervisor_list: list
959   @param hypervisor_list: the list of hypervisors to query information
960
961   @rtype: list
962   @return: a list of all running instances on the current node
963     - instance1.example.com
964     - instance2.example.com
965
966   """
967   results = []
968   for hname in hypervisor_list:
969     try:
970       names = hypervisor.GetHypervisor(hname).ListInstances()
971       results.extend(names)
972     except errors.HypervisorError, err:
973       _Fail("Error enumerating instances (hypervisor %s): %s",
974             hname, err, exc=True)
975
976   return results
977
978
979 def GetInstanceInfo(instance, hname):
980   """Gives back the information about an instance as a dictionary.
981
982   @type instance: string
983   @param instance: the instance name
984   @type hname: string
985   @param hname: the hypervisor type of the instance
986
987   @rtype: dict
988   @return: dictionary with the following keys:
989       - memory: memory size of instance (int)
990       - state: xen state of instance (string)
991       - time: cpu time of instance (float)
992
993   """
994   output = {}
995
996   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
997   if iinfo is not None:
998     output["memory"] = iinfo[2]
999     output["state"] = iinfo[4]
1000     output["time"] = iinfo[5]
1001
1002   return output
1003
1004
1005 def GetInstanceMigratable(instance):
1006   """Gives whether an instance can be migrated.
1007
1008   @type instance: L{objects.Instance}
1009   @param instance: object representing the instance to be checked.
1010
1011   @rtype: tuple
1012   @return: tuple of (result, description) where:
1013       - result: whether the instance can be migrated or not
1014       - description: a description of the issue, if relevant
1015
1016   """
1017   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1018   iname = instance.name
1019   if iname not in hyper.ListInstances():
1020     _Fail("Instance %s is not running", iname)
1021
1022   for idx in range(len(instance.disks)):
1023     link_name = _GetBlockDevSymlinkPath(iname, idx)
1024     if not os.path.islink(link_name):
1025       logging.warning("Instance %s is missing symlink %s for disk %d",
1026                       iname, link_name, idx)
1027
1028
1029 def GetAllInstancesInfo(hypervisor_list):
1030   """Gather data about all instances.
1031
1032   This is the equivalent of L{GetInstanceInfo}, except that it
1033   computes data for all instances at once, thus being faster if one
1034   needs data about more than one instance.
1035
1036   @type hypervisor_list: list
1037   @param hypervisor_list: list of hypervisors to query for instance data
1038
1039   @rtype: dict
1040   @return: dictionary of instance: data, with data having the following keys:
1041       - memory: memory size of instance (int)
1042       - state: xen state of instance (string)
1043       - time: cpu time of instance (float)
1044       - vcpus: the number of vcpus
1045
1046   """
1047   output = {}
1048
1049   for hname in hypervisor_list:
1050     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1051     if iinfo:
1052       for name, _, memory, vcpus, state, times in iinfo:
1053         value = {
1054           "memory": memory,
1055           "vcpus": vcpus,
1056           "state": state,
1057           "time": times,
1058           }
1059         if name in output:
1060           # we only check static parameters, like memory and vcpus,
1061           # and not state and time which can change between the
1062           # invocations of the different hypervisors
1063           for key in "memory", "vcpus":
1064             if value[key] != output[name][key]:
1065               _Fail("Instance %s is running twice"
1066                     " with different parameters", name)
1067         output[name] = value
1068
1069   return output
1070
1071
1072 def _InstanceLogName(kind, os_name, instance, component):
1073   """Compute the OS log filename for a given instance and operation.
1074
1075   The instance name and os name are passed in as strings since not all
1076   operations have these as part of an instance object.
1077
1078   @type kind: string
1079   @param kind: the operation type (e.g. add, import, etc.)
1080   @type os_name: string
1081   @param os_name: the os name
1082   @type instance: string
1083   @param instance: the name of the instance being imported/added/etc.
1084   @type component: string or None
1085   @param component: the name of the component of the instance being
1086       transferred
1087
1088   """
1089   # TODO: Use tempfile.mkstemp to create unique filename
1090   if component:
1091     assert "/" not in component
1092     c_msg = "-%s" % component
1093   else:
1094     c_msg = ""
1095   base = ("%s-%s-%s%s-%s.log" %
1096           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1097   return utils.PathJoin(constants.LOG_OS_DIR, base)
1098
1099
1100 def InstanceOsAdd(instance, reinstall, debug):
1101   """Add an OS to an instance.
1102
1103   @type instance: L{objects.Instance}
1104   @param instance: Instance whose OS is to be installed
1105   @type reinstall: boolean
1106   @param reinstall: whether this is an instance reinstall
1107   @type debug: integer
1108   @param debug: debug level, passed to the OS scripts
1109   @rtype: None
1110
1111   """
1112   inst_os = OSFromDisk(instance.os)
1113
1114   create_env = OSEnvironment(instance, inst_os, debug)
1115   if reinstall:
1116     create_env["INSTANCE_REINSTALL"] = "1"
1117
1118   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1119
1120   result = utils.RunCmd([inst_os.create_script], env=create_env,
1121                         cwd=inst_os.path, output=logfile, reset_env=True)
1122   if result.failed:
1123     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1124                   " output: %s", result.cmd, result.fail_reason, logfile,
1125                   result.output)
1126     lines = [utils.SafeEncode(val)
1127              for val in utils.TailFile(logfile, lines=20)]
1128     _Fail("OS create script failed (%s), last lines in the"
1129           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1130
1131
1132 def RunRenameInstance(instance, old_name, debug):
1133   """Run the OS rename script for an instance.
1134
1135   @type instance: L{objects.Instance}
1136   @param instance: Instance whose OS is to be installed
1137   @type old_name: string
1138   @param old_name: previous instance name
1139   @type debug: integer
1140   @param debug: debug level, passed to the OS scripts
1141   @rtype: boolean
1142   @return: the success of the operation
1143
1144   """
1145   inst_os = OSFromDisk(instance.os)
1146
1147   rename_env = OSEnvironment(instance, inst_os, debug)
1148   rename_env["OLD_INSTANCE_NAME"] = old_name
1149
1150   logfile = _InstanceLogName("rename", instance.os,
1151                              "%s-%s" % (old_name, instance.name), None)
1152
1153   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1154                         cwd=inst_os.path, output=logfile, reset_env=True)
1155
1156   if result.failed:
1157     logging.error("os create command '%s' returned error: %s output: %s",
1158                   result.cmd, result.fail_reason, result.output)
1159     lines = [utils.SafeEncode(val)
1160              for val in utils.TailFile(logfile, lines=20)]
1161     _Fail("OS rename script failed (%s), last lines in the"
1162           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1163
1164
1165 def _GetBlockDevSymlinkPath(instance_name, idx):
1166   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
1167                         (instance_name, constants.DISK_SEPARATOR, idx))
1168
1169
1170 def _SymlinkBlockDev(instance_name, device_path, idx):
1171   """Set up symlinks to a instance's block device.
1172
1173   This is an auxiliary function run when an instance is start (on the primary
1174   node) or when an instance is migrated (on the target node).
1175
1176
1177   @param instance_name: the name of the target instance
1178   @param device_path: path of the physical block device, on the node
1179   @param idx: the disk index
1180   @return: absolute path to the disk's symlink
1181
1182   """
1183   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1184   try:
1185     os.symlink(device_path, link_name)
1186   except OSError, err:
1187     if err.errno == errno.EEXIST:
1188       if (not os.path.islink(link_name) or
1189           os.readlink(link_name) != device_path):
1190         os.remove(link_name)
1191         os.symlink(device_path, link_name)
1192     else:
1193       raise
1194
1195   return link_name
1196
1197
1198 def _RemoveBlockDevLinks(instance_name, disks):
1199   """Remove the block device symlinks belonging to the given instance.
1200
1201   """
1202   for idx, _ in enumerate(disks):
1203     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1204     if os.path.islink(link_name):
1205       try:
1206         os.remove(link_name)
1207       except OSError:
1208         logging.exception("Can't remove symlink '%s'", link_name)
1209
1210
1211 def _GatherAndLinkBlockDevs(instance):
1212   """Set up an instance's block device(s).
1213
1214   This is run on the primary node at instance startup. The block
1215   devices must be already assembled.
1216
1217   @type instance: L{objects.Instance}
1218   @param instance: the instance whose disks we shoul assemble
1219   @rtype: list
1220   @return: list of (disk_object, device_path)
1221
1222   """
1223   block_devices = []
1224   for idx, disk in enumerate(instance.disks):
1225     device = _RecursiveFindBD(disk)
1226     if device is None:
1227       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1228                                     str(disk))
1229     device.Open()
1230     try:
1231       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1232     except OSError, e:
1233       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1234                                     e.strerror)
1235
1236     block_devices.append((disk, link_name))
1237
1238   return block_devices
1239
1240
1241 def StartInstance(instance, startup_paused):
1242   """Start an instance.
1243
1244   @type instance: L{objects.Instance}
1245   @param instance: the instance object
1246   @type startup_paused: bool
1247   @param instance: pause instance at startup?
1248   @rtype: None
1249
1250   """
1251   running_instances = GetInstanceList([instance.hypervisor])
1252
1253   if instance.name in running_instances:
1254     logging.info("Instance %s already running, not starting", instance.name)
1255     return
1256
1257   try:
1258     block_devices = _GatherAndLinkBlockDevs(instance)
1259     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1260     hyper.StartInstance(instance, block_devices, startup_paused)
1261   except errors.BlockDeviceError, err:
1262     _Fail("Block device error: %s", err, exc=True)
1263   except errors.HypervisorError, err:
1264     _RemoveBlockDevLinks(instance.name, instance.disks)
1265     _Fail("Hypervisor error: %s", err, exc=True)
1266
1267
1268 def InstanceShutdown(instance, timeout):
1269   """Shut an instance down.
1270
1271   @note: this functions uses polling with a hardcoded timeout.
1272
1273   @type instance: L{objects.Instance}
1274   @param instance: the instance object
1275   @type timeout: integer
1276   @param timeout: maximum timeout for soft shutdown
1277   @rtype: None
1278
1279   """
1280   hv_name = instance.hypervisor
1281   hyper = hypervisor.GetHypervisor(hv_name)
1282   iname = instance.name
1283
1284   if instance.name not in hyper.ListInstances():
1285     logging.info("Instance %s not running, doing nothing", iname)
1286     return
1287
1288   class _TryShutdown:
1289     def __init__(self):
1290       self.tried_once = False
1291
1292     def __call__(self):
1293       if iname not in hyper.ListInstances():
1294         return
1295
1296       try:
1297         hyper.StopInstance(instance, retry=self.tried_once)
1298       except errors.HypervisorError, err:
1299         if iname not in hyper.ListInstances():
1300           # if the instance is no longer existing, consider this a
1301           # success and go to cleanup
1302           return
1303
1304         _Fail("Failed to stop instance %s: %s", iname, err)
1305
1306       self.tried_once = True
1307
1308       raise utils.RetryAgain()
1309
1310   try:
1311     utils.Retry(_TryShutdown(), 5, timeout)
1312   except utils.RetryTimeout:
1313     # the shutdown did not succeed
1314     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1315
1316     try:
1317       hyper.StopInstance(instance, force=True)
1318     except errors.HypervisorError, err:
1319       if iname in hyper.ListInstances():
1320         # only raise an error if the instance still exists, otherwise
1321         # the error could simply be "instance ... unknown"!
1322         _Fail("Failed to force stop instance %s: %s", iname, err)
1323
1324     time.sleep(1)
1325
1326     if iname in hyper.ListInstances():
1327       _Fail("Could not shutdown instance %s even by destroy", iname)
1328
1329   try:
1330     hyper.CleanupInstance(instance.name)
1331   except errors.HypervisorError, err:
1332     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1333
1334   _RemoveBlockDevLinks(iname, instance.disks)
1335
1336
1337 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1338   """Reboot an instance.
1339
1340   @type instance: L{objects.Instance}
1341   @param instance: the instance object to reboot
1342   @type reboot_type: str
1343   @param reboot_type: the type of reboot, one the following
1344     constants:
1345       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1346         instance OS, do not recreate the VM
1347       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1348         restart the VM (at the hypervisor level)
1349       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1350         not accepted here, since that mode is handled differently, in
1351         cmdlib, and translates into full stop and start of the
1352         instance (instead of a call_instance_reboot RPC)
1353   @type shutdown_timeout: integer
1354   @param shutdown_timeout: maximum timeout for soft shutdown
1355   @rtype: None
1356
1357   """
1358   running_instances = GetInstanceList([instance.hypervisor])
1359
1360   if instance.name not in running_instances:
1361     _Fail("Cannot reboot instance %s that is not running", instance.name)
1362
1363   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1364   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1365     try:
1366       hyper.RebootInstance(instance)
1367     except errors.HypervisorError, err:
1368       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1369   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1370     try:
1371       InstanceShutdown(instance, shutdown_timeout)
1372       return StartInstance(instance, False)
1373     except errors.HypervisorError, err:
1374       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1375   else:
1376     _Fail("Invalid reboot_type received: %s", reboot_type)
1377
1378
1379 def MigrationInfo(instance):
1380   """Gather information about an instance to be migrated.
1381
1382   @type instance: L{objects.Instance}
1383   @param instance: the instance definition
1384
1385   """
1386   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1387   try:
1388     info = hyper.MigrationInfo(instance)
1389   except errors.HypervisorError, err:
1390     _Fail("Failed to fetch migration information: %s", err, exc=True)
1391   return info
1392
1393
1394 def AcceptInstance(instance, info, target):
1395   """Prepare the node to accept an instance.
1396
1397   @type instance: L{objects.Instance}
1398   @param instance: the instance definition
1399   @type info: string/data (opaque)
1400   @param info: migration information, from the source node
1401   @type target: string
1402   @param target: target host (usually ip), on this node
1403
1404   """
1405   # TODO: why is this required only for DTS_EXT_MIRROR?
1406   if instance.disk_template in constants.DTS_EXT_MIRROR:
1407     # Create the symlinks, as the disks are not active
1408     # in any way
1409     try:
1410       _GatherAndLinkBlockDevs(instance)
1411     except errors.BlockDeviceError, err:
1412       _Fail("Block device error: %s", err, exc=True)
1413
1414   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1415   try:
1416     hyper.AcceptInstance(instance, info, target)
1417   except errors.HypervisorError, err:
1418     if instance.disk_template in constants.DTS_EXT_MIRROR:
1419       _RemoveBlockDevLinks(instance.name, instance.disks)
1420     _Fail("Failed to accept instance: %s", err, exc=True)
1421
1422
1423 def FinalizeMigrationDst(instance, info, success):
1424   """Finalize any preparation to accept an instance.
1425
1426   @type instance: L{objects.Instance}
1427   @param instance: the instance definition
1428   @type info: string/data (opaque)
1429   @param info: migration information, from the source node
1430   @type success: boolean
1431   @param success: whether the migration was a success or a failure
1432
1433   """
1434   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1435   try:
1436     hyper.FinalizeMigrationDst(instance, info, success)
1437   except errors.HypervisorError, err:
1438     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1439
1440
1441 def MigrateInstance(instance, target, live):
1442   """Migrates an instance to another node.
1443
1444   @type instance: L{objects.Instance}
1445   @param instance: the instance definition
1446   @type target: string
1447   @param target: the target node name
1448   @type live: boolean
1449   @param live: whether the migration should be done live or not (the
1450       interpretation of this parameter is left to the hypervisor)
1451   @raise RPCFail: if migration fails for some reason
1452
1453   """
1454   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1455
1456   try:
1457     hyper.MigrateInstance(instance, target, live)
1458   except errors.HypervisorError, err:
1459     _Fail("Failed to migrate instance: %s", err, exc=True)
1460
1461
1462 def FinalizeMigrationSource(instance, success, live):
1463   """Finalize the instance migration on the source node.
1464
1465   @type instance: L{objects.Instance}
1466   @param instance: the instance definition of the migrated instance
1467   @type success: bool
1468   @param success: whether the migration succeeded or not
1469   @type live: bool
1470   @param live: whether the user requested a live migration or not
1471   @raise RPCFail: If the execution fails for some reason
1472
1473   """
1474   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1475
1476   try:
1477     hyper.FinalizeMigrationSource(instance, success, live)
1478   except Exception, err:  # pylint: disable=W0703
1479     _Fail("Failed to finalize the migration on the source node: %s", err,
1480           exc=True)
1481
1482
1483 def GetMigrationStatus(instance):
1484   """Get the migration status
1485
1486   @type instance: L{objects.Instance}
1487   @param instance: the instance that is being migrated
1488   @rtype: L{objects.MigrationStatus}
1489   @return: the status of the current migration (one of
1490            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1491            progress info that can be retrieved from the hypervisor
1492   @raise RPCFail: If the migration status cannot be retrieved
1493
1494   """
1495   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1496   try:
1497     return hyper.GetMigrationStatus(instance)
1498   except Exception, err:  # pylint: disable=W0703
1499     _Fail("Failed to get migration status: %s", err, exc=True)
1500
1501
1502 def BlockdevCreate(disk, size, owner, on_primary, info):
1503   """Creates a block device for an instance.
1504
1505   @type disk: L{objects.Disk}
1506   @param disk: the object describing the disk we should create
1507   @type size: int
1508   @param size: the size of the physical underlying device, in MiB
1509   @type owner: str
1510   @param owner: the name of the instance for which disk is created,
1511       used for device cache data
1512   @type on_primary: boolean
1513   @param on_primary:  indicates if it is the primary node or not
1514   @type info: string
1515   @param info: string that will be sent to the physical device
1516       creation, used for example to set (LVM) tags on LVs
1517
1518   @return: the new unique_id of the device (this can sometime be
1519       computed only after creation), or None. On secondary nodes,
1520       it's not required to return anything.
1521
1522   """
1523   # TODO: remove the obsolete "size" argument
1524   # pylint: disable=W0613
1525   clist = []
1526   if disk.children:
1527     for child in disk.children:
1528       try:
1529         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1530       except errors.BlockDeviceError, err:
1531         _Fail("Can't assemble device %s: %s", child, err)
1532       if on_primary or disk.AssembleOnSecondary():
1533         # we need the children open in case the device itself has to
1534         # be assembled
1535         try:
1536           # pylint: disable=E1103
1537           crdev.Open()
1538         except errors.BlockDeviceError, err:
1539           _Fail("Can't make child '%s' read-write: %s", child, err)
1540       clist.append(crdev)
1541
1542   try:
1543     device = bdev.Create(disk, clist)
1544   except errors.BlockDeviceError, err:
1545     _Fail("Can't create block device: %s", err)
1546
1547   if on_primary or disk.AssembleOnSecondary():
1548     try:
1549       device.Assemble()
1550     except errors.BlockDeviceError, err:
1551       _Fail("Can't assemble device after creation, unusual event: %s", err)
1552     if on_primary or disk.OpenOnSecondary():
1553       try:
1554         device.Open(force=True)
1555       except errors.BlockDeviceError, err:
1556         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1557     DevCacheManager.UpdateCache(device.dev_path, owner,
1558                                 on_primary, disk.iv_name)
1559
1560   device.SetInfo(info)
1561
1562   return device.unique_id
1563
1564
1565 def _WipeDevice(path, offset, size):
1566   """This function actually wipes the device.
1567
1568   @param path: The path to the device to wipe
1569   @param offset: The offset in MiB in the file
1570   @param size: The size in MiB to write
1571
1572   """
1573   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1574          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1575          "count=%d" % size]
1576   result = utils.RunCmd(cmd)
1577
1578   if result.failed:
1579     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1580           result.fail_reason, result.output)
1581
1582
1583 def BlockdevWipe(disk, offset, size):
1584   """Wipes a block device.
1585
1586   @type disk: L{objects.Disk}
1587   @param disk: the disk object we want to wipe
1588   @type offset: int
1589   @param offset: The offset in MiB in the file
1590   @type size: int
1591   @param size: The size in MiB to write
1592
1593   """
1594   try:
1595     rdev = _RecursiveFindBD(disk)
1596   except errors.BlockDeviceError:
1597     rdev = None
1598
1599   if not rdev:
1600     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1601
1602   # Do cross verify some of the parameters
1603   if offset > rdev.size:
1604     _Fail("Offset is bigger than device size")
1605   if (offset + size) > rdev.size:
1606     _Fail("The provided offset and size to wipe is bigger than device size")
1607
1608   _WipeDevice(rdev.dev_path, offset, size)
1609
1610
1611 def BlockdevPauseResumeSync(disks, pause):
1612   """Pause or resume the sync of the block device.
1613
1614   @type disks: list of L{objects.Disk}
1615   @param disks: the disks object we want to pause/resume
1616   @type pause: bool
1617   @param pause: Wheater to pause or resume
1618
1619   """
1620   success = []
1621   for disk in disks:
1622     try:
1623       rdev = _RecursiveFindBD(disk)
1624     except errors.BlockDeviceError:
1625       rdev = None
1626
1627     if not rdev:
1628       success.append((False, ("Cannot change sync for device %s:"
1629                               " device not found" % disk.iv_name)))
1630       continue
1631
1632     result = rdev.PauseResumeSync(pause)
1633
1634     if result:
1635       success.append((result, None))
1636     else:
1637       if pause:
1638         msg = "Pause"
1639       else:
1640         msg = "Resume"
1641       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1642
1643   return success
1644
1645
1646 def BlockdevRemove(disk):
1647   """Remove a block device.
1648
1649   @note: This is intended to be called recursively.
1650
1651   @type disk: L{objects.Disk}
1652   @param disk: the disk object we should remove
1653   @rtype: boolean
1654   @return: the success of the operation
1655
1656   """
1657   msgs = []
1658   try:
1659     rdev = _RecursiveFindBD(disk)
1660   except errors.BlockDeviceError, err:
1661     # probably can't attach
1662     logging.info("Can't attach to device %s in remove", disk)
1663     rdev = None
1664   if rdev is not None:
1665     r_path = rdev.dev_path
1666     try:
1667       rdev.Remove()
1668     except errors.BlockDeviceError, err:
1669       msgs.append(str(err))
1670     if not msgs:
1671       DevCacheManager.RemoveCache(r_path)
1672
1673   if disk.children:
1674     for child in disk.children:
1675       try:
1676         BlockdevRemove(child)
1677       except RPCFail, err:
1678         msgs.append(str(err))
1679
1680   if msgs:
1681     _Fail("; ".join(msgs))
1682
1683
1684 def _RecursiveAssembleBD(disk, owner, as_primary):
1685   """Activate a block device for an instance.
1686
1687   This is run on the primary and secondary nodes for an instance.
1688
1689   @note: this function is called recursively.
1690
1691   @type disk: L{objects.Disk}
1692   @param disk: the disk we try to assemble
1693   @type owner: str
1694   @param owner: the name of the instance which owns the disk
1695   @type as_primary: boolean
1696   @param as_primary: if we should make the block device
1697       read/write
1698
1699   @return: the assembled device or None (in case no device
1700       was assembled)
1701   @raise errors.BlockDeviceError: in case there is an error
1702       during the activation of the children or the device
1703       itself
1704
1705   """
1706   children = []
1707   if disk.children:
1708     mcn = disk.ChildrenNeeded()
1709     if mcn == -1:
1710       mcn = 0 # max number of Nones allowed
1711     else:
1712       mcn = len(disk.children) - mcn # max number of Nones
1713     for chld_disk in disk.children:
1714       try:
1715         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1716       except errors.BlockDeviceError, err:
1717         if children.count(None) >= mcn:
1718           raise
1719         cdev = None
1720         logging.error("Error in child activation (but continuing): %s",
1721                       str(err))
1722       children.append(cdev)
1723
1724   if as_primary or disk.AssembleOnSecondary():
1725     r_dev = bdev.Assemble(disk, children)
1726     result = r_dev
1727     if as_primary or disk.OpenOnSecondary():
1728       r_dev.Open()
1729     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1730                                 as_primary, disk.iv_name)
1731
1732   else:
1733     result = True
1734   return result
1735
1736
1737 def BlockdevAssemble(disk, owner, as_primary, idx):
1738   """Activate a block device for an instance.
1739
1740   This is a wrapper over _RecursiveAssembleBD.
1741
1742   @rtype: str or boolean
1743   @return: a C{/dev/...} path for primary nodes, and
1744       C{True} for secondary nodes
1745
1746   """
1747   try:
1748     result = _RecursiveAssembleBD(disk, owner, as_primary)
1749     if isinstance(result, bdev.BlockDev):
1750       # pylint: disable=E1103
1751       result = result.dev_path
1752       if as_primary:
1753         _SymlinkBlockDev(owner, result, idx)
1754   except errors.BlockDeviceError, err:
1755     _Fail("Error while assembling disk: %s", err, exc=True)
1756   except OSError, err:
1757     _Fail("Error while symlinking disk: %s", err, exc=True)
1758
1759   return result
1760
1761
1762 def BlockdevShutdown(disk):
1763   """Shut down a block device.
1764
1765   First, if the device is assembled (Attach() is successful), then
1766   the device is shutdown. Then the children of the device are
1767   shutdown.
1768
1769   This function is called recursively. Note that we don't cache the
1770   children or such, as oppossed to assemble, shutdown of different
1771   devices doesn't require that the upper device was active.
1772
1773   @type disk: L{objects.Disk}
1774   @param disk: the description of the disk we should
1775       shutdown
1776   @rtype: None
1777
1778   """
1779   msgs = []
1780   r_dev = _RecursiveFindBD(disk)
1781   if r_dev is not None:
1782     r_path = r_dev.dev_path
1783     try:
1784       r_dev.Shutdown()
1785       DevCacheManager.RemoveCache(r_path)
1786     except errors.BlockDeviceError, err:
1787       msgs.append(str(err))
1788
1789   if disk.children:
1790     for child in disk.children:
1791       try:
1792         BlockdevShutdown(child)
1793       except RPCFail, err:
1794         msgs.append(str(err))
1795
1796   if msgs:
1797     _Fail("; ".join(msgs))
1798
1799
1800 def BlockdevAddchildren(parent_cdev, new_cdevs):
1801   """Extend a mirrored block device.
1802
1803   @type parent_cdev: L{objects.Disk}
1804   @param parent_cdev: the disk to which we should add children
1805   @type new_cdevs: list of L{objects.Disk}
1806   @param new_cdevs: the list of children which we should add
1807   @rtype: None
1808
1809   """
1810   parent_bdev = _RecursiveFindBD(parent_cdev)
1811   if parent_bdev is None:
1812     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1813   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1814   if new_bdevs.count(None) > 0:
1815     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1816   parent_bdev.AddChildren(new_bdevs)
1817
1818
1819 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1820   """Shrink a mirrored block device.
1821
1822   @type parent_cdev: L{objects.Disk}
1823   @param parent_cdev: the disk from which we should remove children
1824   @type new_cdevs: list of L{objects.Disk}
1825   @param new_cdevs: the list of children which we should remove
1826   @rtype: None
1827
1828   """
1829   parent_bdev = _RecursiveFindBD(parent_cdev)
1830   if parent_bdev is None:
1831     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1832   devs = []
1833   for disk in new_cdevs:
1834     rpath = disk.StaticDevPath()
1835     if rpath is None:
1836       bd = _RecursiveFindBD(disk)
1837       if bd is None:
1838         _Fail("Can't find device %s while removing children", disk)
1839       else:
1840         devs.append(bd.dev_path)
1841     else:
1842       if not utils.IsNormAbsPath(rpath):
1843         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1844       devs.append(rpath)
1845   parent_bdev.RemoveChildren(devs)
1846
1847
1848 def BlockdevGetmirrorstatus(disks):
1849   """Get the mirroring status of a list of devices.
1850
1851   @type disks: list of L{objects.Disk}
1852   @param disks: the list of disks which we should query
1853   @rtype: disk
1854   @return: List of L{objects.BlockDevStatus}, one for each disk
1855   @raise errors.BlockDeviceError: if any of the disks cannot be
1856       found
1857
1858   """
1859   stats = []
1860   for dsk in disks:
1861     rbd = _RecursiveFindBD(dsk)
1862     if rbd is None:
1863       _Fail("Can't find device %s", dsk)
1864
1865     stats.append(rbd.CombinedSyncStatus())
1866
1867   return stats
1868
1869
1870 def BlockdevGetmirrorstatusMulti(disks):
1871   """Get the mirroring status of a list of devices.
1872
1873   @type disks: list of L{objects.Disk}
1874   @param disks: the list of disks which we should query
1875   @rtype: disk
1876   @return: List of tuples, (bool, status), one for each disk; bool denotes
1877     success/failure, status is L{objects.BlockDevStatus} on success, string
1878     otherwise
1879
1880   """
1881   result = []
1882   for disk in disks:
1883     try:
1884       rbd = _RecursiveFindBD(disk)
1885       if rbd is None:
1886         result.append((False, "Can't find device %s" % disk))
1887         continue
1888
1889       status = rbd.CombinedSyncStatus()
1890     except errors.BlockDeviceError, err:
1891       logging.exception("Error while getting disk status")
1892       result.append((False, str(err)))
1893     else:
1894       result.append((True, status))
1895
1896   assert len(disks) == len(result)
1897
1898   return result
1899
1900
1901 def _RecursiveFindBD(disk):
1902   """Check if a device is activated.
1903
1904   If so, return information about the real device.
1905
1906   @type disk: L{objects.Disk}
1907   @param disk: the disk object we need to find
1908
1909   @return: None if the device can't be found,
1910       otherwise the device instance
1911
1912   """
1913   children = []
1914   if disk.children:
1915     for chdisk in disk.children:
1916       children.append(_RecursiveFindBD(chdisk))
1917
1918   return bdev.FindDevice(disk, children)
1919
1920
1921 def _OpenRealBD(disk):
1922   """Opens the underlying block device of a disk.
1923
1924   @type disk: L{objects.Disk}
1925   @param disk: the disk object we want to open
1926
1927   """
1928   real_disk = _RecursiveFindBD(disk)
1929   if real_disk is None:
1930     _Fail("Block device '%s' is not set up", disk)
1931
1932   real_disk.Open()
1933
1934   return real_disk
1935
1936
1937 def BlockdevFind(disk):
1938   """Check if a device is activated.
1939
1940   If it is, return information about the real device.
1941
1942   @type disk: L{objects.Disk}
1943   @param disk: the disk to find
1944   @rtype: None or objects.BlockDevStatus
1945   @return: None if the disk cannot be found, otherwise a the current
1946            information
1947
1948   """
1949   try:
1950     rbd = _RecursiveFindBD(disk)
1951   except errors.BlockDeviceError, err:
1952     _Fail("Failed to find device: %s", err, exc=True)
1953
1954   if rbd is None:
1955     return None
1956
1957   return rbd.GetSyncStatus()
1958
1959
1960 def BlockdevGetsize(disks):
1961   """Computes the size of the given disks.
1962
1963   If a disk is not found, returns None instead.
1964
1965   @type disks: list of L{objects.Disk}
1966   @param disks: the list of disk to compute the size for
1967   @rtype: list
1968   @return: list with elements None if the disk cannot be found,
1969       otherwise the size
1970
1971   """
1972   result = []
1973   for cf in disks:
1974     try:
1975       rbd = _RecursiveFindBD(cf)
1976     except errors.BlockDeviceError:
1977       result.append(None)
1978       continue
1979     if rbd is None:
1980       result.append(None)
1981     else:
1982       result.append(rbd.GetActualSize())
1983   return result
1984
1985
1986 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1987   """Export a block device to a remote node.
1988
1989   @type disk: L{objects.Disk}
1990   @param disk: the description of the disk to export
1991   @type dest_node: str
1992   @param dest_node: the destination node to export to
1993   @type dest_path: str
1994   @param dest_path: the destination path on the target node
1995   @type cluster_name: str
1996   @param cluster_name: the cluster name, needed for SSH hostalias
1997   @rtype: None
1998
1999   """
2000   real_disk = _OpenRealBD(disk)
2001
2002   # the block size on the read dd is 1MiB to match our units
2003   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2004                                "dd if=%s bs=1048576 count=%s",
2005                                real_disk.dev_path, str(disk.size))
2006
2007   # we set here a smaller block size as, due to ssh buffering, more
2008   # than 64-128k will mostly ignored; we use nocreat to fail if the
2009   # device is not already there or we pass a wrong path; we use
2010   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2011   # to not buffer too much memory; this means that at best, we flush
2012   # every 64k, which will not be very fast
2013   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2014                                 " oflag=dsync", dest_path)
2015
2016   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2017                                                    constants.GANETI_RUNAS,
2018                                                    destcmd)
2019
2020   # all commands have been checked, so we're safe to combine them
2021   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2022
2023   result = utils.RunCmd(["bash", "-c", command])
2024
2025   if result.failed:
2026     _Fail("Disk copy command '%s' returned error: %s"
2027           " output: %s", command, result.fail_reason, result.output)
2028
2029
2030 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2031   """Write a file to the filesystem.
2032
2033   This allows the master to overwrite(!) a file. It will only perform
2034   the operation if the file belongs to a list of configuration files.
2035
2036   @type file_name: str
2037   @param file_name: the target file name
2038   @type data: str
2039   @param data: the new contents of the file
2040   @type mode: int
2041   @param mode: the mode to give the file (can be None)
2042   @type uid: string
2043   @param uid: the owner of the file
2044   @type gid: string
2045   @param gid: the group of the file
2046   @type atime: float
2047   @param atime: the atime to set on the file (can be None)
2048   @type mtime: float
2049   @param mtime: the mtime to set on the file (can be None)
2050   @rtype: None
2051
2052   """
2053   if not os.path.isabs(file_name):
2054     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2055
2056   if file_name not in _ALLOWED_UPLOAD_FILES:
2057     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2058           file_name)
2059
2060   raw_data = _Decompress(data)
2061
2062   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2063     _Fail("Invalid username/groupname type")
2064
2065   getents = runtime.GetEnts()
2066   uid = getents.LookupUser(uid)
2067   gid = getents.LookupGroup(gid)
2068
2069   utils.SafeWriteFile(file_name, None,
2070                       data=raw_data, mode=mode, uid=uid, gid=gid,
2071                       atime=atime, mtime=mtime)
2072
2073
2074 def RunOob(oob_program, command, node, timeout):
2075   """Executes oob_program with given command on given node.
2076
2077   @param oob_program: The path to the executable oob_program
2078   @param command: The command to invoke on oob_program
2079   @param node: The node given as an argument to the program
2080   @param timeout: Timeout after which we kill the oob program
2081
2082   @return: stdout
2083   @raise RPCFail: If execution fails for some reason
2084
2085   """
2086   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2087
2088   if result.failed:
2089     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2090           result.fail_reason, result.output)
2091
2092   return result.stdout
2093
2094
2095 def WriteSsconfFiles(values):
2096   """Update all ssconf files.
2097
2098   Wrapper around the SimpleStore.WriteFiles.
2099
2100   """
2101   ssconf.SimpleStore().WriteFiles(values)
2102
2103
2104 def _OSOndiskAPIVersion(os_dir):
2105   """Compute and return the API version of a given OS.
2106
2107   This function will try to read the API version of the OS residing in
2108   the 'os_dir' directory.
2109
2110   @type os_dir: str
2111   @param os_dir: the directory in which we should look for the OS
2112   @rtype: tuple
2113   @return: tuple (status, data) with status denoting the validity and
2114       data holding either the vaid versions or an error message
2115
2116   """
2117   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2118
2119   try:
2120     st = os.stat(api_file)
2121   except EnvironmentError, err:
2122     return False, ("Required file '%s' not found under path %s: %s" %
2123                    (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2124
2125   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2126     return False, ("File '%s' in %s is not a regular file" %
2127                    (constants.OS_API_FILE, os_dir))
2128
2129   try:
2130     api_versions = utils.ReadFile(api_file).splitlines()
2131   except EnvironmentError, err:
2132     return False, ("Error while reading the API version file at %s: %s" %
2133                    (api_file, utils.ErrnoOrStr(err)))
2134
2135   try:
2136     api_versions = [int(version.strip()) for version in api_versions]
2137   except (TypeError, ValueError), err:
2138     return False, ("API version(s) can't be converted to integer: %s" %
2139                    str(err))
2140
2141   return True, api_versions
2142
2143
2144 def DiagnoseOS(top_dirs=None):
2145   """Compute the validity for all OSes.
2146
2147   @type top_dirs: list
2148   @param top_dirs: the list of directories in which to
2149       search (if not given defaults to
2150       L{constants.OS_SEARCH_PATH})
2151   @rtype: list of L{objects.OS}
2152   @return: a list of tuples (name, path, status, diagnose, variants,
2153       parameters, api_version) for all (potential) OSes under all
2154       search paths, where:
2155           - name is the (potential) OS name
2156           - path is the full path to the OS
2157           - status True/False is the validity of the OS
2158           - diagnose is the error message for an invalid OS, otherwise empty
2159           - variants is a list of supported OS variants, if any
2160           - parameters is a list of (name, help) parameters, if any
2161           - api_version is a list of support OS API versions
2162
2163   """
2164   if top_dirs is None:
2165     top_dirs = constants.OS_SEARCH_PATH
2166
2167   result = []
2168   for dir_name in top_dirs:
2169     if os.path.isdir(dir_name):
2170       try:
2171         f_names = utils.ListVisibleFiles(dir_name)
2172       except EnvironmentError, err:
2173         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2174         break
2175       for name in f_names:
2176         os_path = utils.PathJoin(dir_name, name)
2177         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2178         if status:
2179           diagnose = ""
2180           variants = os_inst.supported_variants
2181           parameters = os_inst.supported_parameters
2182           api_versions = os_inst.api_versions
2183         else:
2184           diagnose = os_inst
2185           variants = parameters = api_versions = []
2186         result.append((name, os_path, status, diagnose, variants,
2187                        parameters, api_versions))
2188
2189   return result
2190
2191
2192 def _TryOSFromDisk(name, base_dir=None):
2193   """Create an OS instance from disk.
2194
2195   This function will return an OS instance if the given name is a
2196   valid OS name.
2197
2198   @type base_dir: string
2199   @keyword base_dir: Base directory containing OS installations.
2200                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2201   @rtype: tuple
2202   @return: success and either the OS instance if we find a valid one,
2203       or error message
2204
2205   """
2206   if base_dir is None:
2207     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
2208   else:
2209     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2210
2211   if os_dir is None:
2212     return False, "Directory for OS %s not found in search path" % name
2213
2214   status, api_versions = _OSOndiskAPIVersion(os_dir)
2215   if not status:
2216     # push the error up
2217     return status, api_versions
2218
2219   if not constants.OS_API_VERSIONS.intersection(api_versions):
2220     return False, ("API version mismatch for path '%s': found %s, want %s." %
2221                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2222
2223   # OS Files dictionary, we will populate it with the absolute path
2224   # names; if the value is True, then it is a required file, otherwise
2225   # an optional one
2226   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2227
2228   if max(api_versions) >= constants.OS_API_V15:
2229     os_files[constants.OS_VARIANTS_FILE] = False
2230
2231   if max(api_versions) >= constants.OS_API_V20:
2232     os_files[constants.OS_PARAMETERS_FILE] = True
2233   else:
2234     del os_files[constants.OS_SCRIPT_VERIFY]
2235
2236   for (filename, required) in os_files.items():
2237     os_files[filename] = utils.PathJoin(os_dir, filename)
2238
2239     try:
2240       st = os.stat(os_files[filename])
2241     except EnvironmentError, err:
2242       if err.errno == errno.ENOENT and not required:
2243         del os_files[filename]
2244         continue
2245       return False, ("File '%s' under path '%s' is missing (%s)" %
2246                      (filename, os_dir, utils.ErrnoOrStr(err)))
2247
2248     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2249       return False, ("File '%s' under path '%s' is not a regular file" %
2250                      (filename, os_dir))
2251
2252     if filename in constants.OS_SCRIPTS:
2253       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2254         return False, ("File '%s' under path '%s' is not executable" %
2255                        (filename, os_dir))
2256
2257   variants = []
2258   if constants.OS_VARIANTS_FILE in os_files:
2259     variants_file = os_files[constants.OS_VARIANTS_FILE]
2260     try:
2261       variants = utils.ReadFile(variants_file).splitlines()
2262     except EnvironmentError, err:
2263       # we accept missing files, but not other errors
2264       if err.errno != errno.ENOENT:
2265         return False, ("Error while reading the OS variants file at %s: %s" %
2266                        (variants_file, utils.ErrnoOrStr(err)))
2267
2268   parameters = []
2269   if constants.OS_PARAMETERS_FILE in os_files:
2270     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2271     try:
2272       parameters = utils.ReadFile(parameters_file).splitlines()
2273     except EnvironmentError, err:
2274       return False, ("Error while reading the OS parameters file at %s: %s" %
2275                      (parameters_file, utils.ErrnoOrStr(err)))
2276     parameters = [v.split(None, 1) for v in parameters]
2277
2278   os_obj = objects.OS(name=name, path=os_dir,
2279                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2280                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2281                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2282                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2283                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2284                                                  None),
2285                       supported_variants=variants,
2286                       supported_parameters=parameters,
2287                       api_versions=api_versions)
2288   return True, os_obj
2289
2290
2291 def OSFromDisk(name, base_dir=None):
2292   """Create an OS instance from disk.
2293
2294   This function will return an OS instance if the given name is a
2295   valid OS name. Otherwise, it will raise an appropriate
2296   L{RPCFail} exception, detailing why this is not a valid OS.
2297
2298   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2299   an exception but returns true/false status data.
2300
2301   @type base_dir: string
2302   @keyword base_dir: Base directory containing OS installations.
2303                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2304   @rtype: L{objects.OS}
2305   @return: the OS instance if we find a valid one
2306   @raise RPCFail: if we don't find a valid OS
2307
2308   """
2309   name_only = objects.OS.GetName(name)
2310   status, payload = _TryOSFromDisk(name_only, base_dir)
2311
2312   if not status:
2313     _Fail(payload)
2314
2315   return payload
2316
2317
2318 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2319   """Calculate the basic environment for an os script.
2320
2321   @type os_name: str
2322   @param os_name: full operating system name (including variant)
2323   @type inst_os: L{objects.OS}
2324   @param inst_os: operating system for which the environment is being built
2325   @type os_params: dict
2326   @param os_params: the OS parameters
2327   @type debug: integer
2328   @param debug: debug level (0 or 1, for OS Api 10)
2329   @rtype: dict
2330   @return: dict of environment variables
2331   @raise errors.BlockDeviceError: if the block device
2332       cannot be found
2333
2334   """
2335   result = {}
2336   api_version = \
2337     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2338   result["OS_API_VERSION"] = "%d" % api_version
2339   result["OS_NAME"] = inst_os.name
2340   result["DEBUG_LEVEL"] = "%d" % debug
2341
2342   # OS variants
2343   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2344     variant = objects.OS.GetVariant(os_name)
2345     if not variant:
2346       variant = inst_os.supported_variants[0]
2347   else:
2348     variant = ""
2349   result["OS_VARIANT"] = variant
2350
2351   # OS params
2352   for pname, pvalue in os_params.items():
2353     result["OSP_%s" % pname.upper()] = pvalue
2354
2355   return result
2356
2357
2358 def OSEnvironment(instance, inst_os, debug=0):
2359   """Calculate the environment for an os script.
2360
2361   @type instance: L{objects.Instance}
2362   @param instance: target instance for the os script run
2363   @type inst_os: L{objects.OS}
2364   @param inst_os: operating system for which the environment is being built
2365   @type debug: integer
2366   @param debug: debug level (0 or 1, for OS Api 10)
2367   @rtype: dict
2368   @return: dict of environment variables
2369   @raise errors.BlockDeviceError: if the block device
2370       cannot be found
2371
2372   """
2373   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2374
2375   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2376     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2377
2378   result["HYPERVISOR"] = instance.hypervisor
2379   result["DISK_COUNT"] = "%d" % len(instance.disks)
2380   result["NIC_COUNT"] = "%d" % len(instance.nics)
2381   result["INSTANCE_SECONDARY_NODES"] = \
2382       ("%s" % " ".join(instance.secondary_nodes))
2383
2384   # Disks
2385   for idx, disk in enumerate(instance.disks):
2386     real_disk = _OpenRealBD(disk)
2387     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2388     result["DISK_%d_ACCESS" % idx] = disk.mode
2389     if constants.HV_DISK_TYPE in instance.hvparams:
2390       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2391         instance.hvparams[constants.HV_DISK_TYPE]
2392     if disk.dev_type in constants.LDS_BLOCK:
2393       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2394     elif disk.dev_type == constants.LD_FILE:
2395       result["DISK_%d_BACKEND_TYPE" % idx] = \
2396         "file:%s" % disk.physical_id[0]
2397
2398   # NICs
2399   for idx, nic in enumerate(instance.nics):
2400     result["NIC_%d_MAC" % idx] = nic.mac
2401     if nic.ip:
2402       result["NIC_%d_IP" % idx] = nic.ip
2403     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2404     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2405       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2406     if nic.nicparams[constants.NIC_LINK]:
2407       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2408     if constants.HV_NIC_TYPE in instance.hvparams:
2409       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2410         instance.hvparams[constants.HV_NIC_TYPE]
2411
2412   # HV/BE params
2413   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2414     for key, value in source.items():
2415       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2416
2417   return result
2418
2419
2420 def BlockdevGrow(disk, amount, dryrun):
2421   """Grow a stack of block devices.
2422
2423   This function is called recursively, with the childrens being the
2424   first ones to resize.
2425
2426   @type disk: L{objects.Disk}
2427   @param disk: the disk to be grown
2428   @type amount: integer
2429   @param amount: the amount (in mebibytes) to grow with
2430   @type dryrun: boolean
2431   @param dryrun: whether to execute the operation in simulation mode
2432       only, without actually increasing the size
2433   @rtype: (status, result)
2434   @return: a tuple with the status of the operation (True/False), and
2435       the errors message if status is False
2436
2437   """
2438   r_dev = _RecursiveFindBD(disk)
2439   if r_dev is None:
2440     _Fail("Cannot find block device %s", disk)
2441
2442   try:
2443     r_dev.Grow(amount, dryrun)
2444   except errors.BlockDeviceError, err:
2445     _Fail("Failed to grow block device: %s", err, exc=True)
2446
2447
2448 def BlockdevSnapshot(disk):
2449   """Create a snapshot copy of a block device.
2450
2451   This function is called recursively, and the snapshot is actually created
2452   just for the leaf lvm backend device.
2453
2454   @type disk: L{objects.Disk}
2455   @param disk: the disk to be snapshotted
2456   @rtype: string
2457   @return: snapshot disk ID as (vg, lv)
2458
2459   """
2460   if disk.dev_type == constants.LD_DRBD8:
2461     if not disk.children:
2462       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2463             disk.unique_id)
2464     return BlockdevSnapshot(disk.children[0])
2465   elif disk.dev_type == constants.LD_LV:
2466     r_dev = _RecursiveFindBD(disk)
2467     if r_dev is not None:
2468       # FIXME: choose a saner value for the snapshot size
2469       # let's stay on the safe side and ask for the full size, for now
2470       return r_dev.Snapshot(disk.size)
2471     else:
2472       _Fail("Cannot find block device %s", disk)
2473   else:
2474     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2475           disk.unique_id, disk.dev_type)
2476
2477
2478 def FinalizeExport(instance, snap_disks):
2479   """Write out the export configuration information.
2480
2481   @type instance: L{objects.Instance}
2482   @param instance: the instance which we export, used for
2483       saving configuration
2484   @type snap_disks: list of L{objects.Disk}
2485   @param snap_disks: list of snapshot block devices, which
2486       will be used to get the actual name of the dump file
2487
2488   @rtype: None
2489
2490   """
2491   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2492   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2493
2494   config = objects.SerializableConfigParser()
2495
2496   config.add_section(constants.INISECT_EXP)
2497   config.set(constants.INISECT_EXP, "version", "0")
2498   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2499   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2500   config.set(constants.INISECT_EXP, "os", instance.os)
2501   config.set(constants.INISECT_EXP, "compression", "none")
2502
2503   config.add_section(constants.INISECT_INS)
2504   config.set(constants.INISECT_INS, "name", instance.name)
2505   config.set(constants.INISECT_INS, "maxmem", "%d" %
2506              instance.beparams[constants.BE_MAXMEM])
2507   config.set(constants.INISECT_INS, "minmem", "%d" %
2508              instance.beparams[constants.BE_MINMEM])
2509   # "memory" is deprecated, but useful for exporting to old ganeti versions
2510   config.set(constants.INISECT_INS, "memory", "%d" %
2511              instance.beparams[constants.BE_MAXMEM])
2512   config.set(constants.INISECT_INS, "vcpus", "%d" %
2513              instance.beparams[constants.BE_VCPUS])
2514   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2515   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2516   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2517
2518   nic_total = 0
2519   for nic_count, nic in enumerate(instance.nics):
2520     nic_total += 1
2521     config.set(constants.INISECT_INS, "nic%d_mac" %
2522                nic_count, "%s" % nic.mac)
2523     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2524     for param in constants.NICS_PARAMETER_TYPES:
2525       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2526                  "%s" % nic.nicparams.get(param, None))
2527   # TODO: redundant: on load can read nics until it doesn't exist
2528   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2529
2530   disk_total = 0
2531   for disk_count, disk in enumerate(snap_disks):
2532     if disk:
2533       disk_total += 1
2534       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2535                  ("%s" % disk.iv_name))
2536       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2537                  ("%s" % disk.physical_id[1]))
2538       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2539                  ("%d" % disk.size))
2540
2541   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2542
2543   # New-style hypervisor/backend parameters
2544
2545   config.add_section(constants.INISECT_HYP)
2546   for name, value in instance.hvparams.items():
2547     if name not in constants.HVC_GLOBALS:
2548       config.set(constants.INISECT_HYP, name, str(value))
2549
2550   config.add_section(constants.INISECT_BEP)
2551   for name, value in instance.beparams.items():
2552     config.set(constants.INISECT_BEP, name, str(value))
2553
2554   config.add_section(constants.INISECT_OSP)
2555   for name, value in instance.osparams.items():
2556     config.set(constants.INISECT_OSP, name, str(value))
2557
2558   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2559                   data=config.Dumps())
2560   shutil.rmtree(finaldestdir, ignore_errors=True)
2561   shutil.move(destdir, finaldestdir)
2562
2563
2564 def ExportInfo(dest):
2565   """Get export configuration information.
2566
2567   @type dest: str
2568   @param dest: directory containing the export
2569
2570   @rtype: L{objects.SerializableConfigParser}
2571   @return: a serializable config file containing the
2572       export info
2573
2574   """
2575   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2576
2577   config = objects.SerializableConfigParser()
2578   config.read(cff)
2579
2580   if (not config.has_section(constants.INISECT_EXP) or
2581       not config.has_section(constants.INISECT_INS)):
2582     _Fail("Export info file doesn't have the required fields")
2583
2584   return config.Dumps()
2585
2586
2587 def ListExports():
2588   """Return a list of exports currently available on this machine.
2589
2590   @rtype: list
2591   @return: list of the exports
2592
2593   """
2594   if os.path.isdir(constants.EXPORT_DIR):
2595     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2596   else:
2597     _Fail("No exports directory")
2598
2599
2600 def RemoveExport(export):
2601   """Remove an existing export from the node.
2602
2603   @type export: str
2604   @param export: the name of the export to remove
2605   @rtype: None
2606
2607   """
2608   target = utils.PathJoin(constants.EXPORT_DIR, export)
2609
2610   try:
2611     shutil.rmtree(target)
2612   except EnvironmentError, err:
2613     _Fail("Error while removing the export: %s", err, exc=True)
2614
2615
2616 def BlockdevRename(devlist):
2617   """Rename a list of block devices.
2618
2619   @type devlist: list of tuples
2620   @param devlist: list of tuples of the form  (disk,
2621       new_logical_id, new_physical_id); disk is an
2622       L{objects.Disk} object describing the current disk,
2623       and new logical_id/physical_id is the name we
2624       rename it to
2625   @rtype: boolean
2626   @return: True if all renames succeeded, False otherwise
2627
2628   """
2629   msgs = []
2630   result = True
2631   for disk, unique_id in devlist:
2632     dev = _RecursiveFindBD(disk)
2633     if dev is None:
2634       msgs.append("Can't find device %s in rename" % str(disk))
2635       result = False
2636       continue
2637     try:
2638       old_rpath = dev.dev_path
2639       dev.Rename(unique_id)
2640       new_rpath = dev.dev_path
2641       if old_rpath != new_rpath:
2642         DevCacheManager.RemoveCache(old_rpath)
2643         # FIXME: we should add the new cache information here, like:
2644         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2645         # but we don't have the owner here - maybe parse from existing
2646         # cache? for now, we only lose lvm data when we rename, which
2647         # is less critical than DRBD or MD
2648     except errors.BlockDeviceError, err:
2649       msgs.append("Can't rename device '%s' to '%s': %s" %
2650                   (dev, unique_id, err))
2651       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2652       result = False
2653   if not result:
2654     _Fail("; ".join(msgs))
2655
2656
2657 def _TransformFileStorageDir(fs_dir):
2658   """Checks whether given file_storage_dir is valid.
2659
2660   Checks wheter the given fs_dir is within the cluster-wide default
2661   file_storage_dir or the shared_file_storage_dir, which are stored in
2662   SimpleStore. Only paths under those directories are allowed.
2663
2664   @type fs_dir: str
2665   @param fs_dir: the path to check
2666
2667   @return: the normalized path if valid, None otherwise
2668
2669   """
2670   if not constants.ENABLE_FILE_STORAGE:
2671     _Fail("File storage disabled at configure time")
2672   cfg = _GetConfig()
2673   fs_dir = os.path.normpath(fs_dir)
2674   base_fstore = cfg.GetFileStorageDir()
2675   base_shared = cfg.GetSharedFileStorageDir()
2676   if not (utils.IsBelowDir(base_fstore, fs_dir) or
2677           utils.IsBelowDir(base_shared, fs_dir)):
2678     _Fail("File storage directory '%s' is not under base file"
2679           " storage directory '%s' or shared storage directory '%s'",
2680           fs_dir, base_fstore, base_shared)
2681   return fs_dir
2682
2683
2684 def CreateFileStorageDir(file_storage_dir):
2685   """Create file storage directory.
2686
2687   @type file_storage_dir: str
2688   @param file_storage_dir: directory to create
2689
2690   @rtype: tuple
2691   @return: tuple with first element a boolean indicating wheter dir
2692       creation was successful or not
2693
2694   """
2695   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2696   if os.path.exists(file_storage_dir):
2697     if not os.path.isdir(file_storage_dir):
2698       _Fail("Specified storage dir '%s' is not a directory",
2699             file_storage_dir)
2700   else:
2701     try:
2702       os.makedirs(file_storage_dir, 0750)
2703     except OSError, err:
2704       _Fail("Cannot create file storage directory '%s': %s",
2705             file_storage_dir, err, exc=True)
2706
2707
2708 def RemoveFileStorageDir(file_storage_dir):
2709   """Remove file storage directory.
2710
2711   Remove it only if it's empty. If not log an error and return.
2712
2713   @type file_storage_dir: str
2714   @param file_storage_dir: the directory we should cleanup
2715   @rtype: tuple (success,)
2716   @return: tuple of one element, C{success}, denoting
2717       whether the operation was successful
2718
2719   """
2720   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2721   if os.path.exists(file_storage_dir):
2722     if not os.path.isdir(file_storage_dir):
2723       _Fail("Specified Storage directory '%s' is not a directory",
2724             file_storage_dir)
2725     # deletes dir only if empty, otherwise we want to fail the rpc call
2726     try:
2727       os.rmdir(file_storage_dir)
2728     except OSError, err:
2729       _Fail("Cannot remove file storage directory '%s': %s",
2730             file_storage_dir, err)
2731
2732
2733 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2734   """Rename the file storage directory.
2735
2736   @type old_file_storage_dir: str
2737   @param old_file_storage_dir: the current path
2738   @type new_file_storage_dir: str
2739   @param new_file_storage_dir: the name we should rename to
2740   @rtype: tuple (success,)
2741   @return: tuple of one element, C{success}, denoting
2742       whether the operation was successful
2743
2744   """
2745   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2746   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2747   if not os.path.exists(new_file_storage_dir):
2748     if os.path.isdir(old_file_storage_dir):
2749       try:
2750         os.rename(old_file_storage_dir, new_file_storage_dir)
2751       except OSError, err:
2752         _Fail("Cannot rename '%s' to '%s': %s",
2753               old_file_storage_dir, new_file_storage_dir, err)
2754     else:
2755       _Fail("Specified storage dir '%s' is not a directory",
2756             old_file_storage_dir)
2757   else:
2758     if os.path.exists(old_file_storage_dir):
2759       _Fail("Cannot rename '%s' to '%s': both locations exist",
2760             old_file_storage_dir, new_file_storage_dir)
2761
2762
2763 def _EnsureJobQueueFile(file_name):
2764   """Checks whether the given filename is in the queue directory.
2765
2766   @type file_name: str
2767   @param file_name: the file name we should check
2768   @rtype: None
2769   @raises RPCFail: if the file is not valid
2770
2771   """
2772   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2773   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2774
2775   if not result:
2776     _Fail("Passed job queue file '%s' does not belong to"
2777           " the queue directory '%s'", file_name, queue_dir)
2778
2779
2780 def JobQueueUpdate(file_name, content):
2781   """Updates a file in the queue directory.
2782
2783   This is just a wrapper over L{utils.io.WriteFile}, with proper
2784   checking.
2785
2786   @type file_name: str
2787   @param file_name: the job file name
2788   @type content: str
2789   @param content: the new job contents
2790   @rtype: boolean
2791   @return: the success of the operation
2792
2793   """
2794   _EnsureJobQueueFile(file_name)
2795   getents = runtime.GetEnts()
2796
2797   # Write and replace the file atomically
2798   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2799                   gid=getents.masterd_gid)
2800
2801
2802 def JobQueueRename(old, new):
2803   """Renames a job queue file.
2804
2805   This is just a wrapper over os.rename with proper checking.
2806
2807   @type old: str
2808   @param old: the old (actual) file name
2809   @type new: str
2810   @param new: the desired file name
2811   @rtype: tuple
2812   @return: the success of the operation and payload
2813
2814   """
2815   _EnsureJobQueueFile(old)
2816   _EnsureJobQueueFile(new)
2817
2818   getents = runtime.GetEnts()
2819
2820   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2821                    dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2822
2823
2824 def BlockdevClose(instance_name, disks):
2825   """Closes the given block devices.
2826
2827   This means they will be switched to secondary mode (in case of
2828   DRBD).
2829
2830   @param instance_name: if the argument is not empty, the symlinks
2831       of this instance will be removed
2832   @type disks: list of L{objects.Disk}
2833   @param disks: the list of disks to be closed
2834   @rtype: tuple (success, message)
2835   @return: a tuple of success and message, where success
2836       indicates the succes of the operation, and message
2837       which will contain the error details in case we
2838       failed
2839
2840   """
2841   bdevs = []
2842   for cf in disks:
2843     rd = _RecursiveFindBD(cf)
2844     if rd is None:
2845       _Fail("Can't find device %s", cf)
2846     bdevs.append(rd)
2847
2848   msg = []
2849   for rd in bdevs:
2850     try:
2851       rd.Close()
2852     except errors.BlockDeviceError, err:
2853       msg.append(str(err))
2854   if msg:
2855     _Fail("Can't make devices secondary: %s", ",".join(msg))
2856   else:
2857     if instance_name:
2858       _RemoveBlockDevLinks(instance_name, disks)
2859
2860
2861 def ValidateHVParams(hvname, hvparams):
2862   """Validates the given hypervisor parameters.
2863
2864   @type hvname: string
2865   @param hvname: the hypervisor name
2866   @type hvparams: dict
2867   @param hvparams: the hypervisor parameters to be validated
2868   @rtype: None
2869
2870   """
2871   try:
2872     hv_type = hypervisor.GetHypervisor(hvname)
2873     hv_type.ValidateParameters(hvparams)
2874   except errors.HypervisorError, err:
2875     _Fail(str(err), log=False)
2876
2877
2878 def _CheckOSPList(os_obj, parameters):
2879   """Check whether a list of parameters is supported by the OS.
2880
2881   @type os_obj: L{objects.OS}
2882   @param os_obj: OS object to check
2883   @type parameters: list
2884   @param parameters: the list of parameters to check
2885
2886   """
2887   supported = [v[0] for v in os_obj.supported_parameters]
2888   delta = frozenset(parameters).difference(supported)
2889   if delta:
2890     _Fail("The following parameters are not supported"
2891           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2892
2893
2894 def ValidateOS(required, osname, checks, osparams):
2895   """Validate the given OS' parameters.
2896
2897   @type required: boolean
2898   @param required: whether absence of the OS should translate into
2899       failure or not
2900   @type osname: string
2901   @param osname: the OS to be validated
2902   @type checks: list
2903   @param checks: list of the checks to run (currently only 'parameters')
2904   @type osparams: dict
2905   @param osparams: dictionary with OS parameters
2906   @rtype: boolean
2907   @return: True if the validation passed, or False if the OS was not
2908       found and L{required} was false
2909
2910   """
2911   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2912     _Fail("Unknown checks required for OS %s: %s", osname,
2913           set(checks).difference(constants.OS_VALIDATE_CALLS))
2914
2915   name_only = objects.OS.GetName(osname)
2916   status, tbv = _TryOSFromDisk(name_only, None)
2917
2918   if not status:
2919     if required:
2920       _Fail(tbv)
2921     else:
2922       return False
2923
2924   if max(tbv.api_versions) < constants.OS_API_V20:
2925     return True
2926
2927   if constants.OS_VALIDATE_PARAMETERS in checks:
2928     _CheckOSPList(tbv, osparams.keys())
2929
2930   validate_env = OSCoreEnv(osname, tbv, osparams)
2931   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2932                         cwd=tbv.path, reset_env=True)
2933   if result.failed:
2934     logging.error("os validate command '%s' returned error: %s output: %s",
2935                   result.cmd, result.fail_reason, result.output)
2936     _Fail("OS validation script failed (%s), output: %s",
2937           result.fail_reason, result.output, log=False)
2938
2939   return True
2940
2941
2942 def DemoteFromMC():
2943   """Demotes the current node from master candidate role.
2944
2945   """
2946   # try to ensure we're not the master by mistake
2947   master, myself = ssconf.GetMasterAndMyself()
2948   if master == myself:
2949     _Fail("ssconf status shows I'm the master node, will not demote")
2950
2951   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2952   if not result.failed:
2953     _Fail("The master daemon is running, will not demote")
2954
2955   try:
2956     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2957       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2958   except EnvironmentError, err:
2959     if err.errno != errno.ENOENT:
2960       _Fail("Error while backing up cluster file: %s", err, exc=True)
2961
2962   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2963
2964
2965 def _GetX509Filenames(cryptodir, name):
2966   """Returns the full paths for the private key and certificate.
2967
2968   """
2969   return (utils.PathJoin(cryptodir, name),
2970           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2971           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2972
2973
2974 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2975   """Creates a new X509 certificate for SSL/TLS.
2976
2977   @type validity: int
2978   @param validity: Validity in seconds
2979   @rtype: tuple; (string, string)
2980   @return: Certificate name and public part
2981
2982   """
2983   (key_pem, cert_pem) = \
2984     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2985                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2986
2987   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2988                               prefix="x509-%s-" % utils.TimestampForFilename())
2989   try:
2990     name = os.path.basename(cert_dir)
2991     assert len(name) > 5
2992
2993     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2994
2995     utils.WriteFile(key_file, mode=0400, data=key_pem)
2996     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2997
2998     # Never return private key as it shouldn't leave the node
2999     return (name, cert_pem)
3000   except Exception:
3001     shutil.rmtree(cert_dir, ignore_errors=True)
3002     raise
3003
3004
3005 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
3006   """Removes a X509 certificate.
3007
3008   @type name: string
3009   @param name: Certificate name
3010
3011   """
3012   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3013
3014   utils.RemoveFile(key_file)
3015   utils.RemoveFile(cert_file)
3016
3017   try:
3018     os.rmdir(cert_dir)
3019   except EnvironmentError, err:
3020     _Fail("Cannot remove certificate directory '%s': %s",
3021           cert_dir, err)
3022
3023
3024 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3025   """Returns the command for the requested input/output.
3026
3027   @type instance: L{objects.Instance}
3028   @param instance: The instance object
3029   @param mode: Import/export mode
3030   @param ieio: Input/output type
3031   @param ieargs: Input/output arguments
3032
3033   """
3034   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3035
3036   env = None
3037   prefix = None
3038   suffix = None
3039   exp_size = None
3040
3041   if ieio == constants.IEIO_FILE:
3042     (filename, ) = ieargs
3043
3044     if not utils.IsNormAbsPath(filename):
3045       _Fail("Path '%s' is not normalized or absolute", filename)
3046
3047     real_filename = os.path.realpath(filename)
3048     directory = os.path.dirname(real_filename)
3049
3050     if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
3051       _Fail("File '%s' is not under exports directory '%s': %s",
3052             filename, constants.EXPORT_DIR, real_filename)
3053
3054     # Create directory
3055     utils.Makedirs(directory, mode=0750)
3056
3057     quoted_filename = utils.ShellQuote(filename)
3058
3059     if mode == constants.IEM_IMPORT:
3060       suffix = "> %s" % quoted_filename
3061     elif mode == constants.IEM_EXPORT:
3062       suffix = "< %s" % quoted_filename
3063
3064       # Retrieve file size
3065       try:
3066         st = os.stat(filename)
3067       except EnvironmentError, err:
3068         logging.error("Can't stat(2) %s: %s", filename, err)
3069       else:
3070         exp_size = utils.BytesToMebibyte(st.st_size)
3071
3072   elif ieio == constants.IEIO_RAW_DISK:
3073     (disk, ) = ieargs
3074
3075     real_disk = _OpenRealBD(disk)
3076
3077     if mode == constants.IEM_IMPORT:
3078       # we set here a smaller block size as, due to transport buffering, more
3079       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3080       # is not already there or we pass a wrong path; we use notrunc to no
3081       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3082       # much memory; this means that at best, we flush every 64k, which will
3083       # not be very fast
3084       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3085                                     " bs=%s oflag=dsync"),
3086                                     real_disk.dev_path,
3087                                     str(64 * 1024))
3088
3089     elif mode == constants.IEM_EXPORT:
3090       # the block size on the read dd is 1MiB to match our units
3091       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3092                                    real_disk.dev_path,
3093                                    str(1024 * 1024), # 1 MB
3094                                    str(disk.size))
3095       exp_size = disk.size
3096
3097   elif ieio == constants.IEIO_SCRIPT:
3098     (disk, disk_index, ) = ieargs
3099
3100     assert isinstance(disk_index, (int, long))
3101
3102     real_disk = _OpenRealBD(disk)
3103
3104     inst_os = OSFromDisk(instance.os)
3105     env = OSEnvironment(instance, inst_os)
3106
3107     if mode == constants.IEM_IMPORT:
3108       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3109       env["IMPORT_INDEX"] = str(disk_index)
3110       script = inst_os.import_script
3111
3112     elif mode == constants.IEM_EXPORT:
3113       env["EXPORT_DEVICE"] = real_disk.dev_path
3114       env["EXPORT_INDEX"] = str(disk_index)
3115       script = inst_os.export_script
3116
3117     # TODO: Pass special environment only to script
3118     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3119
3120     if mode == constants.IEM_IMPORT:
3121       suffix = "| %s" % script_cmd
3122
3123     elif mode == constants.IEM_EXPORT:
3124       prefix = "%s |" % script_cmd
3125
3126     # Let script predict size
3127     exp_size = constants.IE_CUSTOM_SIZE
3128
3129   else:
3130     _Fail("Invalid %s I/O mode %r", mode, ieio)
3131
3132   return (env, prefix, suffix, exp_size)
3133
3134
3135 def _CreateImportExportStatusDir(prefix):
3136   """Creates status directory for import/export.
3137
3138   """
3139   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
3140                           prefix=("%s-%s-" %
3141                                   (prefix, utils.TimestampForFilename())))
3142
3143
3144 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3145                             ieio, ieioargs):
3146   """Starts an import or export daemon.
3147
3148   @param mode: Import/output mode
3149   @type opts: L{objects.ImportExportOptions}
3150   @param opts: Daemon options
3151   @type host: string
3152   @param host: Remote host for export (None for import)
3153   @type port: int
3154   @param port: Remote port for export (None for import)
3155   @type instance: L{objects.Instance}
3156   @param instance: Instance object
3157   @type component: string
3158   @param component: which part of the instance is transferred now,
3159       e.g. 'disk/0'
3160   @param ieio: Input/output type
3161   @param ieioargs: Input/output arguments
3162
3163   """
3164   if mode == constants.IEM_IMPORT:
3165     prefix = "import"
3166
3167     if not (host is None and port is None):
3168       _Fail("Can not specify host or port on import")
3169
3170   elif mode == constants.IEM_EXPORT:
3171     prefix = "export"
3172
3173     if host is None or port is None:
3174       _Fail("Host and port must be specified for an export")
3175
3176   else:
3177     _Fail("Invalid mode %r", mode)
3178
3179   if (opts.key_name is None) ^ (opts.ca_pem is None):
3180     _Fail("Cluster certificate can only be used for both key and CA")
3181
3182   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3183     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3184
3185   if opts.key_name is None:
3186     # Use server.pem
3187     key_path = constants.NODED_CERT_FILE
3188     cert_path = constants.NODED_CERT_FILE
3189     assert opts.ca_pem is None
3190   else:
3191     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3192                                                  opts.key_name)
3193     assert opts.ca_pem is not None
3194
3195   for i in [key_path, cert_path]:
3196     if not os.path.exists(i):
3197       _Fail("File '%s' does not exist" % i)
3198
3199   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3200   try:
3201     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3202     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3203     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3204
3205     if opts.ca_pem is None:
3206       # Use server.pem
3207       ca = utils.ReadFile(constants.NODED_CERT_FILE)
3208     else:
3209       ca = opts.ca_pem
3210
3211     # Write CA file
3212     utils.WriteFile(ca_file, data=ca, mode=0400)
3213
3214     cmd = [
3215       constants.IMPORT_EXPORT_DAEMON,
3216       status_file, mode,
3217       "--key=%s" % key_path,
3218       "--cert=%s" % cert_path,
3219       "--ca=%s" % ca_file,
3220       ]
3221
3222     if host:
3223       cmd.append("--host=%s" % host)
3224
3225     if port:
3226       cmd.append("--port=%s" % port)
3227
3228     if opts.ipv6:
3229       cmd.append("--ipv6")
3230     else:
3231       cmd.append("--ipv4")
3232
3233     if opts.compress:
3234       cmd.append("--compress=%s" % opts.compress)
3235
3236     if opts.magic:
3237       cmd.append("--magic=%s" % opts.magic)
3238
3239     if exp_size is not None:
3240       cmd.append("--expected-size=%s" % exp_size)
3241
3242     if cmd_prefix:
3243       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3244
3245     if cmd_suffix:
3246       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3247
3248     if mode == constants.IEM_EXPORT:
3249       # Retry connection a few times when connecting to remote peer
3250       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3251       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3252     elif opts.connect_timeout is not None:
3253       assert mode == constants.IEM_IMPORT
3254       # Overall timeout for establishing connection while listening
3255       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3256
3257     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3258
3259     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3260     # support for receiving a file descriptor for output
3261     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3262                       output=logfile)
3263
3264     # The import/export name is simply the status directory name
3265     return os.path.basename(status_dir)
3266
3267   except Exception:
3268     shutil.rmtree(status_dir, ignore_errors=True)
3269     raise
3270
3271
3272 def GetImportExportStatus(names):
3273   """Returns import/export daemon status.
3274
3275   @type names: sequence
3276   @param names: List of names
3277   @rtype: List of dicts
3278   @return: Returns a list of the state of each named import/export or None if a
3279            status couldn't be read
3280
3281   """
3282   result = []
3283
3284   for name in names:
3285     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3286                                  _IES_STATUS_FILE)
3287
3288     try:
3289       data = utils.ReadFile(status_file)
3290     except EnvironmentError, err:
3291       if err.errno != errno.ENOENT:
3292         raise
3293       data = None
3294
3295     if not data:
3296       result.append(None)
3297       continue
3298
3299     result.append(serializer.LoadJson(data))
3300
3301   return result
3302
3303
3304 def AbortImportExport(name):
3305   """Sends SIGTERM to a running import/export daemon.
3306
3307   """
3308   logging.info("Abort import/export %s", name)
3309
3310   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3311   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3312
3313   if pid:
3314     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3315                  name, pid)
3316     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3317
3318
3319 def CleanupImportExport(name):
3320   """Cleanup after an import or export.
3321
3322   If the import/export daemon is still running it's killed. Afterwards the
3323   whole status directory is removed.
3324
3325   """
3326   logging.info("Finalizing import/export %s", name)
3327
3328   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3329
3330   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3331
3332   if pid:
3333     logging.info("Import/export %s is still running with PID %s",
3334                  name, pid)
3335     utils.KillProcess(pid, waitpid=False)
3336
3337   shutil.rmtree(status_dir, ignore_errors=True)
3338
3339
3340 def _FindDisks(nodes_ip, disks):
3341   """Sets the physical ID on disks and returns the block devices.
3342
3343   """
3344   # set the correct physical ID
3345   my_name = netutils.Hostname.GetSysName()
3346   for cf in disks:
3347     cf.SetPhysicalID(my_name, nodes_ip)
3348
3349   bdevs = []
3350
3351   for cf in disks:
3352     rd = _RecursiveFindBD(cf)
3353     if rd is None:
3354       _Fail("Can't find device %s", cf)
3355     bdevs.append(rd)
3356   return bdevs
3357
3358
3359 def DrbdDisconnectNet(nodes_ip, disks):
3360   """Disconnects the network on a list of drbd devices.
3361
3362   """
3363   bdevs = _FindDisks(nodes_ip, disks)
3364
3365   # disconnect disks
3366   for rd in bdevs:
3367     try:
3368       rd.DisconnectNet()
3369     except errors.BlockDeviceError, err:
3370       _Fail("Can't change network configuration to standalone mode: %s",
3371             err, exc=True)
3372
3373
3374 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3375   """Attaches the network on a list of drbd devices.
3376
3377   """
3378   bdevs = _FindDisks(nodes_ip, disks)
3379
3380   if multimaster:
3381     for idx, rd in enumerate(bdevs):
3382       try:
3383         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3384       except EnvironmentError, err:
3385         _Fail("Can't create symlink: %s", err)
3386   # reconnect disks, switch to new master configuration and if
3387   # needed primary mode
3388   for rd in bdevs:
3389     try:
3390       rd.AttachNet(multimaster)
3391     except errors.BlockDeviceError, err:
3392       _Fail("Can't change network configuration: %s", err)
3393
3394   # wait until the disks are connected; we need to retry the re-attach
3395   # if the device becomes standalone, as this might happen if the one
3396   # node disconnects and reconnects in a different mode before the
3397   # other node reconnects; in this case, one or both of the nodes will
3398   # decide it has wrong configuration and switch to standalone
3399
3400   def _Attach():
3401     all_connected = True
3402
3403     for rd in bdevs:
3404       stats = rd.GetProcStatus()
3405
3406       all_connected = (all_connected and
3407                        (stats.is_connected or stats.is_in_resync))
3408
3409       if stats.is_standalone:
3410         # peer had different config info and this node became
3411         # standalone, even though this should not happen with the
3412         # new staged way of changing disk configs
3413         try:
3414           rd.AttachNet(multimaster)
3415         except errors.BlockDeviceError, err:
3416           _Fail("Can't change network configuration: %s", err)
3417
3418     if not all_connected:
3419       raise utils.RetryAgain()
3420
3421   try:
3422     # Start with a delay of 100 miliseconds and go up to 5 seconds
3423     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3424   except utils.RetryTimeout:
3425     _Fail("Timeout in disk reconnecting")
3426
3427   if multimaster:
3428     # change to primary mode
3429     for rd in bdevs:
3430       try:
3431         rd.Open()
3432       except errors.BlockDeviceError, err:
3433         _Fail("Can't change to primary mode: %s", err)
3434
3435
3436 def DrbdWaitSync(nodes_ip, disks):
3437   """Wait until DRBDs have synchronized.
3438
3439   """
3440   def _helper(rd):
3441     stats = rd.GetProcStatus()
3442     if not (stats.is_connected or stats.is_in_resync):
3443       raise utils.RetryAgain()
3444     return stats
3445
3446   bdevs = _FindDisks(nodes_ip, disks)
3447
3448   min_resync = 100
3449   alldone = True
3450   for rd in bdevs:
3451     try:
3452       # poll each second for 15 seconds
3453       stats = utils.Retry(_helper, 1, 15, args=[rd])
3454     except utils.RetryTimeout:
3455       stats = rd.GetProcStatus()
3456       # last check
3457       if not (stats.is_connected or stats.is_in_resync):
3458         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3459     alldone = alldone and (not stats.is_in_resync)
3460     if stats.sync_percent is not None:
3461       min_resync = min(min_resync, stats.sync_percent)
3462
3463   return (alldone, min_resync)
3464
3465
3466 def GetDrbdUsermodeHelper():
3467   """Returns DRBD usermode helper currently configured.
3468
3469   """
3470   try:
3471     return bdev.BaseDRBD.GetUsermodeHelper()
3472   except errors.BlockDeviceError, err:
3473     _Fail(str(err))
3474
3475
3476 def PowercycleNode(hypervisor_type):
3477   """Hard-powercycle the node.
3478
3479   Because we need to return first, and schedule the powercycle in the
3480   background, we won't be able to report failures nicely.
3481
3482   """
3483   hyper = hypervisor.GetHypervisor(hypervisor_type)
3484   try:
3485     pid = os.fork()
3486   except OSError:
3487     # if we can't fork, we'll pretend that we're in the child process
3488     pid = 0
3489   if pid > 0:
3490     return "Reboot scheduled in 5 seconds"
3491   # ensure the child is running on ram
3492   try:
3493     utils.Mlockall()
3494   except Exception: # pylint: disable=W0703
3495     pass
3496   time.sleep(5)
3497   hyper.PowercycleNode()
3498
3499
3500 class HooksRunner(object):
3501   """Hook runner.
3502
3503   This class is instantiated on the node side (ganeti-noded) and not
3504   on the master side.
3505
3506   """
3507   def __init__(self, hooks_base_dir=None):
3508     """Constructor for hooks runner.
3509
3510     @type hooks_base_dir: str or None
3511     @param hooks_base_dir: if not None, this overrides the
3512         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3513
3514     """
3515     if hooks_base_dir is None:
3516       hooks_base_dir = constants.HOOKS_BASE_DIR
3517     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3518     # constant
3519     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3520
3521   def RunLocalHooks(self, node_list, hpath, phase, env):
3522     """Check that the hooks will be run only locally and then run them.
3523
3524     """
3525     assert len(node_list) == 1
3526     node = node_list[0]
3527     _, myself = ssconf.GetMasterAndMyself()
3528     assert node == myself
3529
3530     results = self.RunHooks(hpath, phase, env)
3531
3532     # Return values in the form expected by HooksMaster
3533     return {node: (None, False, results)}
3534
3535   def RunHooks(self, hpath, phase, env):
3536     """Run the scripts in the hooks directory.
3537
3538     @type hpath: str
3539     @param hpath: the path to the hooks directory which
3540         holds the scripts
3541     @type phase: str
3542     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3543         L{constants.HOOKS_PHASE_POST}
3544     @type env: dict
3545     @param env: dictionary with the environment for the hook
3546     @rtype: list
3547     @return: list of 3-element tuples:
3548       - script path
3549       - script result, either L{constants.HKR_SUCCESS} or
3550         L{constants.HKR_FAIL}
3551       - output of the script
3552
3553     @raise errors.ProgrammerError: for invalid input
3554         parameters
3555
3556     """
3557     if phase == constants.HOOKS_PHASE_PRE:
3558       suffix = "pre"
3559     elif phase == constants.HOOKS_PHASE_POST:
3560       suffix = "post"
3561     else:
3562       _Fail("Unknown hooks phase '%s'", phase)
3563
3564     subdir = "%s-%s.d" % (hpath, suffix)
3565     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3566
3567     results = []
3568
3569     if not os.path.isdir(dir_name):
3570       # for non-existing/non-dirs, we simply exit instead of logging a
3571       # warning at every operation
3572       return results
3573
3574     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3575
3576     for (relname, relstatus, runresult)  in runparts_results:
3577       if relstatus == constants.RUNPARTS_SKIP:
3578         rrval = constants.HKR_SKIP
3579         output = ""
3580       elif relstatus == constants.RUNPARTS_ERR:
3581         rrval = constants.HKR_FAIL
3582         output = "Hook script execution error: %s" % runresult
3583       elif relstatus == constants.RUNPARTS_RUN:
3584         if runresult.failed:
3585           rrval = constants.HKR_FAIL
3586         else:
3587           rrval = constants.HKR_SUCCESS
3588         output = utils.SafeEncode(runresult.output.strip())
3589       results.append(("%s/%s" % (subdir, relname), rrval, output))
3590
3591     return results
3592
3593
3594 class IAllocatorRunner(object):
3595   """IAllocator runner.
3596
3597   This class is instantiated on the node side (ganeti-noded) and not on
3598   the master side.
3599
3600   """
3601   @staticmethod
3602   def Run(name, idata):
3603     """Run an iallocator script.
3604
3605     @type name: str
3606     @param name: the iallocator script name
3607     @type idata: str
3608     @param idata: the allocator input data
3609
3610     @rtype: tuple
3611     @return: two element tuple of:
3612        - status
3613        - either error message or stdout of allocator (for success)
3614
3615     """
3616     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3617                                   os.path.isfile)
3618     if alloc_script is None:
3619       _Fail("iallocator module '%s' not found in the search path", name)
3620
3621     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3622     try:
3623       os.write(fd, idata)
3624       os.close(fd)
3625       result = utils.RunCmd([alloc_script, fin_name])
3626       if result.failed:
3627         _Fail("iallocator module '%s' failed: %s, output '%s'",
3628               name, result.fail_reason, result.output)
3629     finally:
3630       os.unlink(fin_name)
3631
3632     return result.stdout
3633
3634
3635 class DevCacheManager(object):
3636   """Simple class for managing a cache of block device information.
3637
3638   """
3639   _DEV_PREFIX = "/dev/"
3640   _ROOT_DIR = constants.BDEV_CACHE_DIR
3641
3642   @classmethod
3643   def _ConvertPath(cls, dev_path):
3644     """Converts a /dev/name path to the cache file name.
3645
3646     This replaces slashes with underscores and strips the /dev
3647     prefix. It then returns the full path to the cache file.
3648
3649     @type dev_path: str
3650     @param dev_path: the C{/dev/} path name
3651     @rtype: str
3652     @return: the converted path name
3653
3654     """
3655     if dev_path.startswith(cls._DEV_PREFIX):
3656       dev_path = dev_path[len(cls._DEV_PREFIX):]
3657     dev_path = dev_path.replace("/", "_")
3658     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3659     return fpath
3660
3661   @classmethod
3662   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3663     """Updates the cache information for a given device.
3664
3665     @type dev_path: str
3666     @param dev_path: the pathname of the device
3667     @type owner: str
3668     @param owner: the owner (instance name) of the device
3669     @type on_primary: bool
3670     @param on_primary: whether this is the primary
3671         node nor not
3672     @type iv_name: str
3673     @param iv_name: the instance-visible name of the
3674         device, as in objects.Disk.iv_name
3675
3676     @rtype: None
3677
3678     """
3679     if dev_path is None:
3680       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3681       return
3682     fpath = cls._ConvertPath(dev_path)
3683     if on_primary:
3684       state = "primary"
3685     else:
3686       state = "secondary"
3687     if iv_name is None:
3688       iv_name = "not_visible"
3689     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3690     try:
3691       utils.WriteFile(fpath, data=fdata)
3692     except EnvironmentError, err:
3693       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3694
3695   @classmethod
3696   def RemoveCache(cls, dev_path):
3697     """Remove data for a dev_path.
3698
3699     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3700     path name and logging.
3701
3702     @type dev_path: str
3703     @param dev_path: the pathname of the device
3704
3705     @rtype: None
3706
3707     """
3708     if dev_path is None:
3709       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3710       return
3711     fpath = cls._ConvertPath(dev_path)
3712     try:
3713       utils.RemoveFile(fpath)
3714     except EnvironmentError, err:
3715       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)