Fix the node powered field
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
67
68
69 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
70 _ALLOWED_CLEAN_DIRS = frozenset([
71   pathutils.DATA_DIR,
72   pathutils.JOB_QUEUE_ARCHIVE_DIR,
73   pathutils.QUEUE_DIR,
74   pathutils.CRYPTO_KEYS_DIR,
75   ])
76 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
77 _X509_KEY_FILE = "key"
78 _X509_CERT_FILE = "cert"
79 _IES_STATUS_FILE = "status"
80 _IES_PID_FILE = "pid"
81 _IES_CA_FILE = "ca"
82
83 #: Valid LVS output line regex
84 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
85
86 # Actions for the master setup script
87 _MASTER_START = "start"
88 _MASTER_STOP = "stop"
89
90
91 class RPCFail(Exception):
92   """Class denoting RPC failure.
93
94   Its argument is the error message.
95
96   """
97
98
99 def _Fail(msg, *args, **kwargs):
100   """Log an error and the raise an RPCFail exception.
101
102   This exception is then handled specially in the ganeti daemon and
103   turned into a 'failed' return type. As such, this function is a
104   useful shortcut for logging the error and returning it to the master
105   daemon.
106
107   @type msg: string
108   @param msg: the text of the exception
109   @raise RPCFail
110
111   """
112   if args:
113     msg = msg % args
114   if "log" not in kwargs or kwargs["log"]: # if we should log this error
115     if "exc" in kwargs and kwargs["exc"]:
116       logging.exception(msg)
117     else:
118       logging.error(msg)
119   raise RPCFail(msg)
120
121
122 def _GetConfig():
123   """Simple wrapper to return a SimpleStore.
124
125   @rtype: L{ssconf.SimpleStore}
126   @return: a SimpleStore instance
127
128   """
129   return ssconf.SimpleStore()
130
131
132 def _GetSshRunner(cluster_name):
133   """Simple wrapper to return an SshRunner.
134
135   @type cluster_name: str
136   @param cluster_name: the cluster name, which is needed
137       by the SshRunner constructor
138   @rtype: L{ssh.SshRunner}
139   @return: an SshRunner instance
140
141   """
142   return ssh.SshRunner(cluster_name)
143
144
145 def _Decompress(data):
146   """Unpacks data compressed by the RPC client.
147
148   @type data: list or tuple
149   @param data: Data sent by RPC client
150   @rtype: str
151   @return: Decompressed data
152
153   """
154   assert isinstance(data, (list, tuple))
155   assert len(data) == 2
156   (encoding, content) = data
157   if encoding == constants.RPC_ENCODING_NONE:
158     return content
159   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
160     return zlib.decompress(base64.b64decode(content))
161   else:
162     raise AssertionError("Unknown data encoding")
163
164
165 def _CleanDirectory(path, exclude=None):
166   """Removes all regular files in a directory.
167
168   @type path: str
169   @param path: the directory to clean
170   @type exclude: list
171   @param exclude: list of files to be excluded, defaults
172       to the empty list
173
174   """
175   if path not in _ALLOWED_CLEAN_DIRS:
176     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
177           path)
178
179   if not os.path.isdir(path):
180     return
181   if exclude is None:
182     exclude = []
183   else:
184     # Normalize excluded paths
185     exclude = [os.path.normpath(i) for i in exclude]
186
187   for rel_name in utils.ListVisibleFiles(path):
188     full_name = utils.PathJoin(path, rel_name)
189     if full_name in exclude:
190       continue
191     if os.path.isfile(full_name) and not os.path.islink(full_name):
192       utils.RemoveFile(full_name)
193
194
195 def _BuildUploadFileList():
196   """Build the list of allowed upload files.
197
198   This is abstracted so that it's built only once at module import time.
199
200   """
201   allowed_files = set([
202     pathutils.CLUSTER_CONF_FILE,
203     pathutils.ETC_HOSTS,
204     pathutils.SSH_KNOWN_HOSTS_FILE,
205     pathutils.VNC_PASSWORD_FILE,
206     pathutils.RAPI_CERT_FILE,
207     pathutils.SPICE_CERT_FILE,
208     pathutils.SPICE_CACERT_FILE,
209     pathutils.RAPI_USERS_FILE,
210     pathutils.CONFD_HMAC_KEY,
211     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
212     ])
213
214   for hv_name in constants.HYPER_TYPES:
215     hv_class = hypervisor.GetHypervisorClass(hv_name)
216     allowed_files.update(hv_class.GetAncillaryFiles()[0])
217
218   assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
219     "Allowed file storage paths should never be uploaded via RPC"
220
221   return frozenset(allowed_files)
222
223
224 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
225
226
227 def JobQueuePurge():
228   """Removes job queue files and archived jobs.
229
230   @rtype: tuple
231   @return: True, None
232
233   """
234   _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
235   _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
236
237
238 def GetMasterInfo():
239   """Returns master information.
240
241   This is an utility function to compute master information, either
242   for consumption here or from the node daemon.
243
244   @rtype: tuple
245   @return: master_netdev, master_ip, master_name, primary_ip_family,
246     master_netmask
247   @raise RPCFail: in case of errors
248
249   """
250   try:
251     cfg = _GetConfig()
252     master_netdev = cfg.GetMasterNetdev()
253     master_ip = cfg.GetMasterIP()
254     master_netmask = cfg.GetMasterNetmask()
255     master_node = cfg.GetMasterNode()
256     primary_ip_family = cfg.GetPrimaryIPFamily()
257   except errors.ConfigurationError, err:
258     _Fail("Cluster configuration incomplete: %s", err, exc=True)
259   return (master_netdev, master_ip, master_node, primary_ip_family,
260           master_netmask)
261
262
263 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
264   """Decorator that runs hooks before and after the decorated function.
265
266   @type hook_opcode: string
267   @param hook_opcode: opcode of the hook
268   @type hooks_path: string
269   @param hooks_path: path of the hooks
270   @type env_builder_fn: function
271   @param env_builder_fn: function that returns a dictionary containing the
272     environment variables for the hooks. Will get all the parameters of the
273     decorated function.
274   @raise RPCFail: in case of pre-hook failure
275
276   """
277   def decorator(fn):
278     def wrapper(*args, **kwargs):
279       _, myself = ssconf.GetMasterAndMyself()
280       nodes = ([myself], [myself])  # these hooks run locally
281
282       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
283
284       cfg = _GetConfig()
285       hr = HooksRunner()
286       hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
287                             None, env_fn, logging.warning, cfg.GetClusterName(),
288                             cfg.GetMasterNode())
289
290       hm.RunPhase(constants.HOOKS_PHASE_PRE)
291       result = fn(*args, **kwargs)
292       hm.RunPhase(constants.HOOKS_PHASE_POST)
293
294       return result
295     return wrapper
296   return decorator
297
298
299 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
300   """Builds environment variables for master IP hooks.
301
302   @type master_params: L{objects.MasterNetworkParameters}
303   @param master_params: network parameters of the master
304   @type use_external_mip_script: boolean
305   @param use_external_mip_script: whether to use an external master IP
306     address setup script (unused, but necessary per the implementation of the
307     _RunLocalHooks decorator)
308
309   """
310   # pylint: disable=W0613
311   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
312   env = {
313     "MASTER_NETDEV": master_params.netdev,
314     "MASTER_IP": master_params.ip,
315     "MASTER_NETMASK": str(master_params.netmask),
316     "CLUSTER_IP_VERSION": str(ver),
317   }
318
319   return env
320
321
322 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
323   """Execute the master IP address setup script.
324
325   @type master_params: L{objects.MasterNetworkParameters}
326   @param master_params: network parameters of the master
327   @type action: string
328   @param action: action to pass to the script. Must be one of
329     L{backend._MASTER_START} or L{backend._MASTER_STOP}
330   @type use_external_mip_script: boolean
331   @param use_external_mip_script: whether to use an external master IP
332     address setup script
333   @raise backend.RPCFail: if there are errors during the execution of the
334     script
335
336   """
337   env = _BuildMasterIpEnv(master_params)
338
339   if use_external_mip_script:
340     setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
341   else:
342     setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
343
344   result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
345
346   if result.failed:
347     _Fail("Failed to %s the master IP. Script return value: %s" %
348           (action, result.exit_code), log=True)
349
350
351 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
352                _BuildMasterIpEnv)
353 def ActivateMasterIp(master_params, use_external_mip_script):
354   """Activate the IP address of the master daemon.
355
356   @type master_params: L{objects.MasterNetworkParameters}
357   @param master_params: network parameters of the master
358   @type use_external_mip_script: boolean
359   @param use_external_mip_script: whether to use an external master IP
360     address setup script
361   @raise RPCFail: in case of errors during the IP startup
362
363   """
364   _RunMasterSetupScript(master_params, _MASTER_START,
365                         use_external_mip_script)
366
367
368 def StartMasterDaemons(no_voting):
369   """Activate local node as master node.
370
371   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
372
373   @type no_voting: boolean
374   @param no_voting: whether to start ganeti-masterd without a node vote
375       but still non-interactively
376   @rtype: None
377
378   """
379
380   if no_voting:
381     masterd_args = "--no-voting --yes-do-it"
382   else:
383     masterd_args = ""
384
385   env = {
386     "EXTRA_MASTERD_ARGS": masterd_args,
387     }
388
389   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
390   if result.failed:
391     msg = "Can't start Ganeti master: %s" % result.output
392     logging.error(msg)
393     _Fail(msg)
394
395
396 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
397                _BuildMasterIpEnv)
398 def DeactivateMasterIp(master_params, use_external_mip_script):
399   """Deactivate the master IP on this node.
400
401   @type master_params: L{objects.MasterNetworkParameters}
402   @param master_params: network parameters of the master
403   @type use_external_mip_script: boolean
404   @param use_external_mip_script: whether to use an external master IP
405     address setup script
406   @raise RPCFail: in case of errors during the IP turndown
407
408   """
409   _RunMasterSetupScript(master_params, _MASTER_STOP,
410                         use_external_mip_script)
411
412
413 def StopMasterDaemons():
414   """Stop the master daemons on this node.
415
416   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
417
418   @rtype: None
419
420   """
421   # TODO: log and report back to the caller the error failures; we
422   # need to decide in which case we fail the RPC for this
423
424   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
425   if result.failed:
426     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
427                   " and error %s",
428                   result.cmd, result.exit_code, result.output)
429
430
431 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
432   """Change the netmask of the master IP.
433
434   @param old_netmask: the old value of the netmask
435   @param netmask: the new value of the netmask
436   @param master_ip: the master IP
437   @param master_netdev: the master network device
438
439   """
440   if old_netmask == netmask:
441     return
442
443   if not netutils.IPAddress.Own(master_ip):
444     _Fail("The master IP address is not up, not attempting to change its"
445           " netmask")
446
447   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
448                          "%s/%s" % (master_ip, netmask),
449                          "dev", master_netdev, "label",
450                          "%s:0" % master_netdev])
451   if result.failed:
452     _Fail("Could not set the new netmask on the master IP address")
453
454   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
455                          "%s/%s" % (master_ip, old_netmask),
456                          "dev", master_netdev, "label",
457                          "%s:0" % master_netdev])
458   if result.failed:
459     _Fail("Could not bring down the master IP address with the old netmask")
460
461
462 def EtcHostsModify(mode, host, ip):
463   """Modify a host entry in /etc/hosts.
464
465   @param mode: The mode to operate. Either add or remove entry
466   @param host: The host to operate on
467   @param ip: The ip associated with the entry
468
469   """
470   if mode == constants.ETC_HOSTS_ADD:
471     if not ip:
472       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
473               " present")
474     utils.AddHostToEtcHosts(host, ip)
475   elif mode == constants.ETC_HOSTS_REMOVE:
476     if ip:
477       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
478               " parameter is present")
479     utils.RemoveHostFromEtcHosts(host)
480   else:
481     RPCFail("Mode not supported")
482
483
484 def LeaveCluster(modify_ssh_setup):
485   """Cleans up and remove the current node.
486
487   This function cleans up and prepares the current node to be removed
488   from the cluster.
489
490   If processing is successful, then it raises an
491   L{errors.QuitGanetiException} which is used as a special case to
492   shutdown the node daemon.
493
494   @param modify_ssh_setup: boolean
495
496   """
497   _CleanDirectory(pathutils.DATA_DIR)
498   _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
499   JobQueuePurge()
500
501   if modify_ssh_setup:
502     try:
503       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
504
505       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
506
507       utils.RemoveFile(priv_key)
508       utils.RemoveFile(pub_key)
509     except errors.OpExecError:
510       logging.exception("Error while processing ssh files")
511
512   try:
513     utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
514     utils.RemoveFile(pathutils.RAPI_CERT_FILE)
515     utils.RemoveFile(pathutils.SPICE_CERT_FILE)
516     utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
517     utils.RemoveFile(pathutils.NODED_CERT_FILE)
518   except: # pylint: disable=W0702
519     logging.exception("Error while removing cluster secrets")
520
521   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
522   if result.failed:
523     logging.error("Command %s failed with exitcode %s and error %s",
524                   result.cmd, result.exit_code, result.output)
525
526   # Raise a custom exception (handled in ganeti-noded)
527   raise errors.QuitGanetiException(True, "Shutdown scheduled")
528
529
530 def _GetVgInfo(name):
531   """Retrieves information about a LVM volume group.
532
533   """
534   # TODO: GetVGInfo supports returning information for multiple VGs at once
535   vginfo = bdev.LogicalVolume.GetVGInfo([name])
536   if vginfo:
537     vg_free = int(round(vginfo[0][0], 0))
538     vg_size = int(round(vginfo[0][1], 0))
539   else:
540     vg_free = None
541     vg_size = None
542
543   return {
544     "name": name,
545     "vg_free": vg_free,
546     "vg_size": vg_size,
547     }
548
549
550 def _GetHvInfo(name):
551   """Retrieves node information from a hypervisor.
552
553   The information returned depends on the hypervisor. Common items:
554
555     - vg_size is the size of the configured volume group in MiB
556     - vg_free is the free size of the volume group in MiB
557     - memory_dom0 is the memory allocated for domain0 in MiB
558     - memory_free is the currently available (free) ram in MiB
559     - memory_total is the total number of ram in MiB
560     - hv_version: the hypervisor version, if available
561
562   """
563   return hypervisor.GetHypervisor(name).GetNodeInfo()
564
565
566 def _GetNamedNodeInfo(names, fn):
567   """Calls C{fn} for all names in C{names} and returns a dictionary.
568
569   @rtype: None or dict
570
571   """
572   if names is None:
573     return None
574   else:
575     return map(fn, names)
576
577
578 def GetNodeInfo(vg_names, hv_names):
579   """Gives back a hash with different information about the node.
580
581   @type vg_names: list of string
582   @param vg_names: Names of the volume groups to ask for disk space information
583   @type hv_names: list of string
584   @param hv_names: Names of the hypervisors to ask for node information
585   @rtype: tuple; (string, None/dict, None/dict)
586   @return: Tuple containing boot ID, volume group information and hypervisor
587     information
588
589   """
590   bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
591   vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
592   hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
593
594   return (bootid, vg_info, hv_info)
595
596
597 def VerifyNode(what, cluster_name):
598   """Verify the status of the local node.
599
600   Based on the input L{what} parameter, various checks are done on the
601   local node.
602
603   If the I{filelist} key is present, this list of
604   files is checksummed and the file/checksum pairs are returned.
605
606   If the I{nodelist} key is present, we check that we have
607   connectivity via ssh with the target nodes (and check the hostname
608   report).
609
610   If the I{node-net-test} key is present, we check that we have
611   connectivity to the given nodes via both primary IP and, if
612   applicable, secondary IPs.
613
614   @type what: C{dict}
615   @param what: a dictionary of things to check:
616       - filelist: list of files for which to compute checksums
617       - nodelist: list of nodes we should check ssh communication with
618       - node-net-test: list of nodes we should check node daemon port
619         connectivity with
620       - hypervisor: list with hypervisors to run the verify for
621   @rtype: dict
622   @return: a dictionary with the same keys as the input dict, and
623       values representing the result of the checks
624
625   """
626   result = {}
627   my_name = netutils.Hostname.GetSysName()
628   port = netutils.GetDaemonPort(constants.NODED)
629   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
630
631   if constants.NV_HYPERVISOR in what and vm_capable:
632     result[constants.NV_HYPERVISOR] = tmp = {}
633     for hv_name in what[constants.NV_HYPERVISOR]:
634       try:
635         val = hypervisor.GetHypervisor(hv_name).Verify()
636       except errors.HypervisorError, err:
637         val = "Error while checking hypervisor: %s" % str(err)
638       tmp[hv_name] = val
639
640   if constants.NV_HVPARAMS in what and vm_capable:
641     result[constants.NV_HVPARAMS] = tmp = []
642     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
643       try:
644         logging.info("Validating hv %s, %s", hv_name, hvparms)
645         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
646       except errors.HypervisorError, err:
647         tmp.append((source, hv_name, str(err)))
648
649   if constants.NV_FILELIST in what:
650     result[constants.NV_FILELIST] = utils.FingerprintFiles(
651       what[constants.NV_FILELIST])
652
653   if constants.NV_NODELIST in what:
654     (nodes, bynode) = what[constants.NV_NODELIST]
655
656     # Add nodes from other groups (different for each node)
657     try:
658       nodes.extend(bynode[my_name])
659     except KeyError:
660       pass
661
662     # Use a random order
663     random.shuffle(nodes)
664
665     # Try to contact all nodes
666     val = {}
667     for node in nodes:
668       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
669       if not success:
670         val[node] = message
671
672     result[constants.NV_NODELIST] = val
673
674   if constants.NV_NODENETTEST in what:
675     result[constants.NV_NODENETTEST] = tmp = {}
676     my_pip = my_sip = None
677     for name, pip, sip in what[constants.NV_NODENETTEST]:
678       if name == my_name:
679         my_pip = pip
680         my_sip = sip
681         break
682     if not my_pip:
683       tmp[my_name] = ("Can't find my own primary/secondary IP"
684                       " in the node list")
685     else:
686       for name, pip, sip in what[constants.NV_NODENETTEST]:
687         fail = []
688         if not netutils.TcpPing(pip, port, source=my_pip):
689           fail.append("primary")
690         if sip != pip:
691           if not netutils.TcpPing(sip, port, source=my_sip):
692             fail.append("secondary")
693         if fail:
694           tmp[name] = ("failure using the %s interface(s)" %
695                        " and ".join(fail))
696
697   if constants.NV_MASTERIP in what:
698     # FIXME: add checks on incoming data structures (here and in the
699     # rest of the function)
700     master_name, master_ip = what[constants.NV_MASTERIP]
701     if master_name == my_name:
702       source = constants.IP4_ADDRESS_LOCALHOST
703     else:
704       source = None
705     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
706                                                      source=source)
707
708   if constants.NV_USERSCRIPTS in what:
709     result[constants.NV_USERSCRIPTS] = \
710       [script for script in what[constants.NV_USERSCRIPTS]
711        if not (os.path.exists(script) and os.access(script, os.X_OK))]
712
713   if constants.NV_OOB_PATHS in what:
714     result[constants.NV_OOB_PATHS] = tmp = []
715     for path in what[constants.NV_OOB_PATHS]:
716       try:
717         st = os.stat(path)
718       except OSError, err:
719         tmp.append("error stating out of band helper: %s" % err)
720       else:
721         if stat.S_ISREG(st.st_mode):
722           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
723             tmp.append(None)
724           else:
725             tmp.append("out of band helper %s is not executable" % path)
726         else:
727           tmp.append("out of band helper %s is not a file" % path)
728
729   if constants.NV_LVLIST in what and vm_capable:
730     try:
731       val = GetVolumeList(utils.ListVolumeGroups().keys())
732     except RPCFail, err:
733       val = str(err)
734     result[constants.NV_LVLIST] = val
735
736   if constants.NV_INSTANCELIST in what and vm_capable:
737     # GetInstanceList can fail
738     try:
739       val = GetInstanceList(what[constants.NV_INSTANCELIST])
740     except RPCFail, err:
741       val = str(err)
742     result[constants.NV_INSTANCELIST] = val
743
744   if constants.NV_VGLIST in what and vm_capable:
745     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
746
747   if constants.NV_PVLIST in what and vm_capable:
748     result[constants.NV_PVLIST] = \
749       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
750                                    filter_allocatable=False)
751
752   if constants.NV_VERSION in what:
753     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
754                                     constants.RELEASE_VERSION)
755
756   if constants.NV_HVINFO in what and vm_capable:
757     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
758     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
759
760   if constants.NV_DRBDLIST in what and vm_capable:
761     try:
762       used_minors = bdev.DRBD8.GetUsedDevs().keys()
763     except errors.BlockDeviceError, err:
764       logging.warning("Can't get used minors list", exc_info=True)
765       used_minors = str(err)
766     result[constants.NV_DRBDLIST] = used_minors
767
768   if constants.NV_DRBDHELPER in what and vm_capable:
769     status = True
770     try:
771       payload = bdev.BaseDRBD.GetUsermodeHelper()
772     except errors.BlockDeviceError, err:
773       logging.error("Can't get DRBD usermode helper: %s", str(err))
774       status = False
775       payload = str(err)
776     result[constants.NV_DRBDHELPER] = (status, payload)
777
778   if constants.NV_NODESETUP in what:
779     result[constants.NV_NODESETUP] = tmpr = []
780     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
781       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
782                   " under /sys, missing required directories /sys/block"
783                   " and /sys/class/net")
784     if (not os.path.isdir("/proc/sys") or
785         not os.path.isfile("/proc/sysrq-trigger")):
786       tmpr.append("The procfs filesystem doesn't seem to be mounted"
787                   " under /proc, missing required directory /proc/sys and"
788                   " the file /proc/sysrq-trigger")
789
790   if constants.NV_TIME in what:
791     result[constants.NV_TIME] = utils.SplitTime(time.time())
792
793   if constants.NV_OSLIST in what and vm_capable:
794     result[constants.NV_OSLIST] = DiagnoseOS()
795
796   if constants.NV_BRIDGES in what and vm_capable:
797     result[constants.NV_BRIDGES] = [bridge
798                                     for bridge in what[constants.NV_BRIDGES]
799                                     if not utils.BridgeExists(bridge)]
800   return result
801
802
803 def GetBlockDevSizes(devices):
804   """Return the size of the given block devices
805
806   @type devices: list
807   @param devices: list of block device nodes to query
808   @rtype: dict
809   @return:
810     dictionary of all block devices under /dev (key). The value is their
811     size in MiB.
812
813     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
814
815   """
816   DEV_PREFIX = "/dev/"
817   blockdevs = {}
818
819   for devpath in devices:
820     if not utils.IsBelowDir(DEV_PREFIX, devpath):
821       continue
822
823     try:
824       st = os.stat(devpath)
825     except EnvironmentError, err:
826       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
827       continue
828
829     if stat.S_ISBLK(st.st_mode):
830       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
831       if result.failed:
832         # We don't want to fail, just do not list this device as available
833         logging.warning("Cannot get size for block device %s", devpath)
834         continue
835
836       size = int(result.stdout) / (1024 * 1024)
837       blockdevs[devpath] = size
838   return blockdevs
839
840
841 def GetVolumeList(vg_names):
842   """Compute list of logical volumes and their size.
843
844   @type vg_names: list
845   @param vg_names: the volume groups whose LVs we should list, or
846       empty for all volume groups
847   @rtype: dict
848   @return:
849       dictionary of all partions (key) with value being a tuple of
850       their size (in MiB), inactive and online status::
851
852         {'xenvg/test1': ('20.06', True, True)}
853
854       in case of errors, a string is returned with the error
855       details.
856
857   """
858   lvs = {}
859   sep = "|"
860   if not vg_names:
861     vg_names = []
862   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
863                          "--separator=%s" % sep,
864                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
865   if result.failed:
866     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
867
868   for line in result.stdout.splitlines():
869     line = line.strip()
870     match = _LVSLINE_REGEX.match(line)
871     if not match:
872       logging.error("Invalid line returned from lvs output: '%s'", line)
873       continue
874     vg_name, name, size, attr = match.groups()
875     inactive = attr[4] == "-"
876     online = attr[5] == "o"
877     virtual = attr[0] == "v"
878     if virtual:
879       # we don't want to report such volumes as existing, since they
880       # don't really hold data
881       continue
882     lvs[vg_name + "/" + name] = (size, inactive, online)
883
884   return lvs
885
886
887 def ListVolumeGroups():
888   """List the volume groups and their size.
889
890   @rtype: dict
891   @return: dictionary with keys volume name and values the
892       size of the volume
893
894   """
895   return utils.ListVolumeGroups()
896
897
898 def NodeVolumes():
899   """List all volumes on this node.
900
901   @rtype: list
902   @return:
903     A list of dictionaries, each having four keys:
904       - name: the logical volume name,
905       - size: the size of the logical volume
906       - dev: the physical device on which the LV lives
907       - vg: the volume group to which it belongs
908
909     In case of errors, we return an empty list and log the
910     error.
911
912     Note that since a logical volume can live on multiple physical
913     volumes, the resulting list might include a logical volume
914     multiple times.
915
916   """
917   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
918                          "--separator=|",
919                          "--options=lv_name,lv_size,devices,vg_name"])
920   if result.failed:
921     _Fail("Failed to list logical volumes, lvs output: %s",
922           result.output)
923
924   def parse_dev(dev):
925     return dev.split("(")[0]
926
927   def handle_dev(dev):
928     return [parse_dev(x) for x in dev.split(",")]
929
930   def map_line(line):
931     line = [v.strip() for v in line]
932     return [{"name": line[0], "size": line[1],
933              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
934
935   all_devs = []
936   for line in result.stdout.splitlines():
937     if line.count("|") >= 3:
938       all_devs.extend(map_line(line.split("|")))
939     else:
940       logging.warning("Strange line in the output from lvs: '%s'", line)
941   return all_devs
942
943
944 def BridgesExist(bridges_list):
945   """Check if a list of bridges exist on the current node.
946
947   @rtype: boolean
948   @return: C{True} if all of them exist, C{False} otherwise
949
950   """
951   missing = []
952   for bridge in bridges_list:
953     if not utils.BridgeExists(bridge):
954       missing.append(bridge)
955
956   if missing:
957     _Fail("Missing bridges %s", utils.CommaJoin(missing))
958
959
960 def GetInstanceList(hypervisor_list):
961   """Provides a list of instances.
962
963   @type hypervisor_list: list
964   @param hypervisor_list: the list of hypervisors to query information
965
966   @rtype: list
967   @return: a list of all running instances on the current node
968     - instance1.example.com
969     - instance2.example.com
970
971   """
972   results = []
973   for hname in hypervisor_list:
974     try:
975       names = hypervisor.GetHypervisor(hname).ListInstances()
976       results.extend(names)
977     except errors.HypervisorError, err:
978       _Fail("Error enumerating instances (hypervisor %s): %s",
979             hname, err, exc=True)
980
981   return results
982
983
984 def GetInstanceInfo(instance, hname):
985   """Gives back the information about an instance as a dictionary.
986
987   @type instance: string
988   @param instance: the instance name
989   @type hname: string
990   @param hname: the hypervisor type of the instance
991
992   @rtype: dict
993   @return: dictionary with the following keys:
994       - memory: memory size of instance (int)
995       - state: xen state of instance (string)
996       - time: cpu time of instance (float)
997       - vcpus: the number of vcpus (int)
998
999   """
1000   output = {}
1001
1002   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1003   if iinfo is not None:
1004     output["memory"] = iinfo[2]
1005     output["vcpus"] = iinfo[3]
1006     output["state"] = iinfo[4]
1007     output["time"] = iinfo[5]
1008
1009   return output
1010
1011
1012 def GetInstanceMigratable(instance):
1013   """Gives whether an instance can be migrated.
1014
1015   @type instance: L{objects.Instance}
1016   @param instance: object representing the instance to be checked.
1017
1018   @rtype: tuple
1019   @return: tuple of (result, description) where:
1020       - result: whether the instance can be migrated or not
1021       - description: a description of the issue, if relevant
1022
1023   """
1024   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1025   iname = instance.name
1026   if iname not in hyper.ListInstances():
1027     _Fail("Instance %s is not running", iname)
1028
1029   for idx in range(len(instance.disks)):
1030     link_name = _GetBlockDevSymlinkPath(iname, idx)
1031     if not os.path.islink(link_name):
1032       logging.warning("Instance %s is missing symlink %s for disk %d",
1033                       iname, link_name, idx)
1034
1035
1036 def GetAllInstancesInfo(hypervisor_list):
1037   """Gather data about all instances.
1038
1039   This is the equivalent of L{GetInstanceInfo}, except that it
1040   computes data for all instances at once, thus being faster if one
1041   needs data about more than one instance.
1042
1043   @type hypervisor_list: list
1044   @param hypervisor_list: list of hypervisors to query for instance data
1045
1046   @rtype: dict
1047   @return: dictionary of instance: data, with data having the following keys:
1048       - memory: memory size of instance (int)
1049       - state: xen state of instance (string)
1050       - time: cpu time of instance (float)
1051       - vcpus: the number of vcpus
1052
1053   """
1054   output = {}
1055
1056   for hname in hypervisor_list:
1057     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1058     if iinfo:
1059       for name, _, memory, vcpus, state, times in iinfo:
1060         value = {
1061           "memory": memory,
1062           "vcpus": vcpus,
1063           "state": state,
1064           "time": times,
1065           }
1066         if name in output:
1067           # we only check static parameters, like memory and vcpus,
1068           # and not state and time which can change between the
1069           # invocations of the different hypervisors
1070           for key in "memory", "vcpus":
1071             if value[key] != output[name][key]:
1072               _Fail("Instance %s is running twice"
1073                     " with different parameters", name)
1074         output[name] = value
1075
1076   return output
1077
1078
1079 def _InstanceLogName(kind, os_name, instance, component):
1080   """Compute the OS log filename for a given instance and operation.
1081
1082   The instance name and os name are passed in as strings since not all
1083   operations have these as part of an instance object.
1084
1085   @type kind: string
1086   @param kind: the operation type (e.g. add, import, etc.)
1087   @type os_name: string
1088   @param os_name: the os name
1089   @type instance: string
1090   @param instance: the name of the instance being imported/added/etc.
1091   @type component: string or None
1092   @param component: the name of the component of the instance being
1093       transferred
1094
1095   """
1096   # TODO: Use tempfile.mkstemp to create unique filename
1097   if component:
1098     assert "/" not in component
1099     c_msg = "-%s" % component
1100   else:
1101     c_msg = ""
1102   base = ("%s-%s-%s%s-%s.log" %
1103           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1104   return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1105
1106
1107 def InstanceOsAdd(instance, reinstall, debug):
1108   """Add an OS to an instance.
1109
1110   @type instance: L{objects.Instance}
1111   @param instance: Instance whose OS is to be installed
1112   @type reinstall: boolean
1113   @param reinstall: whether this is an instance reinstall
1114   @type debug: integer
1115   @param debug: debug level, passed to the OS scripts
1116   @rtype: None
1117
1118   """
1119   inst_os = OSFromDisk(instance.os)
1120
1121   create_env = OSEnvironment(instance, inst_os, debug)
1122   if reinstall:
1123     create_env["INSTANCE_REINSTALL"] = "1"
1124
1125   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1126
1127   result = utils.RunCmd([inst_os.create_script], env=create_env,
1128                         cwd=inst_os.path, output=logfile, reset_env=True)
1129   if result.failed:
1130     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1131                   " output: %s", result.cmd, result.fail_reason, logfile,
1132                   result.output)
1133     lines = [utils.SafeEncode(val)
1134              for val in utils.TailFile(logfile, lines=20)]
1135     _Fail("OS create script failed (%s), last lines in the"
1136           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1137
1138
1139 def RunRenameInstance(instance, old_name, debug):
1140   """Run the OS rename script for an instance.
1141
1142   @type instance: L{objects.Instance}
1143   @param instance: Instance whose OS is to be installed
1144   @type old_name: string
1145   @param old_name: previous instance name
1146   @type debug: integer
1147   @param debug: debug level, passed to the OS scripts
1148   @rtype: boolean
1149   @return: the success of the operation
1150
1151   """
1152   inst_os = OSFromDisk(instance.os)
1153
1154   rename_env = OSEnvironment(instance, inst_os, debug)
1155   rename_env["OLD_INSTANCE_NAME"] = old_name
1156
1157   logfile = _InstanceLogName("rename", instance.os,
1158                              "%s-%s" % (old_name, instance.name), None)
1159
1160   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1161                         cwd=inst_os.path, output=logfile, reset_env=True)
1162
1163   if result.failed:
1164     logging.error("os create command '%s' returned error: %s output: %s",
1165                   result.cmd, result.fail_reason, result.output)
1166     lines = [utils.SafeEncode(val)
1167              for val in utils.TailFile(logfile, lines=20)]
1168     _Fail("OS rename script failed (%s), last lines in the"
1169           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1170
1171
1172 def _GetBlockDevSymlinkPath(instance_name, idx):
1173   return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
1174                         (instance_name, constants.DISK_SEPARATOR, idx))
1175
1176
1177 def _SymlinkBlockDev(instance_name, device_path, idx):
1178   """Set up symlinks to a instance's block device.
1179
1180   This is an auxiliary function run when an instance is start (on the primary
1181   node) or when an instance is migrated (on the target node).
1182
1183
1184   @param instance_name: the name of the target instance
1185   @param device_path: path of the physical block device, on the node
1186   @param idx: the disk index
1187   @return: absolute path to the disk's symlink
1188
1189   """
1190   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1191   try:
1192     os.symlink(device_path, link_name)
1193   except OSError, err:
1194     if err.errno == errno.EEXIST:
1195       if (not os.path.islink(link_name) or
1196           os.readlink(link_name) != device_path):
1197         os.remove(link_name)
1198         os.symlink(device_path, link_name)
1199     else:
1200       raise
1201
1202   return link_name
1203
1204
1205 def _RemoveBlockDevLinks(instance_name, disks):
1206   """Remove the block device symlinks belonging to the given instance.
1207
1208   """
1209   for idx, _ in enumerate(disks):
1210     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1211     if os.path.islink(link_name):
1212       try:
1213         os.remove(link_name)
1214       except OSError:
1215         logging.exception("Can't remove symlink '%s'", link_name)
1216
1217
1218 def _GatherAndLinkBlockDevs(instance):
1219   """Set up an instance's block device(s).
1220
1221   This is run on the primary node at instance startup. The block
1222   devices must be already assembled.
1223
1224   @type instance: L{objects.Instance}
1225   @param instance: the instance whose disks we shoul assemble
1226   @rtype: list
1227   @return: list of (disk_object, device_path)
1228
1229   """
1230   block_devices = []
1231   for idx, disk in enumerate(instance.disks):
1232     device = _RecursiveFindBD(disk)
1233     if device is None:
1234       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1235                                     str(disk))
1236     device.Open()
1237     try:
1238       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1239     except OSError, e:
1240       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1241                                     e.strerror)
1242
1243     block_devices.append((disk, link_name))
1244
1245   return block_devices
1246
1247
1248 def StartInstance(instance, startup_paused):
1249   """Start an instance.
1250
1251   @type instance: L{objects.Instance}
1252   @param instance: the instance object
1253   @type startup_paused: bool
1254   @param instance: pause instance at startup?
1255   @rtype: None
1256
1257   """
1258   running_instances = GetInstanceList([instance.hypervisor])
1259
1260   if instance.name in running_instances:
1261     logging.info("Instance %s already running, not starting", instance.name)
1262     return
1263
1264   try:
1265     block_devices = _GatherAndLinkBlockDevs(instance)
1266     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1267     hyper.StartInstance(instance, block_devices, startup_paused)
1268   except errors.BlockDeviceError, err:
1269     _Fail("Block device error: %s", err, exc=True)
1270   except errors.HypervisorError, err:
1271     _RemoveBlockDevLinks(instance.name, instance.disks)
1272     _Fail("Hypervisor error: %s", err, exc=True)
1273
1274
1275 def InstanceShutdown(instance, timeout):
1276   """Shut an instance down.
1277
1278   @note: this functions uses polling with a hardcoded timeout.
1279
1280   @type instance: L{objects.Instance}
1281   @param instance: the instance object
1282   @type timeout: integer
1283   @param timeout: maximum timeout for soft shutdown
1284   @rtype: None
1285
1286   """
1287   hv_name = instance.hypervisor
1288   hyper = hypervisor.GetHypervisor(hv_name)
1289   iname = instance.name
1290
1291   if instance.name not in hyper.ListInstances():
1292     logging.info("Instance %s not running, doing nothing", iname)
1293     return
1294
1295   class _TryShutdown:
1296     def __init__(self):
1297       self.tried_once = False
1298
1299     def __call__(self):
1300       if iname not in hyper.ListInstances():
1301         return
1302
1303       try:
1304         hyper.StopInstance(instance, retry=self.tried_once)
1305       except errors.HypervisorError, err:
1306         if iname not in hyper.ListInstances():
1307           # if the instance is no longer existing, consider this a
1308           # success and go to cleanup
1309           return
1310
1311         _Fail("Failed to stop instance %s: %s", iname, err)
1312
1313       self.tried_once = True
1314
1315       raise utils.RetryAgain()
1316
1317   try:
1318     utils.Retry(_TryShutdown(), 5, timeout)
1319   except utils.RetryTimeout:
1320     # the shutdown did not succeed
1321     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1322
1323     try:
1324       hyper.StopInstance(instance, force=True)
1325     except errors.HypervisorError, err:
1326       if iname in hyper.ListInstances():
1327         # only raise an error if the instance still exists, otherwise
1328         # the error could simply be "instance ... unknown"!
1329         _Fail("Failed to force stop instance %s: %s", iname, err)
1330
1331     time.sleep(1)
1332
1333     if iname in hyper.ListInstances():
1334       _Fail("Could not shutdown instance %s even by destroy", iname)
1335
1336   try:
1337     hyper.CleanupInstance(instance.name)
1338   except errors.HypervisorError, err:
1339     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1340
1341   _RemoveBlockDevLinks(iname, instance.disks)
1342
1343
1344 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1345   """Reboot an instance.
1346
1347   @type instance: L{objects.Instance}
1348   @param instance: the instance object to reboot
1349   @type reboot_type: str
1350   @param reboot_type: the type of reboot, one the following
1351     constants:
1352       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1353         instance OS, do not recreate the VM
1354       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1355         restart the VM (at the hypervisor level)
1356       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1357         not accepted here, since that mode is handled differently, in
1358         cmdlib, and translates into full stop and start of the
1359         instance (instead of a call_instance_reboot RPC)
1360   @type shutdown_timeout: integer
1361   @param shutdown_timeout: maximum timeout for soft shutdown
1362   @rtype: None
1363
1364   """
1365   running_instances = GetInstanceList([instance.hypervisor])
1366
1367   if instance.name not in running_instances:
1368     _Fail("Cannot reboot instance %s that is not running", instance.name)
1369
1370   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1371   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1372     try:
1373       hyper.RebootInstance(instance)
1374     except errors.HypervisorError, err:
1375       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1376   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1377     try:
1378       InstanceShutdown(instance, shutdown_timeout)
1379       return StartInstance(instance, False)
1380     except errors.HypervisorError, err:
1381       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1382   else:
1383     _Fail("Invalid reboot_type received: %s", reboot_type)
1384
1385
1386 def InstanceBalloonMemory(instance, memory):
1387   """Resize an instance's memory.
1388
1389   @type instance: L{objects.Instance}
1390   @param instance: the instance object
1391   @type memory: int
1392   @param memory: new memory amount in MB
1393   @rtype: None
1394
1395   """
1396   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1397   running = hyper.ListInstances()
1398   if instance.name not in running:
1399     logging.info("Instance %s is not running, cannot balloon", instance.name)
1400     return
1401   try:
1402     hyper.BalloonInstanceMemory(instance, memory)
1403   except errors.HypervisorError, err:
1404     _Fail("Failed to balloon instance memory: %s", err, exc=True)
1405
1406
1407 def MigrationInfo(instance):
1408   """Gather information about an instance to be migrated.
1409
1410   @type instance: L{objects.Instance}
1411   @param instance: the instance definition
1412
1413   """
1414   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1415   try:
1416     info = hyper.MigrationInfo(instance)
1417   except errors.HypervisorError, err:
1418     _Fail("Failed to fetch migration information: %s", err, exc=True)
1419   return info
1420
1421
1422 def AcceptInstance(instance, info, target):
1423   """Prepare the node to accept an instance.
1424
1425   @type instance: L{objects.Instance}
1426   @param instance: the instance definition
1427   @type info: string/data (opaque)
1428   @param info: migration information, from the source node
1429   @type target: string
1430   @param target: target host (usually ip), on this node
1431
1432   """
1433   # TODO: why is this required only for DTS_EXT_MIRROR?
1434   if instance.disk_template in constants.DTS_EXT_MIRROR:
1435     # Create the symlinks, as the disks are not active
1436     # in any way
1437     try:
1438       _GatherAndLinkBlockDevs(instance)
1439     except errors.BlockDeviceError, err:
1440       _Fail("Block device error: %s", err, exc=True)
1441
1442   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1443   try:
1444     hyper.AcceptInstance(instance, info, target)
1445   except errors.HypervisorError, err:
1446     if instance.disk_template in constants.DTS_EXT_MIRROR:
1447       _RemoveBlockDevLinks(instance.name, instance.disks)
1448     _Fail("Failed to accept instance: %s", err, exc=True)
1449
1450
1451 def FinalizeMigrationDst(instance, info, success):
1452   """Finalize any preparation to accept an instance.
1453
1454   @type instance: L{objects.Instance}
1455   @param instance: the instance definition
1456   @type info: string/data (opaque)
1457   @param info: migration information, from the source node
1458   @type success: boolean
1459   @param success: whether the migration was a success or a failure
1460
1461   """
1462   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1463   try:
1464     hyper.FinalizeMigrationDst(instance, info, success)
1465   except errors.HypervisorError, err:
1466     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1467
1468
1469 def MigrateInstance(instance, target, live):
1470   """Migrates an instance to another node.
1471
1472   @type instance: L{objects.Instance}
1473   @param instance: the instance definition
1474   @type target: string
1475   @param target: the target node name
1476   @type live: boolean
1477   @param live: whether the migration should be done live or not (the
1478       interpretation of this parameter is left to the hypervisor)
1479   @raise RPCFail: if migration fails for some reason
1480
1481   """
1482   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1483
1484   try:
1485     hyper.MigrateInstance(instance, target, live)
1486   except errors.HypervisorError, err:
1487     _Fail("Failed to migrate instance: %s", err, exc=True)
1488
1489
1490 def FinalizeMigrationSource(instance, success, live):
1491   """Finalize the instance migration on the source node.
1492
1493   @type instance: L{objects.Instance}
1494   @param instance: the instance definition of the migrated instance
1495   @type success: bool
1496   @param success: whether the migration succeeded or not
1497   @type live: bool
1498   @param live: whether the user requested a live migration or not
1499   @raise RPCFail: If the execution fails for some reason
1500
1501   """
1502   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1503
1504   try:
1505     hyper.FinalizeMigrationSource(instance, success, live)
1506   except Exception, err:  # pylint: disable=W0703
1507     _Fail("Failed to finalize the migration on the source node: %s", err,
1508           exc=True)
1509
1510
1511 def GetMigrationStatus(instance):
1512   """Get the migration status
1513
1514   @type instance: L{objects.Instance}
1515   @param instance: the instance that is being migrated
1516   @rtype: L{objects.MigrationStatus}
1517   @return: the status of the current migration (one of
1518            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1519            progress info that can be retrieved from the hypervisor
1520   @raise RPCFail: If the migration status cannot be retrieved
1521
1522   """
1523   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1524   try:
1525     return hyper.GetMigrationStatus(instance)
1526   except Exception, err:  # pylint: disable=W0703
1527     _Fail("Failed to get migration status: %s", err, exc=True)
1528
1529
1530 def BlockdevCreate(disk, size, owner, on_primary, info):
1531   """Creates a block device for an instance.
1532
1533   @type disk: L{objects.Disk}
1534   @param disk: the object describing the disk we should create
1535   @type size: int
1536   @param size: the size of the physical underlying device, in MiB
1537   @type owner: str
1538   @param owner: the name of the instance for which disk is created,
1539       used for device cache data
1540   @type on_primary: boolean
1541   @param on_primary:  indicates if it is the primary node or not
1542   @type info: string
1543   @param info: string that will be sent to the physical device
1544       creation, used for example to set (LVM) tags on LVs
1545
1546   @return: the new unique_id of the device (this can sometime be
1547       computed only after creation), or None. On secondary nodes,
1548       it's not required to return anything.
1549
1550   """
1551   # TODO: remove the obsolete "size" argument
1552   # pylint: disable=W0613
1553   clist = []
1554   if disk.children:
1555     for child in disk.children:
1556       try:
1557         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1558       except errors.BlockDeviceError, err:
1559         _Fail("Can't assemble device %s: %s", child, err)
1560       if on_primary or disk.AssembleOnSecondary():
1561         # we need the children open in case the device itself has to
1562         # be assembled
1563         try:
1564           # pylint: disable=E1103
1565           crdev.Open()
1566         except errors.BlockDeviceError, err:
1567           _Fail("Can't make child '%s' read-write: %s", child, err)
1568       clist.append(crdev)
1569
1570   try:
1571     device = bdev.Create(disk, clist)
1572   except errors.BlockDeviceError, err:
1573     _Fail("Can't create block device: %s", err)
1574
1575   if on_primary or disk.AssembleOnSecondary():
1576     try:
1577       device.Assemble()
1578     except errors.BlockDeviceError, err:
1579       _Fail("Can't assemble device after creation, unusual event: %s", err)
1580     if on_primary or disk.OpenOnSecondary():
1581       try:
1582         device.Open(force=True)
1583       except errors.BlockDeviceError, err:
1584         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1585     DevCacheManager.UpdateCache(device.dev_path, owner,
1586                                 on_primary, disk.iv_name)
1587
1588   device.SetInfo(info)
1589
1590   return device.unique_id
1591
1592
1593 def _WipeDevice(path, offset, size):
1594   """This function actually wipes the device.
1595
1596   @param path: The path to the device to wipe
1597   @param offset: The offset in MiB in the file
1598   @param size: The size in MiB to write
1599
1600   """
1601   # Internal sizes are always in Mebibytes; if the following "dd" command
1602   # should use a different block size the offset and size given to this
1603   # function must be adjusted accordingly before being passed to "dd".
1604   block_size = 1024 * 1024
1605
1606   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1607          "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1608          "count=%d" % size]
1609   result = utils.RunCmd(cmd)
1610
1611   if result.failed:
1612     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1613           result.fail_reason, result.output)
1614
1615
1616 def BlockdevWipe(disk, offset, size):
1617   """Wipes a block device.
1618
1619   @type disk: L{objects.Disk}
1620   @param disk: the disk object we want to wipe
1621   @type offset: int
1622   @param offset: The offset in MiB in the file
1623   @type size: int
1624   @param size: The size in MiB to write
1625
1626   """
1627   try:
1628     rdev = _RecursiveFindBD(disk)
1629   except errors.BlockDeviceError:
1630     rdev = None
1631
1632   if not rdev:
1633     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1634
1635   # Do cross verify some of the parameters
1636   if offset < 0:
1637     _Fail("Negative offset")
1638   if size < 0:
1639     _Fail("Negative size")
1640   if offset > rdev.size:
1641     _Fail("Offset is bigger than device size")
1642   if (offset + size) > rdev.size:
1643     _Fail("The provided offset and size to wipe is bigger than device size")
1644
1645   _WipeDevice(rdev.dev_path, offset, size)
1646
1647
1648 def BlockdevPauseResumeSync(disks, pause):
1649   """Pause or resume the sync of the block device.
1650
1651   @type disks: list of L{objects.Disk}
1652   @param disks: the disks object we want to pause/resume
1653   @type pause: bool
1654   @param pause: Wheater to pause or resume
1655
1656   """
1657   success = []
1658   for disk in disks:
1659     try:
1660       rdev = _RecursiveFindBD(disk)
1661     except errors.BlockDeviceError:
1662       rdev = None
1663
1664     if not rdev:
1665       success.append((False, ("Cannot change sync for device %s:"
1666                               " device not found" % disk.iv_name)))
1667       continue
1668
1669     result = rdev.PauseResumeSync(pause)
1670
1671     if result:
1672       success.append((result, None))
1673     else:
1674       if pause:
1675         msg = "Pause"
1676       else:
1677         msg = "Resume"
1678       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1679
1680   return success
1681
1682
1683 def BlockdevRemove(disk):
1684   """Remove a block device.
1685
1686   @note: This is intended to be called recursively.
1687
1688   @type disk: L{objects.Disk}
1689   @param disk: the disk object we should remove
1690   @rtype: boolean
1691   @return: the success of the operation
1692
1693   """
1694   msgs = []
1695   try:
1696     rdev = _RecursiveFindBD(disk)
1697   except errors.BlockDeviceError, err:
1698     # probably can't attach
1699     logging.info("Can't attach to device %s in remove", disk)
1700     rdev = None
1701   if rdev is not None:
1702     r_path = rdev.dev_path
1703     try:
1704       rdev.Remove()
1705     except errors.BlockDeviceError, err:
1706       msgs.append(str(err))
1707     if not msgs:
1708       DevCacheManager.RemoveCache(r_path)
1709
1710   if disk.children:
1711     for child in disk.children:
1712       try:
1713         BlockdevRemove(child)
1714       except RPCFail, err:
1715         msgs.append(str(err))
1716
1717   if msgs:
1718     _Fail("; ".join(msgs))
1719
1720
1721 def _RecursiveAssembleBD(disk, owner, as_primary):
1722   """Activate a block device for an instance.
1723
1724   This is run on the primary and secondary nodes for an instance.
1725
1726   @note: this function is called recursively.
1727
1728   @type disk: L{objects.Disk}
1729   @param disk: the disk we try to assemble
1730   @type owner: str
1731   @param owner: the name of the instance which owns the disk
1732   @type as_primary: boolean
1733   @param as_primary: if we should make the block device
1734       read/write
1735
1736   @return: the assembled device or None (in case no device
1737       was assembled)
1738   @raise errors.BlockDeviceError: in case there is an error
1739       during the activation of the children or the device
1740       itself
1741
1742   """
1743   children = []
1744   if disk.children:
1745     mcn = disk.ChildrenNeeded()
1746     if mcn == -1:
1747       mcn = 0 # max number of Nones allowed
1748     else:
1749       mcn = len(disk.children) - mcn # max number of Nones
1750     for chld_disk in disk.children:
1751       try:
1752         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1753       except errors.BlockDeviceError, err:
1754         if children.count(None) >= mcn:
1755           raise
1756         cdev = None
1757         logging.error("Error in child activation (but continuing): %s",
1758                       str(err))
1759       children.append(cdev)
1760
1761   if as_primary or disk.AssembleOnSecondary():
1762     r_dev = bdev.Assemble(disk, children)
1763     result = r_dev
1764     if as_primary or disk.OpenOnSecondary():
1765       r_dev.Open()
1766     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1767                                 as_primary, disk.iv_name)
1768
1769   else:
1770     result = True
1771   return result
1772
1773
1774 def BlockdevAssemble(disk, owner, as_primary, idx):
1775   """Activate a block device for an instance.
1776
1777   This is a wrapper over _RecursiveAssembleBD.
1778
1779   @rtype: str or boolean
1780   @return: a C{/dev/...} path for primary nodes, and
1781       C{True} for secondary nodes
1782
1783   """
1784   try:
1785     result = _RecursiveAssembleBD(disk, owner, as_primary)
1786     if isinstance(result, bdev.BlockDev):
1787       # pylint: disable=E1103
1788       result = result.dev_path
1789       if as_primary:
1790         _SymlinkBlockDev(owner, result, idx)
1791   except errors.BlockDeviceError, err:
1792     _Fail("Error while assembling disk: %s", err, exc=True)
1793   except OSError, err:
1794     _Fail("Error while symlinking disk: %s", err, exc=True)
1795
1796   return result
1797
1798
1799 def BlockdevShutdown(disk):
1800   """Shut down a block device.
1801
1802   First, if the device is assembled (Attach() is successful), then
1803   the device is shutdown. Then the children of the device are
1804   shutdown.
1805
1806   This function is called recursively. Note that we don't cache the
1807   children or such, as oppossed to assemble, shutdown of different
1808   devices doesn't require that the upper device was active.
1809
1810   @type disk: L{objects.Disk}
1811   @param disk: the description of the disk we should
1812       shutdown
1813   @rtype: None
1814
1815   """
1816   msgs = []
1817   r_dev = _RecursiveFindBD(disk)
1818   if r_dev is not None:
1819     r_path = r_dev.dev_path
1820     try:
1821       r_dev.Shutdown()
1822       DevCacheManager.RemoveCache(r_path)
1823     except errors.BlockDeviceError, err:
1824       msgs.append(str(err))
1825
1826   if disk.children:
1827     for child in disk.children:
1828       try:
1829         BlockdevShutdown(child)
1830       except RPCFail, err:
1831         msgs.append(str(err))
1832
1833   if msgs:
1834     _Fail("; ".join(msgs))
1835
1836
1837 def BlockdevAddchildren(parent_cdev, new_cdevs):
1838   """Extend a mirrored block device.
1839
1840   @type parent_cdev: L{objects.Disk}
1841   @param parent_cdev: the disk to which we should add children
1842   @type new_cdevs: list of L{objects.Disk}
1843   @param new_cdevs: the list of children which we should add
1844   @rtype: None
1845
1846   """
1847   parent_bdev = _RecursiveFindBD(parent_cdev)
1848   if parent_bdev is None:
1849     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1850   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1851   if new_bdevs.count(None) > 0:
1852     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1853   parent_bdev.AddChildren(new_bdevs)
1854
1855
1856 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1857   """Shrink a mirrored block device.
1858
1859   @type parent_cdev: L{objects.Disk}
1860   @param parent_cdev: the disk from which we should remove children
1861   @type new_cdevs: list of L{objects.Disk}
1862   @param new_cdevs: the list of children which we should remove
1863   @rtype: None
1864
1865   """
1866   parent_bdev = _RecursiveFindBD(parent_cdev)
1867   if parent_bdev is None:
1868     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1869   devs = []
1870   for disk in new_cdevs:
1871     rpath = disk.StaticDevPath()
1872     if rpath is None:
1873       bd = _RecursiveFindBD(disk)
1874       if bd is None:
1875         _Fail("Can't find device %s while removing children", disk)
1876       else:
1877         devs.append(bd.dev_path)
1878     else:
1879       if not utils.IsNormAbsPath(rpath):
1880         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1881       devs.append(rpath)
1882   parent_bdev.RemoveChildren(devs)
1883
1884
1885 def BlockdevGetmirrorstatus(disks):
1886   """Get the mirroring status of a list of devices.
1887
1888   @type disks: list of L{objects.Disk}
1889   @param disks: the list of disks which we should query
1890   @rtype: disk
1891   @return: List of L{objects.BlockDevStatus}, one for each disk
1892   @raise errors.BlockDeviceError: if any of the disks cannot be
1893       found
1894
1895   """
1896   stats = []
1897   for dsk in disks:
1898     rbd = _RecursiveFindBD(dsk)
1899     if rbd is None:
1900       _Fail("Can't find device %s", dsk)
1901
1902     stats.append(rbd.CombinedSyncStatus())
1903
1904   return stats
1905
1906
1907 def BlockdevGetmirrorstatusMulti(disks):
1908   """Get the mirroring status of a list of devices.
1909
1910   @type disks: list of L{objects.Disk}
1911   @param disks: the list of disks which we should query
1912   @rtype: disk
1913   @return: List of tuples, (bool, status), one for each disk; bool denotes
1914     success/failure, status is L{objects.BlockDevStatus} on success, string
1915     otherwise
1916
1917   """
1918   result = []
1919   for disk in disks:
1920     try:
1921       rbd = _RecursiveFindBD(disk)
1922       if rbd is None:
1923         result.append((False, "Can't find device %s" % disk))
1924         continue
1925
1926       status = rbd.CombinedSyncStatus()
1927     except errors.BlockDeviceError, err:
1928       logging.exception("Error while getting disk status")
1929       result.append((False, str(err)))
1930     else:
1931       result.append((True, status))
1932
1933   assert len(disks) == len(result)
1934
1935   return result
1936
1937
1938 def _RecursiveFindBD(disk):
1939   """Check if a device is activated.
1940
1941   If so, return information about the real device.
1942
1943   @type disk: L{objects.Disk}
1944   @param disk: the disk object we need to find
1945
1946   @return: None if the device can't be found,
1947       otherwise the device instance
1948
1949   """
1950   children = []
1951   if disk.children:
1952     for chdisk in disk.children:
1953       children.append(_RecursiveFindBD(chdisk))
1954
1955   return bdev.FindDevice(disk, children)
1956
1957
1958 def _OpenRealBD(disk):
1959   """Opens the underlying block device of a disk.
1960
1961   @type disk: L{objects.Disk}
1962   @param disk: the disk object we want to open
1963
1964   """
1965   real_disk = _RecursiveFindBD(disk)
1966   if real_disk is None:
1967     _Fail("Block device '%s' is not set up", disk)
1968
1969   real_disk.Open()
1970
1971   return real_disk
1972
1973
1974 def BlockdevFind(disk):
1975   """Check if a device is activated.
1976
1977   If it is, return information about the real device.
1978
1979   @type disk: L{objects.Disk}
1980   @param disk: the disk to find
1981   @rtype: None or objects.BlockDevStatus
1982   @return: None if the disk cannot be found, otherwise a the current
1983            information
1984
1985   """
1986   try:
1987     rbd = _RecursiveFindBD(disk)
1988   except errors.BlockDeviceError, err:
1989     _Fail("Failed to find device: %s", err, exc=True)
1990
1991   if rbd is None:
1992     return None
1993
1994   return rbd.GetSyncStatus()
1995
1996
1997 def BlockdevGetsize(disks):
1998   """Computes the size of the given disks.
1999
2000   If a disk is not found, returns None instead.
2001
2002   @type disks: list of L{objects.Disk}
2003   @param disks: the list of disk to compute the size for
2004   @rtype: list
2005   @return: list with elements None if the disk cannot be found,
2006       otherwise the size
2007
2008   """
2009   result = []
2010   for cf in disks:
2011     try:
2012       rbd = _RecursiveFindBD(cf)
2013     except errors.BlockDeviceError:
2014       result.append(None)
2015       continue
2016     if rbd is None:
2017       result.append(None)
2018     else:
2019       result.append(rbd.GetActualSize())
2020   return result
2021
2022
2023 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2024   """Export a block device to a remote node.
2025
2026   @type disk: L{objects.Disk}
2027   @param disk: the description of the disk to export
2028   @type dest_node: str
2029   @param dest_node: the destination node to export to
2030   @type dest_path: str
2031   @param dest_path: the destination path on the target node
2032   @type cluster_name: str
2033   @param cluster_name: the cluster name, needed for SSH hostalias
2034   @rtype: None
2035
2036   """
2037   real_disk = _OpenRealBD(disk)
2038
2039   # the block size on the read dd is 1MiB to match our units
2040   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2041                                "dd if=%s bs=1048576 count=%s",
2042                                real_disk.dev_path, str(disk.size))
2043
2044   # we set here a smaller block size as, due to ssh buffering, more
2045   # than 64-128k will mostly ignored; we use nocreat to fail if the
2046   # device is not already there or we pass a wrong path; we use
2047   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2048   # to not buffer too much memory; this means that at best, we flush
2049   # every 64k, which will not be very fast
2050   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2051                                 " oflag=dsync", dest_path)
2052
2053   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2054                                                    constants.SSH_LOGIN_USER,
2055                                                    destcmd)
2056
2057   # all commands have been checked, so we're safe to combine them
2058   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2059
2060   result = utils.RunCmd(["bash", "-c", command])
2061
2062   if result.failed:
2063     _Fail("Disk copy command '%s' returned error: %s"
2064           " output: %s", command, result.fail_reason, result.output)
2065
2066
2067 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2068   """Write a file to the filesystem.
2069
2070   This allows the master to overwrite(!) a file. It will only perform
2071   the operation if the file belongs to a list of configuration files.
2072
2073   @type file_name: str
2074   @param file_name: the target file name
2075   @type data: str
2076   @param data: the new contents of the file
2077   @type mode: int
2078   @param mode: the mode to give the file (can be None)
2079   @type uid: string
2080   @param uid: the owner of the file
2081   @type gid: string
2082   @param gid: the group of the file
2083   @type atime: float
2084   @param atime: the atime to set on the file (can be None)
2085   @type mtime: float
2086   @param mtime: the mtime to set on the file (can be None)
2087   @rtype: None
2088
2089   """
2090   file_name = vcluster.LocalizeVirtualPath(file_name)
2091
2092   if not os.path.isabs(file_name):
2093     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2094
2095   if file_name not in _ALLOWED_UPLOAD_FILES:
2096     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2097           file_name)
2098
2099   raw_data = _Decompress(data)
2100
2101   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2102     _Fail("Invalid username/groupname type")
2103
2104   getents = runtime.GetEnts()
2105   uid = getents.LookupUser(uid)
2106   gid = getents.LookupGroup(gid)
2107
2108   utils.SafeWriteFile(file_name, None,
2109                       data=raw_data, mode=mode, uid=uid, gid=gid,
2110                       atime=atime, mtime=mtime)
2111
2112
2113 def RunOob(oob_program, command, node, timeout):
2114   """Executes oob_program with given command on given node.
2115
2116   @param oob_program: The path to the executable oob_program
2117   @param command: The command to invoke on oob_program
2118   @param node: The node given as an argument to the program
2119   @param timeout: Timeout after which we kill the oob program
2120
2121   @return: stdout
2122   @raise RPCFail: If execution fails for some reason
2123
2124   """
2125   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2126
2127   if result.failed:
2128     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2129           result.fail_reason, result.output)
2130
2131   return result.stdout
2132
2133
2134 def _OSOndiskAPIVersion(os_dir):
2135   """Compute and return the API version of a given OS.
2136
2137   This function will try to read the API version of the OS residing in
2138   the 'os_dir' directory.
2139
2140   @type os_dir: str
2141   @param os_dir: the directory in which we should look for the OS
2142   @rtype: tuple
2143   @return: tuple (status, data) with status denoting the validity and
2144       data holding either the vaid versions or an error message
2145
2146   """
2147   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2148
2149   try:
2150     st = os.stat(api_file)
2151   except EnvironmentError, err:
2152     return False, ("Required file '%s' not found under path %s: %s" %
2153                    (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2154
2155   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2156     return False, ("File '%s' in %s is not a regular file" %
2157                    (constants.OS_API_FILE, os_dir))
2158
2159   try:
2160     api_versions = utils.ReadFile(api_file).splitlines()
2161   except EnvironmentError, err:
2162     return False, ("Error while reading the API version file at %s: %s" %
2163                    (api_file, utils.ErrnoOrStr(err)))
2164
2165   try:
2166     api_versions = [int(version.strip()) for version in api_versions]
2167   except (TypeError, ValueError), err:
2168     return False, ("API version(s) can't be converted to integer: %s" %
2169                    str(err))
2170
2171   return True, api_versions
2172
2173
2174 def DiagnoseOS(top_dirs=None):
2175   """Compute the validity for all OSes.
2176
2177   @type top_dirs: list
2178   @param top_dirs: the list of directories in which to
2179       search (if not given defaults to
2180       L{pathutils.OS_SEARCH_PATH})
2181   @rtype: list of L{objects.OS}
2182   @return: a list of tuples (name, path, status, diagnose, variants,
2183       parameters, api_version) for all (potential) OSes under all
2184       search paths, where:
2185           - name is the (potential) OS name
2186           - path is the full path to the OS
2187           - status True/False is the validity of the OS
2188           - diagnose is the error message for an invalid OS, otherwise empty
2189           - variants is a list of supported OS variants, if any
2190           - parameters is a list of (name, help) parameters, if any
2191           - api_version is a list of support OS API versions
2192
2193   """
2194   if top_dirs is None:
2195     top_dirs = pathutils.OS_SEARCH_PATH
2196
2197   result = []
2198   for dir_name in top_dirs:
2199     if os.path.isdir(dir_name):
2200       try:
2201         f_names = utils.ListVisibleFiles(dir_name)
2202       except EnvironmentError, err:
2203         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2204         break
2205       for name in f_names:
2206         os_path = utils.PathJoin(dir_name, name)
2207         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2208         if status:
2209           diagnose = ""
2210           variants = os_inst.supported_variants
2211           parameters = os_inst.supported_parameters
2212           api_versions = os_inst.api_versions
2213         else:
2214           diagnose = os_inst
2215           variants = parameters = api_versions = []
2216         result.append((name, os_path, status, diagnose, variants,
2217                        parameters, api_versions))
2218
2219   return result
2220
2221
2222 def _TryOSFromDisk(name, base_dir=None):
2223   """Create an OS instance from disk.
2224
2225   This function will return an OS instance if the given name is a
2226   valid OS name.
2227
2228   @type base_dir: string
2229   @keyword base_dir: Base directory containing OS installations.
2230                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2231   @rtype: tuple
2232   @return: success and either the OS instance if we find a valid one,
2233       or error message
2234
2235   """
2236   if base_dir is None:
2237     os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2238   else:
2239     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2240
2241   if os_dir is None:
2242     return False, "Directory for OS %s not found in search path" % name
2243
2244   status, api_versions = _OSOndiskAPIVersion(os_dir)
2245   if not status:
2246     # push the error up
2247     return status, api_versions
2248
2249   if not constants.OS_API_VERSIONS.intersection(api_versions):
2250     return False, ("API version mismatch for path '%s': found %s, want %s." %
2251                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2252
2253   # OS Files dictionary, we will populate it with the absolute path
2254   # names; if the value is True, then it is a required file, otherwise
2255   # an optional one
2256   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2257
2258   if max(api_versions) >= constants.OS_API_V15:
2259     os_files[constants.OS_VARIANTS_FILE] = False
2260
2261   if max(api_versions) >= constants.OS_API_V20:
2262     os_files[constants.OS_PARAMETERS_FILE] = True
2263   else:
2264     del os_files[constants.OS_SCRIPT_VERIFY]
2265
2266   for (filename, required) in os_files.items():
2267     os_files[filename] = utils.PathJoin(os_dir, filename)
2268
2269     try:
2270       st = os.stat(os_files[filename])
2271     except EnvironmentError, err:
2272       if err.errno == errno.ENOENT and not required:
2273         del os_files[filename]
2274         continue
2275       return False, ("File '%s' under path '%s' is missing (%s)" %
2276                      (filename, os_dir, utils.ErrnoOrStr(err)))
2277
2278     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2279       return False, ("File '%s' under path '%s' is not a regular file" %
2280                      (filename, os_dir))
2281
2282     if filename in constants.OS_SCRIPTS:
2283       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2284         return False, ("File '%s' under path '%s' is not executable" %
2285                        (filename, os_dir))
2286
2287   variants = []
2288   if constants.OS_VARIANTS_FILE in os_files:
2289     variants_file = os_files[constants.OS_VARIANTS_FILE]
2290     try:
2291       variants = utils.ReadFile(variants_file).splitlines()
2292     except EnvironmentError, err:
2293       # we accept missing files, but not other errors
2294       if err.errno != errno.ENOENT:
2295         return False, ("Error while reading the OS variants file at %s: %s" %
2296                        (variants_file, utils.ErrnoOrStr(err)))
2297
2298   parameters = []
2299   if constants.OS_PARAMETERS_FILE in os_files:
2300     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2301     try:
2302       parameters = utils.ReadFile(parameters_file).splitlines()
2303     except EnvironmentError, err:
2304       return False, ("Error while reading the OS parameters file at %s: %s" %
2305                      (parameters_file, utils.ErrnoOrStr(err)))
2306     parameters = [v.split(None, 1) for v in parameters]
2307
2308   os_obj = objects.OS(name=name, path=os_dir,
2309                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2310                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2311                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2312                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2313                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2314                                                  None),
2315                       supported_variants=variants,
2316                       supported_parameters=parameters,
2317                       api_versions=api_versions)
2318   return True, os_obj
2319
2320
2321 def OSFromDisk(name, base_dir=None):
2322   """Create an OS instance from disk.
2323
2324   This function will return an OS instance if the given name is a
2325   valid OS name. Otherwise, it will raise an appropriate
2326   L{RPCFail} exception, detailing why this is not a valid OS.
2327
2328   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2329   an exception but returns true/false status data.
2330
2331   @type base_dir: string
2332   @keyword base_dir: Base directory containing OS installations.
2333                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2334   @rtype: L{objects.OS}
2335   @return: the OS instance if we find a valid one
2336   @raise RPCFail: if we don't find a valid OS
2337
2338   """
2339   name_only = objects.OS.GetName(name)
2340   status, payload = _TryOSFromDisk(name_only, base_dir)
2341
2342   if not status:
2343     _Fail(payload)
2344
2345   return payload
2346
2347
2348 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2349   """Calculate the basic environment for an os script.
2350
2351   @type os_name: str
2352   @param os_name: full operating system name (including variant)
2353   @type inst_os: L{objects.OS}
2354   @param inst_os: operating system for which the environment is being built
2355   @type os_params: dict
2356   @param os_params: the OS parameters
2357   @type debug: integer
2358   @param debug: debug level (0 or 1, for OS Api 10)
2359   @rtype: dict
2360   @return: dict of environment variables
2361   @raise errors.BlockDeviceError: if the block device
2362       cannot be found
2363
2364   """
2365   result = {}
2366   api_version = \
2367     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2368   result["OS_API_VERSION"] = "%d" % api_version
2369   result["OS_NAME"] = inst_os.name
2370   result["DEBUG_LEVEL"] = "%d" % debug
2371
2372   # OS variants
2373   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2374     variant = objects.OS.GetVariant(os_name)
2375     if not variant:
2376       variant = inst_os.supported_variants[0]
2377   else:
2378     variant = ""
2379   result["OS_VARIANT"] = variant
2380
2381   # OS params
2382   for pname, pvalue in os_params.items():
2383     result["OSP_%s" % pname.upper()] = pvalue
2384
2385   # Set a default path otherwise programs called by OS scripts (or
2386   # even hooks called from OS scripts) might break, and we don't want
2387   # to have each script require setting a PATH variable
2388   result["PATH"] = constants.HOOKS_PATH
2389
2390   return result
2391
2392
2393 def OSEnvironment(instance, inst_os, debug=0):
2394   """Calculate the environment for an os script.
2395
2396   @type instance: L{objects.Instance}
2397   @param instance: target instance for the os script run
2398   @type inst_os: L{objects.OS}
2399   @param inst_os: operating system for which the environment is being built
2400   @type debug: integer
2401   @param debug: debug level (0 or 1, for OS Api 10)
2402   @rtype: dict
2403   @return: dict of environment variables
2404   @raise errors.BlockDeviceError: if the block device
2405       cannot be found
2406
2407   """
2408   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2409
2410   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2411     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2412
2413   result["HYPERVISOR"] = instance.hypervisor
2414   result["DISK_COUNT"] = "%d" % len(instance.disks)
2415   result["NIC_COUNT"] = "%d" % len(instance.nics)
2416   result["INSTANCE_SECONDARY_NODES"] = \
2417       ("%s" % " ".join(instance.secondary_nodes))
2418
2419   # Disks
2420   for idx, disk in enumerate(instance.disks):
2421     real_disk = _OpenRealBD(disk)
2422     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2423     result["DISK_%d_ACCESS" % idx] = disk.mode
2424     if constants.HV_DISK_TYPE in instance.hvparams:
2425       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2426         instance.hvparams[constants.HV_DISK_TYPE]
2427     if disk.dev_type in constants.LDS_BLOCK:
2428       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2429     elif disk.dev_type == constants.LD_FILE:
2430       result["DISK_%d_BACKEND_TYPE" % idx] = \
2431         "file:%s" % disk.physical_id[0]
2432
2433   # NICs
2434   for idx, nic in enumerate(instance.nics):
2435     result["NIC_%d_MAC" % idx] = nic.mac
2436     if nic.ip:
2437       result["NIC_%d_IP" % idx] = nic.ip
2438     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2439     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2440       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2441     if nic.nicparams[constants.NIC_LINK]:
2442       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2443     if constants.HV_NIC_TYPE in instance.hvparams:
2444       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2445         instance.hvparams[constants.HV_NIC_TYPE]
2446
2447   # HV/BE params
2448   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2449     for key, value in source.items():
2450       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2451
2452   return result
2453
2454
2455 def BlockdevGrow(disk, amount, dryrun, backingstore):
2456   """Grow a stack of block devices.
2457
2458   This function is called recursively, with the childrens being the
2459   first ones to resize.
2460
2461   @type disk: L{objects.Disk}
2462   @param disk: the disk to be grown
2463   @type amount: integer
2464   @param amount: the amount (in mebibytes) to grow with
2465   @type dryrun: boolean
2466   @param dryrun: whether to execute the operation in simulation mode
2467       only, without actually increasing the size
2468   @param backingstore: whether to execute the operation on backing storage
2469       only, or on "logical" storage only; e.g. DRBD is logical storage,
2470       whereas LVM, file, RBD are backing storage
2471   @rtype: (status, result)
2472   @return: a tuple with the status of the operation (True/False), and
2473       the errors message if status is False
2474
2475   """
2476   r_dev = _RecursiveFindBD(disk)
2477   if r_dev is None:
2478     _Fail("Cannot find block device %s", disk)
2479
2480   try:
2481     r_dev.Grow(amount, dryrun, backingstore)
2482   except errors.BlockDeviceError, err:
2483     _Fail("Failed to grow block device: %s", err, exc=True)
2484
2485
2486 def BlockdevSnapshot(disk):
2487   """Create a snapshot copy of a block device.
2488
2489   This function is called recursively, and the snapshot is actually created
2490   just for the leaf lvm backend device.
2491
2492   @type disk: L{objects.Disk}
2493   @param disk: the disk to be snapshotted
2494   @rtype: string
2495   @return: snapshot disk ID as (vg, lv)
2496
2497   """
2498   if disk.dev_type == constants.LD_DRBD8:
2499     if not disk.children:
2500       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2501             disk.unique_id)
2502     return BlockdevSnapshot(disk.children[0])
2503   elif disk.dev_type == constants.LD_LV:
2504     r_dev = _RecursiveFindBD(disk)
2505     if r_dev is not None:
2506       # FIXME: choose a saner value for the snapshot size
2507       # let's stay on the safe side and ask for the full size, for now
2508       return r_dev.Snapshot(disk.size)
2509     else:
2510       _Fail("Cannot find block device %s", disk)
2511   else:
2512     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2513           disk.unique_id, disk.dev_type)
2514
2515
2516 def FinalizeExport(instance, snap_disks):
2517   """Write out the export configuration information.
2518
2519   @type instance: L{objects.Instance}
2520   @param instance: the instance which we export, used for
2521       saving configuration
2522   @type snap_disks: list of L{objects.Disk}
2523   @param snap_disks: list of snapshot block devices, which
2524       will be used to get the actual name of the dump file
2525
2526   @rtype: None
2527
2528   """
2529   destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
2530   finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
2531
2532   config = objects.SerializableConfigParser()
2533
2534   config.add_section(constants.INISECT_EXP)
2535   config.set(constants.INISECT_EXP, "version", "0")
2536   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2537   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2538   config.set(constants.INISECT_EXP, "os", instance.os)
2539   config.set(constants.INISECT_EXP, "compression", "none")
2540
2541   config.add_section(constants.INISECT_INS)
2542   config.set(constants.INISECT_INS, "name", instance.name)
2543   config.set(constants.INISECT_INS, "maxmem", "%d" %
2544              instance.beparams[constants.BE_MAXMEM])
2545   config.set(constants.INISECT_INS, "minmem", "%d" %
2546              instance.beparams[constants.BE_MINMEM])
2547   # "memory" is deprecated, but useful for exporting to old ganeti versions
2548   config.set(constants.INISECT_INS, "memory", "%d" %
2549              instance.beparams[constants.BE_MAXMEM])
2550   config.set(constants.INISECT_INS, "vcpus", "%d" %
2551              instance.beparams[constants.BE_VCPUS])
2552   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2553   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2554   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2555
2556   nic_total = 0
2557   for nic_count, nic in enumerate(instance.nics):
2558     nic_total += 1
2559     config.set(constants.INISECT_INS, "nic%d_mac" %
2560                nic_count, "%s" % nic.mac)
2561     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2562     for param in constants.NICS_PARAMETER_TYPES:
2563       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2564                  "%s" % nic.nicparams.get(param, None))
2565   # TODO: redundant: on load can read nics until it doesn't exist
2566   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2567
2568   disk_total = 0
2569   for disk_count, disk in enumerate(snap_disks):
2570     if disk:
2571       disk_total += 1
2572       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2573                  ("%s" % disk.iv_name))
2574       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2575                  ("%s" % disk.physical_id[1]))
2576       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2577                  ("%d" % disk.size))
2578
2579   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2580
2581   # New-style hypervisor/backend parameters
2582
2583   config.add_section(constants.INISECT_HYP)
2584   for name, value in instance.hvparams.items():
2585     if name not in constants.HVC_GLOBALS:
2586       config.set(constants.INISECT_HYP, name, str(value))
2587
2588   config.add_section(constants.INISECT_BEP)
2589   for name, value in instance.beparams.items():
2590     config.set(constants.INISECT_BEP, name, str(value))
2591
2592   config.add_section(constants.INISECT_OSP)
2593   for name, value in instance.osparams.items():
2594     config.set(constants.INISECT_OSP, name, str(value))
2595
2596   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2597                   data=config.Dumps())
2598   shutil.rmtree(finaldestdir, ignore_errors=True)
2599   shutil.move(destdir, finaldestdir)
2600
2601
2602 def ExportInfo(dest):
2603   """Get export configuration information.
2604
2605   @type dest: str
2606   @param dest: directory containing the export
2607
2608   @rtype: L{objects.SerializableConfigParser}
2609   @return: a serializable config file containing the
2610       export info
2611
2612   """
2613   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2614
2615   config = objects.SerializableConfigParser()
2616   config.read(cff)
2617
2618   if (not config.has_section(constants.INISECT_EXP) or
2619       not config.has_section(constants.INISECT_INS)):
2620     _Fail("Export info file doesn't have the required fields")
2621
2622   return config.Dumps()
2623
2624
2625 def ListExports():
2626   """Return a list of exports currently available on this machine.
2627
2628   @rtype: list
2629   @return: list of the exports
2630
2631   """
2632   if os.path.isdir(pathutils.EXPORT_DIR):
2633     return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
2634   else:
2635     _Fail("No exports directory")
2636
2637
2638 def RemoveExport(export):
2639   """Remove an existing export from the node.
2640
2641   @type export: str
2642   @param export: the name of the export to remove
2643   @rtype: None
2644
2645   """
2646   target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2647
2648   try:
2649     shutil.rmtree(target)
2650   except EnvironmentError, err:
2651     _Fail("Error while removing the export: %s", err, exc=True)
2652
2653
2654 def BlockdevRename(devlist):
2655   """Rename a list of block devices.
2656
2657   @type devlist: list of tuples
2658   @param devlist: list of tuples of the form  (disk,
2659       new_logical_id, new_physical_id); disk is an
2660       L{objects.Disk} object describing the current disk,
2661       and new logical_id/physical_id is the name we
2662       rename it to
2663   @rtype: boolean
2664   @return: True if all renames succeeded, False otherwise
2665
2666   """
2667   msgs = []
2668   result = True
2669   for disk, unique_id in devlist:
2670     dev = _RecursiveFindBD(disk)
2671     if dev is None:
2672       msgs.append("Can't find device %s in rename" % str(disk))
2673       result = False
2674       continue
2675     try:
2676       old_rpath = dev.dev_path
2677       dev.Rename(unique_id)
2678       new_rpath = dev.dev_path
2679       if old_rpath != new_rpath:
2680         DevCacheManager.RemoveCache(old_rpath)
2681         # FIXME: we should add the new cache information here, like:
2682         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2683         # but we don't have the owner here - maybe parse from existing
2684         # cache? for now, we only lose lvm data when we rename, which
2685         # is less critical than DRBD or MD
2686     except errors.BlockDeviceError, err:
2687       msgs.append("Can't rename device '%s' to '%s': %s" %
2688                   (dev, unique_id, err))
2689       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2690       result = False
2691   if not result:
2692     _Fail("; ".join(msgs))
2693
2694
2695 def _TransformFileStorageDir(fs_dir):
2696   """Checks whether given file_storage_dir is valid.
2697
2698   Checks wheter the given fs_dir is within the cluster-wide default
2699   file_storage_dir or the shared_file_storage_dir, which are stored in
2700   SimpleStore. Only paths under those directories are allowed.
2701
2702   @type fs_dir: str
2703   @param fs_dir: the path to check
2704
2705   @return: the normalized path if valid, None otherwise
2706
2707   """
2708   if not (constants.ENABLE_FILE_STORAGE or
2709           constants.ENABLE_SHARED_FILE_STORAGE):
2710     _Fail("File storage disabled at configure time")
2711   cfg = _GetConfig()
2712   fs_dir = os.path.normpath(fs_dir)
2713   base_fstore = cfg.GetFileStorageDir()
2714   base_shared = cfg.GetSharedFileStorageDir()
2715   if not (utils.IsBelowDir(base_fstore, fs_dir) or
2716           utils.IsBelowDir(base_shared, fs_dir)):
2717     _Fail("File storage directory '%s' is not under base file"
2718           " storage directory '%s' or shared storage directory '%s'",
2719           fs_dir, base_fstore, base_shared)
2720   return fs_dir
2721
2722
2723 def CreateFileStorageDir(file_storage_dir):
2724   """Create file storage directory.
2725
2726   @type file_storage_dir: str
2727   @param file_storage_dir: directory to create
2728
2729   @rtype: tuple
2730   @return: tuple with first element a boolean indicating wheter dir
2731       creation was successful or not
2732
2733   """
2734   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2735   if os.path.exists(file_storage_dir):
2736     if not os.path.isdir(file_storage_dir):
2737       _Fail("Specified storage dir '%s' is not a directory",
2738             file_storage_dir)
2739   else:
2740     try:
2741       os.makedirs(file_storage_dir, 0750)
2742     except OSError, err:
2743       _Fail("Cannot create file storage directory '%s': %s",
2744             file_storage_dir, err, exc=True)
2745
2746
2747 def RemoveFileStorageDir(file_storage_dir):
2748   """Remove file storage directory.
2749
2750   Remove it only if it's empty. If not log an error and return.
2751
2752   @type file_storage_dir: str
2753   @param file_storage_dir: the directory we should cleanup
2754   @rtype: tuple (success,)
2755   @return: tuple of one element, C{success}, denoting
2756       whether the operation was successful
2757
2758   """
2759   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2760   if os.path.exists(file_storage_dir):
2761     if not os.path.isdir(file_storage_dir):
2762       _Fail("Specified Storage directory '%s' is not a directory",
2763             file_storage_dir)
2764     # deletes dir only if empty, otherwise we want to fail the rpc call
2765     try:
2766       os.rmdir(file_storage_dir)
2767     except OSError, err:
2768       _Fail("Cannot remove file storage directory '%s': %s",
2769             file_storage_dir, err)
2770
2771
2772 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2773   """Rename the file storage directory.
2774
2775   @type old_file_storage_dir: str
2776   @param old_file_storage_dir: the current path
2777   @type new_file_storage_dir: str
2778   @param new_file_storage_dir: the name we should rename to
2779   @rtype: tuple (success,)
2780   @return: tuple of one element, C{success}, denoting
2781       whether the operation was successful
2782
2783   """
2784   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2785   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2786   if not os.path.exists(new_file_storage_dir):
2787     if os.path.isdir(old_file_storage_dir):
2788       try:
2789         os.rename(old_file_storage_dir, new_file_storage_dir)
2790       except OSError, err:
2791         _Fail("Cannot rename '%s' to '%s': %s",
2792               old_file_storage_dir, new_file_storage_dir, err)
2793     else:
2794       _Fail("Specified storage dir '%s' is not a directory",
2795             old_file_storage_dir)
2796   else:
2797     if os.path.exists(old_file_storage_dir):
2798       _Fail("Cannot rename '%s' to '%s': both locations exist",
2799             old_file_storage_dir, new_file_storage_dir)
2800
2801
2802 def _EnsureJobQueueFile(file_name):
2803   """Checks whether the given filename is in the queue directory.
2804
2805   @type file_name: str
2806   @param file_name: the file name we should check
2807   @rtype: None
2808   @raises RPCFail: if the file is not valid
2809
2810   """
2811   if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
2812     _Fail("Passed job queue file '%s' does not belong to"
2813           " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
2814
2815
2816 def JobQueueUpdate(file_name, content):
2817   """Updates a file in the queue directory.
2818
2819   This is just a wrapper over L{utils.io.WriteFile}, with proper
2820   checking.
2821
2822   @type file_name: str
2823   @param file_name: the job file name
2824   @type content: str
2825   @param content: the new job contents
2826   @rtype: boolean
2827   @return: the success of the operation
2828
2829   """
2830   file_name = vcluster.LocalizeVirtualPath(file_name)
2831
2832   _EnsureJobQueueFile(file_name)
2833   getents = runtime.GetEnts()
2834
2835   # Write and replace the file atomically
2836   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2837                   gid=getents.masterd_gid)
2838
2839
2840 def JobQueueRename(old, new):
2841   """Renames a job queue file.
2842
2843   This is just a wrapper over os.rename with proper checking.
2844
2845   @type old: str
2846   @param old: the old (actual) file name
2847   @type new: str
2848   @param new: the desired file name
2849   @rtype: tuple
2850   @return: the success of the operation and payload
2851
2852   """
2853   old = vcluster.LocalizeVirtualPath(old)
2854   new = vcluster.LocalizeVirtualPath(new)
2855
2856   _EnsureJobQueueFile(old)
2857   _EnsureJobQueueFile(new)
2858
2859   getents = runtime.GetEnts()
2860
2861   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2862                    dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2863
2864
2865 def BlockdevClose(instance_name, disks):
2866   """Closes the given block devices.
2867
2868   This means they will be switched to secondary mode (in case of
2869   DRBD).
2870
2871   @param instance_name: if the argument is not empty, the symlinks
2872       of this instance will be removed
2873   @type disks: list of L{objects.Disk}
2874   @param disks: the list of disks to be closed
2875   @rtype: tuple (success, message)
2876   @return: a tuple of success and message, where success
2877       indicates the succes of the operation, and message
2878       which will contain the error details in case we
2879       failed
2880
2881   """
2882   bdevs = []
2883   for cf in disks:
2884     rd = _RecursiveFindBD(cf)
2885     if rd is None:
2886       _Fail("Can't find device %s", cf)
2887     bdevs.append(rd)
2888
2889   msg = []
2890   for rd in bdevs:
2891     try:
2892       rd.Close()
2893     except errors.BlockDeviceError, err:
2894       msg.append(str(err))
2895   if msg:
2896     _Fail("Can't make devices secondary: %s", ",".join(msg))
2897   else:
2898     if instance_name:
2899       _RemoveBlockDevLinks(instance_name, disks)
2900
2901
2902 def ValidateHVParams(hvname, hvparams):
2903   """Validates the given hypervisor parameters.
2904
2905   @type hvname: string
2906   @param hvname: the hypervisor name
2907   @type hvparams: dict
2908   @param hvparams: the hypervisor parameters to be validated
2909   @rtype: None
2910
2911   """
2912   try:
2913     hv_type = hypervisor.GetHypervisor(hvname)
2914     hv_type.ValidateParameters(hvparams)
2915   except errors.HypervisorError, err:
2916     _Fail(str(err), log=False)
2917
2918
2919 def _CheckOSPList(os_obj, parameters):
2920   """Check whether a list of parameters is supported by the OS.
2921
2922   @type os_obj: L{objects.OS}
2923   @param os_obj: OS object to check
2924   @type parameters: list
2925   @param parameters: the list of parameters to check
2926
2927   """
2928   supported = [v[0] for v in os_obj.supported_parameters]
2929   delta = frozenset(parameters).difference(supported)
2930   if delta:
2931     _Fail("The following parameters are not supported"
2932           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2933
2934
2935 def ValidateOS(required, osname, checks, osparams):
2936   """Validate the given OS' parameters.
2937
2938   @type required: boolean
2939   @param required: whether absence of the OS should translate into
2940       failure or not
2941   @type osname: string
2942   @param osname: the OS to be validated
2943   @type checks: list
2944   @param checks: list of the checks to run (currently only 'parameters')
2945   @type osparams: dict
2946   @param osparams: dictionary with OS parameters
2947   @rtype: boolean
2948   @return: True if the validation passed, or False if the OS was not
2949       found and L{required} was false
2950
2951   """
2952   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2953     _Fail("Unknown checks required for OS %s: %s", osname,
2954           set(checks).difference(constants.OS_VALIDATE_CALLS))
2955
2956   name_only = objects.OS.GetName(osname)
2957   status, tbv = _TryOSFromDisk(name_only, None)
2958
2959   if not status:
2960     if required:
2961       _Fail(tbv)
2962     else:
2963       return False
2964
2965   if max(tbv.api_versions) < constants.OS_API_V20:
2966     return True
2967
2968   if constants.OS_VALIDATE_PARAMETERS in checks:
2969     _CheckOSPList(tbv, osparams.keys())
2970
2971   validate_env = OSCoreEnv(osname, tbv, osparams)
2972   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2973                         cwd=tbv.path, reset_env=True)
2974   if result.failed:
2975     logging.error("os validate command '%s' returned error: %s output: %s",
2976                   result.cmd, result.fail_reason, result.output)
2977     _Fail("OS validation script failed (%s), output: %s",
2978           result.fail_reason, result.output, log=False)
2979
2980   return True
2981
2982
2983 def DemoteFromMC():
2984   """Demotes the current node from master candidate role.
2985
2986   """
2987   # try to ensure we're not the master by mistake
2988   master, myself = ssconf.GetMasterAndMyself()
2989   if master == myself:
2990     _Fail("ssconf status shows I'm the master node, will not demote")
2991
2992   result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
2993   if not result.failed:
2994     _Fail("The master daemon is running, will not demote")
2995
2996   try:
2997     if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
2998       utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
2999   except EnvironmentError, err:
3000     if err.errno != errno.ENOENT:
3001       _Fail("Error while backing up cluster file: %s", err, exc=True)
3002
3003   utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3004
3005
3006 def _GetX509Filenames(cryptodir, name):
3007   """Returns the full paths for the private key and certificate.
3008
3009   """
3010   return (utils.PathJoin(cryptodir, name),
3011           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3012           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3013
3014
3015 def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3016   """Creates a new X509 certificate for SSL/TLS.
3017
3018   @type validity: int
3019   @param validity: Validity in seconds
3020   @rtype: tuple; (string, string)
3021   @return: Certificate name and public part
3022
3023   """
3024   (key_pem, cert_pem) = \
3025     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3026                                      min(validity, _MAX_SSL_CERT_VALIDITY))
3027
3028   cert_dir = tempfile.mkdtemp(dir=cryptodir,
3029                               prefix="x509-%s-" % utils.TimestampForFilename())
3030   try:
3031     name = os.path.basename(cert_dir)
3032     assert len(name) > 5
3033
3034     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3035
3036     utils.WriteFile(key_file, mode=0400, data=key_pem)
3037     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3038
3039     # Never return private key as it shouldn't leave the node
3040     return (name, cert_pem)
3041   except Exception:
3042     shutil.rmtree(cert_dir, ignore_errors=True)
3043     raise
3044
3045
3046 def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3047   """Removes a X509 certificate.
3048
3049   @type name: string
3050   @param name: Certificate name
3051
3052   """
3053   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3054
3055   utils.RemoveFile(key_file)
3056   utils.RemoveFile(cert_file)
3057
3058   try:
3059     os.rmdir(cert_dir)
3060   except EnvironmentError, err:
3061     _Fail("Cannot remove certificate directory '%s': %s",
3062           cert_dir, err)
3063
3064
3065 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3066   """Returns the command for the requested input/output.
3067
3068   @type instance: L{objects.Instance}
3069   @param instance: The instance object
3070   @param mode: Import/export mode
3071   @param ieio: Input/output type
3072   @param ieargs: Input/output arguments
3073
3074   """
3075   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3076
3077   env = None
3078   prefix = None
3079   suffix = None
3080   exp_size = None
3081
3082   if ieio == constants.IEIO_FILE:
3083     (filename, ) = ieargs
3084
3085     if not utils.IsNormAbsPath(filename):
3086       _Fail("Path '%s' is not normalized or absolute", filename)
3087
3088     real_filename = os.path.realpath(filename)
3089     directory = os.path.dirname(real_filename)
3090
3091     if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3092       _Fail("File '%s' is not under exports directory '%s': %s",
3093             filename, pathutils.EXPORT_DIR, real_filename)
3094
3095     # Create directory
3096     utils.Makedirs(directory, mode=0750)
3097
3098     quoted_filename = utils.ShellQuote(filename)
3099
3100     if mode == constants.IEM_IMPORT:
3101       suffix = "> %s" % quoted_filename
3102     elif mode == constants.IEM_EXPORT:
3103       suffix = "< %s" % quoted_filename
3104
3105       # Retrieve file size
3106       try:
3107         st = os.stat(filename)
3108       except EnvironmentError, err:
3109         logging.error("Can't stat(2) %s: %s", filename, err)
3110       else:
3111         exp_size = utils.BytesToMebibyte(st.st_size)
3112
3113   elif ieio == constants.IEIO_RAW_DISK:
3114     (disk, ) = ieargs
3115
3116     real_disk = _OpenRealBD(disk)
3117
3118     if mode == constants.IEM_IMPORT:
3119       # we set here a smaller block size as, due to transport buffering, more
3120       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3121       # is not already there or we pass a wrong path; we use notrunc to no
3122       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3123       # much memory; this means that at best, we flush every 64k, which will
3124       # not be very fast
3125       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3126                                     " bs=%s oflag=dsync"),
3127                                     real_disk.dev_path,
3128                                     str(64 * 1024))
3129
3130     elif mode == constants.IEM_EXPORT:
3131       # the block size on the read dd is 1MiB to match our units
3132       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3133                                    real_disk.dev_path,
3134                                    str(1024 * 1024), # 1 MB
3135                                    str(disk.size))
3136       exp_size = disk.size
3137
3138   elif ieio == constants.IEIO_SCRIPT:
3139     (disk, disk_index, ) = ieargs
3140
3141     assert isinstance(disk_index, (int, long))
3142
3143     real_disk = _OpenRealBD(disk)
3144
3145     inst_os = OSFromDisk(instance.os)
3146     env = OSEnvironment(instance, inst_os)
3147
3148     if mode == constants.IEM_IMPORT:
3149       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3150       env["IMPORT_INDEX"] = str(disk_index)
3151       script = inst_os.import_script
3152
3153     elif mode == constants.IEM_EXPORT:
3154       env["EXPORT_DEVICE"] = real_disk.dev_path
3155       env["EXPORT_INDEX"] = str(disk_index)
3156       script = inst_os.export_script
3157
3158     # TODO: Pass special environment only to script
3159     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3160
3161     if mode == constants.IEM_IMPORT:
3162       suffix = "| %s" % script_cmd
3163
3164     elif mode == constants.IEM_EXPORT:
3165       prefix = "%s |" % script_cmd
3166
3167     # Let script predict size
3168     exp_size = constants.IE_CUSTOM_SIZE
3169
3170   else:
3171     _Fail("Invalid %s I/O mode %r", mode, ieio)
3172
3173   return (env, prefix, suffix, exp_size)
3174
3175
3176 def _CreateImportExportStatusDir(prefix):
3177   """Creates status directory for import/export.
3178
3179   """
3180   return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3181                           prefix=("%s-%s-" %
3182                                   (prefix, utils.TimestampForFilename())))
3183
3184
3185 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3186                             ieio, ieioargs):
3187   """Starts an import or export daemon.
3188
3189   @param mode: Import/output mode
3190   @type opts: L{objects.ImportExportOptions}
3191   @param opts: Daemon options
3192   @type host: string
3193   @param host: Remote host for export (None for import)
3194   @type port: int
3195   @param port: Remote port for export (None for import)
3196   @type instance: L{objects.Instance}
3197   @param instance: Instance object
3198   @type component: string
3199   @param component: which part of the instance is transferred now,
3200       e.g. 'disk/0'
3201   @param ieio: Input/output type
3202   @param ieioargs: Input/output arguments
3203
3204   """
3205   if mode == constants.IEM_IMPORT:
3206     prefix = "import"
3207
3208     if not (host is None and port is None):
3209       _Fail("Can not specify host or port on import")
3210
3211   elif mode == constants.IEM_EXPORT:
3212     prefix = "export"
3213
3214     if host is None or port is None:
3215       _Fail("Host and port must be specified for an export")
3216
3217   else:
3218     _Fail("Invalid mode %r", mode)
3219
3220   if (opts.key_name is None) ^ (opts.ca_pem is None):
3221     _Fail("Cluster certificate can only be used for both key and CA")
3222
3223   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3224     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3225
3226   if opts.key_name is None:
3227     # Use server.pem
3228     key_path = pathutils.NODED_CERT_FILE
3229     cert_path = pathutils.NODED_CERT_FILE
3230     assert opts.ca_pem is None
3231   else:
3232     (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3233                                                  opts.key_name)
3234     assert opts.ca_pem is not None
3235
3236   for i in [key_path, cert_path]:
3237     if not os.path.exists(i):
3238       _Fail("File '%s' does not exist" % i)
3239
3240   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3241   try:
3242     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3243     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3244     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3245
3246     if opts.ca_pem is None:
3247       # Use server.pem
3248       ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3249     else:
3250       ca = opts.ca_pem
3251
3252     # Write CA file
3253     utils.WriteFile(ca_file, data=ca, mode=0400)
3254
3255     cmd = [
3256       pathutils.IMPORT_EXPORT_DAEMON,
3257       status_file, mode,
3258       "--key=%s" % key_path,
3259       "--cert=%s" % cert_path,
3260       "--ca=%s" % ca_file,
3261       ]
3262
3263     if host:
3264       cmd.append("--host=%s" % host)
3265
3266     if port:
3267       cmd.append("--port=%s" % port)
3268
3269     if opts.ipv6:
3270       cmd.append("--ipv6")
3271     else:
3272       cmd.append("--ipv4")
3273
3274     if opts.compress:
3275       cmd.append("--compress=%s" % opts.compress)
3276
3277     if opts.magic:
3278       cmd.append("--magic=%s" % opts.magic)
3279
3280     if exp_size is not None:
3281       cmd.append("--expected-size=%s" % exp_size)
3282
3283     if cmd_prefix:
3284       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3285
3286     if cmd_suffix:
3287       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3288
3289     if mode == constants.IEM_EXPORT:
3290       # Retry connection a few times when connecting to remote peer
3291       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3292       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3293     elif opts.connect_timeout is not None:
3294       assert mode == constants.IEM_IMPORT
3295       # Overall timeout for establishing connection while listening
3296       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3297
3298     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3299
3300     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3301     # support for receiving a file descriptor for output
3302     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3303                       output=logfile)
3304
3305     # The import/export name is simply the status directory name
3306     return os.path.basename(status_dir)
3307
3308   except Exception:
3309     shutil.rmtree(status_dir, ignore_errors=True)
3310     raise
3311
3312
3313 def GetImportExportStatus(names):
3314   """Returns import/export daemon status.
3315
3316   @type names: sequence
3317   @param names: List of names
3318   @rtype: List of dicts
3319   @return: Returns a list of the state of each named import/export or None if a
3320            status couldn't be read
3321
3322   """
3323   result = []
3324
3325   for name in names:
3326     status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3327                                  _IES_STATUS_FILE)
3328
3329     try:
3330       data = utils.ReadFile(status_file)
3331     except EnvironmentError, err:
3332       if err.errno != errno.ENOENT:
3333         raise
3334       data = None
3335
3336     if not data:
3337       result.append(None)
3338       continue
3339
3340     result.append(serializer.LoadJson(data))
3341
3342   return result
3343
3344
3345 def AbortImportExport(name):
3346   """Sends SIGTERM to a running import/export daemon.
3347
3348   """
3349   logging.info("Abort import/export %s", name)
3350
3351   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3352   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3353
3354   if pid:
3355     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3356                  name, pid)
3357     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3358
3359
3360 def CleanupImportExport(name):
3361   """Cleanup after an import or export.
3362
3363   If the import/export daemon is still running it's killed. Afterwards the
3364   whole status directory is removed.
3365
3366   """
3367   logging.info("Finalizing import/export %s", name)
3368
3369   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3370
3371   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3372
3373   if pid:
3374     logging.info("Import/export %s is still running with PID %s",
3375                  name, pid)
3376     utils.KillProcess(pid, waitpid=False)
3377
3378   shutil.rmtree(status_dir, ignore_errors=True)
3379
3380
3381 def _FindDisks(nodes_ip, disks):
3382   """Sets the physical ID on disks and returns the block devices.
3383
3384   """
3385   # set the correct physical ID
3386   my_name = netutils.Hostname.GetSysName()
3387   for cf in disks:
3388     cf.SetPhysicalID(my_name, nodes_ip)
3389
3390   bdevs = []
3391
3392   for cf in disks:
3393     rd = _RecursiveFindBD(cf)
3394     if rd is None:
3395       _Fail("Can't find device %s", cf)
3396     bdevs.append(rd)
3397   return bdevs
3398
3399
3400 def DrbdDisconnectNet(nodes_ip, disks):
3401   """Disconnects the network on a list of drbd devices.
3402
3403   """
3404   bdevs = _FindDisks(nodes_ip, disks)
3405
3406   # disconnect disks
3407   for rd in bdevs:
3408     try:
3409       rd.DisconnectNet()
3410     except errors.BlockDeviceError, err:
3411       _Fail("Can't change network configuration to standalone mode: %s",
3412             err, exc=True)
3413
3414
3415 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3416   """Attaches the network on a list of drbd devices.
3417
3418   """
3419   bdevs = _FindDisks(nodes_ip, disks)
3420
3421   if multimaster:
3422     for idx, rd in enumerate(bdevs):
3423       try:
3424         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3425       except EnvironmentError, err:
3426         _Fail("Can't create symlink: %s", err)
3427   # reconnect disks, switch to new master configuration and if
3428   # needed primary mode
3429   for rd in bdevs:
3430     try:
3431       rd.AttachNet(multimaster)
3432     except errors.BlockDeviceError, err:
3433       _Fail("Can't change network configuration: %s", err)
3434
3435   # wait until the disks are connected; we need to retry the re-attach
3436   # if the device becomes standalone, as this might happen if the one
3437   # node disconnects and reconnects in a different mode before the
3438   # other node reconnects; in this case, one or both of the nodes will
3439   # decide it has wrong configuration and switch to standalone
3440
3441   def _Attach():
3442     all_connected = True
3443
3444     for rd in bdevs:
3445       stats = rd.GetProcStatus()
3446
3447       all_connected = (all_connected and
3448                        (stats.is_connected or stats.is_in_resync))
3449
3450       if stats.is_standalone:
3451         # peer had different config info and this node became
3452         # standalone, even though this should not happen with the
3453         # new staged way of changing disk configs
3454         try:
3455           rd.AttachNet(multimaster)
3456         except errors.BlockDeviceError, err:
3457           _Fail("Can't change network configuration: %s", err)
3458
3459     if not all_connected:
3460       raise utils.RetryAgain()
3461
3462   try:
3463     # Start with a delay of 100 miliseconds and go up to 5 seconds
3464     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3465   except utils.RetryTimeout:
3466     _Fail("Timeout in disk reconnecting")
3467
3468   if multimaster:
3469     # change to primary mode
3470     for rd in bdevs:
3471       try:
3472         rd.Open()
3473       except errors.BlockDeviceError, err:
3474         _Fail("Can't change to primary mode: %s", err)
3475
3476
3477 def DrbdWaitSync(nodes_ip, disks):
3478   """Wait until DRBDs have synchronized.
3479
3480   """
3481   def _helper(rd):
3482     stats = rd.GetProcStatus()
3483     if not (stats.is_connected or stats.is_in_resync):
3484       raise utils.RetryAgain()
3485     return stats
3486
3487   bdevs = _FindDisks(nodes_ip, disks)
3488
3489   min_resync = 100
3490   alldone = True
3491   for rd in bdevs:
3492     try:
3493       # poll each second for 15 seconds
3494       stats = utils.Retry(_helper, 1, 15, args=[rd])
3495     except utils.RetryTimeout:
3496       stats = rd.GetProcStatus()
3497       # last check
3498       if not (stats.is_connected or stats.is_in_resync):
3499         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3500     alldone = alldone and (not stats.is_in_resync)
3501     if stats.sync_percent is not None:
3502       min_resync = min(min_resync, stats.sync_percent)
3503
3504   return (alldone, min_resync)
3505
3506
3507 def GetDrbdUsermodeHelper():
3508   """Returns DRBD usermode helper currently configured.
3509
3510   """
3511   try:
3512     return bdev.BaseDRBD.GetUsermodeHelper()
3513   except errors.BlockDeviceError, err:
3514     _Fail(str(err))
3515
3516
3517 def PowercycleNode(hypervisor_type):
3518   """Hard-powercycle the node.
3519
3520   Because we need to return first, and schedule the powercycle in the
3521   background, we won't be able to report failures nicely.
3522
3523   """
3524   hyper = hypervisor.GetHypervisor(hypervisor_type)
3525   try:
3526     pid = os.fork()
3527   except OSError:
3528     # if we can't fork, we'll pretend that we're in the child process
3529     pid = 0
3530   if pid > 0:
3531     return "Reboot scheduled in 5 seconds"
3532   # ensure the child is running on ram
3533   try:
3534     utils.Mlockall()
3535   except Exception: # pylint: disable=W0703
3536     pass
3537   time.sleep(5)
3538   hyper.PowercycleNode()
3539
3540
3541 class HooksRunner(object):
3542   """Hook runner.
3543
3544   This class is instantiated on the node side (ganeti-noded) and not
3545   on the master side.
3546
3547   """
3548   def __init__(self, hooks_base_dir=None):
3549     """Constructor for hooks runner.
3550
3551     @type hooks_base_dir: str or None
3552     @param hooks_base_dir: if not None, this overrides the
3553         L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3554
3555     """
3556     if hooks_base_dir is None:
3557       hooks_base_dir = pathutils.HOOKS_BASE_DIR
3558     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3559     # constant
3560     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3561
3562   def RunLocalHooks(self, node_list, hpath, phase, env):
3563     """Check that the hooks will be run only locally and then run them.
3564
3565     """
3566     assert len(node_list) == 1
3567     node = node_list[0]
3568     _, myself = ssconf.GetMasterAndMyself()
3569     assert node == myself
3570
3571     results = self.RunHooks(hpath, phase, env)
3572
3573     # Return values in the form expected by HooksMaster
3574     return {node: (None, False, results)}
3575
3576   def RunHooks(self, hpath, phase, env):
3577     """Run the scripts in the hooks directory.
3578
3579     @type hpath: str
3580     @param hpath: the path to the hooks directory which
3581         holds the scripts
3582     @type phase: str
3583     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3584         L{constants.HOOKS_PHASE_POST}
3585     @type env: dict
3586     @param env: dictionary with the environment for the hook
3587     @rtype: list
3588     @return: list of 3-element tuples:
3589       - script path
3590       - script result, either L{constants.HKR_SUCCESS} or
3591         L{constants.HKR_FAIL}
3592       - output of the script
3593
3594     @raise errors.ProgrammerError: for invalid input
3595         parameters
3596
3597     """
3598     if phase == constants.HOOKS_PHASE_PRE:
3599       suffix = "pre"
3600     elif phase == constants.HOOKS_PHASE_POST:
3601       suffix = "post"
3602     else:
3603       _Fail("Unknown hooks phase '%s'", phase)
3604
3605     subdir = "%s-%s.d" % (hpath, suffix)
3606     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3607
3608     results = []
3609
3610     if not os.path.isdir(dir_name):
3611       # for non-existing/non-dirs, we simply exit instead of logging a
3612       # warning at every operation
3613       return results
3614
3615     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3616
3617     for (relname, relstatus, runresult) in runparts_results:
3618       if relstatus == constants.RUNPARTS_SKIP:
3619         rrval = constants.HKR_SKIP
3620         output = ""
3621       elif relstatus == constants.RUNPARTS_ERR:
3622         rrval = constants.HKR_FAIL
3623         output = "Hook script execution error: %s" % runresult
3624       elif relstatus == constants.RUNPARTS_RUN:
3625         if runresult.failed:
3626           rrval = constants.HKR_FAIL
3627         else:
3628           rrval = constants.HKR_SUCCESS
3629         output = utils.SafeEncode(runresult.output.strip())
3630       results.append(("%s/%s" % (subdir, relname), rrval, output))
3631
3632     return results
3633
3634
3635 class IAllocatorRunner(object):
3636   """IAllocator runner.
3637
3638   This class is instantiated on the node side (ganeti-noded) and not on
3639   the master side.
3640
3641   """
3642   @staticmethod
3643   def Run(name, idata):
3644     """Run an iallocator script.
3645
3646     @type name: str
3647     @param name: the iallocator script name
3648     @type idata: str
3649     @param idata: the allocator input data
3650
3651     @rtype: tuple
3652     @return: two element tuple of:
3653        - status
3654        - either error message or stdout of allocator (for success)
3655
3656     """
3657     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3658                                   os.path.isfile)
3659     if alloc_script is None:
3660       _Fail("iallocator module '%s' not found in the search path", name)
3661
3662     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3663     try:
3664       os.write(fd, idata)
3665       os.close(fd)
3666       result = utils.RunCmd([alloc_script, fin_name])
3667       if result.failed:
3668         _Fail("iallocator module '%s' failed: %s, output '%s'",
3669               name, result.fail_reason, result.output)
3670     finally:
3671       os.unlink(fin_name)
3672
3673     return result.stdout
3674
3675
3676 class DevCacheManager(object):
3677   """Simple class for managing a cache of block device information.
3678
3679   """
3680   _DEV_PREFIX = "/dev/"
3681   _ROOT_DIR = pathutils.BDEV_CACHE_DIR
3682
3683   @classmethod
3684   def _ConvertPath(cls, dev_path):
3685     """Converts a /dev/name path to the cache file name.
3686
3687     This replaces slashes with underscores and strips the /dev
3688     prefix. It then returns the full path to the cache file.
3689
3690     @type dev_path: str
3691     @param dev_path: the C{/dev/} path name
3692     @rtype: str
3693     @return: the converted path name
3694
3695     """
3696     if dev_path.startswith(cls._DEV_PREFIX):
3697       dev_path = dev_path[len(cls._DEV_PREFIX):]
3698     dev_path = dev_path.replace("/", "_")
3699     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3700     return fpath
3701
3702   @classmethod
3703   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3704     """Updates the cache information for a given device.
3705
3706     @type dev_path: str
3707     @param dev_path: the pathname of the device
3708     @type owner: str
3709     @param owner: the owner (instance name) of the device
3710     @type on_primary: bool
3711     @param on_primary: whether this is the primary
3712         node nor not
3713     @type iv_name: str
3714     @param iv_name: the instance-visible name of the
3715         device, as in objects.Disk.iv_name
3716
3717     @rtype: None
3718
3719     """
3720     if dev_path is None:
3721       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3722       return
3723     fpath = cls._ConvertPath(dev_path)
3724     if on_primary:
3725       state = "primary"
3726     else:
3727       state = "secondary"
3728     if iv_name is None:
3729       iv_name = "not_visible"
3730     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3731     try:
3732       utils.WriteFile(fpath, data=fdata)
3733     except EnvironmentError, err:
3734       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3735
3736   @classmethod
3737   def RemoveCache(cls, dev_path):
3738     """Remove data for a dev_path.
3739
3740     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3741     path name and logging.
3742
3743     @type dev_path: str
3744     @param dev_path: the pathname of the device
3745
3746     @rtype: None
3747
3748     """
3749     if dev_path is None:
3750       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3751       return
3752     fpath = cls._ConvertPath(dev_path)
3753     try:
3754       utils.RemoveFile(fpath)
3755     except EnvironmentError, err:
3756       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)