Remove support for PUT in noded
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
67
68
69 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
70 _ALLOWED_CLEAN_DIRS = frozenset([
71   pathutils.DATA_DIR,
72   pathutils.JOB_QUEUE_ARCHIVE_DIR,
73   pathutils.QUEUE_DIR,
74   pathutils.CRYPTO_KEYS_DIR,
75   ])
76 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
77 _X509_KEY_FILE = "key"
78 _X509_CERT_FILE = "cert"
79 _IES_STATUS_FILE = "status"
80 _IES_PID_FILE = "pid"
81 _IES_CA_FILE = "ca"
82
83 #: Valid LVS output line regex
84 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
85
86 # Actions for the master setup script
87 _MASTER_START = "start"
88 _MASTER_STOP = "stop"
89
90
91 class RPCFail(Exception):
92   """Class denoting RPC failure.
93
94   Its argument is the error message.
95
96   """
97
98
99 def _Fail(msg, *args, **kwargs):
100   """Log an error and the raise an RPCFail exception.
101
102   This exception is then handled specially in the ganeti daemon and
103   turned into a 'failed' return type. As such, this function is a
104   useful shortcut for logging the error and returning it to the master
105   daemon.
106
107   @type msg: string
108   @param msg: the text of the exception
109   @raise RPCFail
110
111   """
112   if args:
113     msg = msg % args
114   if "log" not in kwargs or kwargs["log"]: # if we should log this error
115     if "exc" in kwargs and kwargs["exc"]:
116       logging.exception(msg)
117     else:
118       logging.error(msg)
119   raise RPCFail(msg)
120
121
122 def _GetConfig():
123   """Simple wrapper to return a SimpleStore.
124
125   @rtype: L{ssconf.SimpleStore}
126   @return: a SimpleStore instance
127
128   """
129   return ssconf.SimpleStore()
130
131
132 def _GetSshRunner(cluster_name):
133   """Simple wrapper to return an SshRunner.
134
135   @type cluster_name: str
136   @param cluster_name: the cluster name, which is needed
137       by the SshRunner constructor
138   @rtype: L{ssh.SshRunner}
139   @return: an SshRunner instance
140
141   """
142   return ssh.SshRunner(cluster_name)
143
144
145 def _Decompress(data):
146   """Unpacks data compressed by the RPC client.
147
148   @type data: list or tuple
149   @param data: Data sent by RPC client
150   @rtype: str
151   @return: Decompressed data
152
153   """
154   assert isinstance(data, (list, tuple))
155   assert len(data) == 2
156   (encoding, content) = data
157   if encoding == constants.RPC_ENCODING_NONE:
158     return content
159   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
160     return zlib.decompress(base64.b64decode(content))
161   else:
162     raise AssertionError("Unknown data encoding")
163
164
165 def _CleanDirectory(path, exclude=None):
166   """Removes all regular files in a directory.
167
168   @type path: str
169   @param path: the directory to clean
170   @type exclude: list
171   @param exclude: list of files to be excluded, defaults
172       to the empty list
173
174   """
175   if path not in _ALLOWED_CLEAN_DIRS:
176     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
177           path)
178
179   if not os.path.isdir(path):
180     return
181   if exclude is None:
182     exclude = []
183   else:
184     # Normalize excluded paths
185     exclude = [os.path.normpath(i) for i in exclude]
186
187   for rel_name in utils.ListVisibleFiles(path):
188     full_name = utils.PathJoin(path, rel_name)
189     if full_name in exclude:
190       continue
191     if os.path.isfile(full_name) and not os.path.islink(full_name):
192       utils.RemoveFile(full_name)
193
194
195 def _BuildUploadFileList():
196   """Build the list of allowed upload files.
197
198   This is abstracted so that it's built only once at module import time.
199
200   """
201   allowed_files = set([
202     pathutils.CLUSTER_CONF_FILE,
203     pathutils.ETC_HOSTS,
204     pathutils.SSH_KNOWN_HOSTS_FILE,
205     pathutils.VNC_PASSWORD_FILE,
206     pathutils.RAPI_CERT_FILE,
207     pathutils.SPICE_CERT_FILE,
208     pathutils.SPICE_CACERT_FILE,
209     pathutils.RAPI_USERS_FILE,
210     pathutils.CONFD_HMAC_KEY,
211     pathutils.CLUSTER_DOMAIN_SECRET_FILE,
212     ])
213
214   for hv_name in constants.HYPER_TYPES:
215     hv_class = hypervisor.GetHypervisorClass(hv_name)
216     allowed_files.update(hv_class.GetAncillaryFiles()[0])
217
218   assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
219     "Allowed file storage paths should never be uploaded via RPC"
220
221   return frozenset(allowed_files)
222
223
224 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
225
226
227 def JobQueuePurge():
228   """Removes job queue files and archived jobs.
229
230   @rtype: tuple
231   @return: True, None
232
233   """
234   _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
235   _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
236
237
238 def GetMasterInfo():
239   """Returns master information.
240
241   This is an utility function to compute master information, either
242   for consumption here or from the node daemon.
243
244   @rtype: tuple
245   @return: master_netdev, master_ip, master_name, primary_ip_family,
246     master_netmask
247   @raise RPCFail: in case of errors
248
249   """
250   try:
251     cfg = _GetConfig()
252     master_netdev = cfg.GetMasterNetdev()
253     master_ip = cfg.GetMasterIP()
254     master_netmask = cfg.GetMasterNetmask()
255     master_node = cfg.GetMasterNode()
256     primary_ip_family = cfg.GetPrimaryIPFamily()
257   except errors.ConfigurationError, err:
258     _Fail("Cluster configuration incomplete: %s", err, exc=True)
259   return (master_netdev, master_ip, master_node, primary_ip_family,
260           master_netmask)
261
262
263 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
264   """Decorator that runs hooks before and after the decorated function.
265
266   @type hook_opcode: string
267   @param hook_opcode: opcode of the hook
268   @type hooks_path: string
269   @param hooks_path: path of the hooks
270   @type env_builder_fn: function
271   @param env_builder_fn: function that returns a dictionary containing the
272     environment variables for the hooks. Will get all the parameters of the
273     decorated function.
274   @raise RPCFail: in case of pre-hook failure
275
276   """
277   def decorator(fn):
278     def wrapper(*args, **kwargs):
279       _, myself = ssconf.GetMasterAndMyself()
280       nodes = ([myself], [myself])  # these hooks run locally
281
282       env_fn = compat.partial(env_builder_fn, *args, **kwargs)
283
284       cfg = _GetConfig()
285       hr = HooksRunner()
286       hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
287                             None, env_fn, logging.warning, cfg.GetClusterName(),
288                             cfg.GetMasterNode())
289
290       hm.RunPhase(constants.HOOKS_PHASE_PRE)
291       result = fn(*args, **kwargs)
292       hm.RunPhase(constants.HOOKS_PHASE_POST)
293
294       return result
295     return wrapper
296   return decorator
297
298
299 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
300   """Builds environment variables for master IP hooks.
301
302   @type master_params: L{objects.MasterNetworkParameters}
303   @param master_params: network parameters of the master
304   @type use_external_mip_script: boolean
305   @param use_external_mip_script: whether to use an external master IP
306     address setup script (unused, but necessary per the implementation of the
307     _RunLocalHooks decorator)
308
309   """
310   # pylint: disable=W0613
311   ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
312   env = {
313     "MASTER_NETDEV": master_params.netdev,
314     "MASTER_IP": master_params.ip,
315     "MASTER_NETMASK": str(master_params.netmask),
316     "CLUSTER_IP_VERSION": str(ver),
317   }
318
319   return env
320
321
322 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
323   """Execute the master IP address setup script.
324
325   @type master_params: L{objects.MasterNetworkParameters}
326   @param master_params: network parameters of the master
327   @type action: string
328   @param action: action to pass to the script. Must be one of
329     L{backend._MASTER_START} or L{backend._MASTER_STOP}
330   @type use_external_mip_script: boolean
331   @param use_external_mip_script: whether to use an external master IP
332     address setup script
333   @raise backend.RPCFail: if there are errors during the execution of the
334     script
335
336   """
337   env = _BuildMasterIpEnv(master_params)
338
339   if use_external_mip_script:
340     setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
341   else:
342     setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
343
344   result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
345
346   if result.failed:
347     _Fail("Failed to %s the master IP. Script return value: %s" %
348           (action, result.exit_code), log=True)
349
350
351 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
352                _BuildMasterIpEnv)
353 def ActivateMasterIp(master_params, use_external_mip_script):
354   """Activate the IP address of the master daemon.
355
356   @type master_params: L{objects.MasterNetworkParameters}
357   @param master_params: network parameters of the master
358   @type use_external_mip_script: boolean
359   @param use_external_mip_script: whether to use an external master IP
360     address setup script
361   @raise RPCFail: in case of errors during the IP startup
362
363   """
364   _RunMasterSetupScript(master_params, _MASTER_START,
365                         use_external_mip_script)
366
367
368 def StartMasterDaemons(no_voting):
369   """Activate local node as master node.
370
371   The function will start the master daemons (ganeti-masterd and ganeti-rapi).
372
373   @type no_voting: boolean
374   @param no_voting: whether to start ganeti-masterd without a node vote
375       but still non-interactively
376   @rtype: None
377
378   """
379
380   if no_voting:
381     masterd_args = "--no-voting --yes-do-it"
382   else:
383     masterd_args = ""
384
385   env = {
386     "EXTRA_MASTERD_ARGS": masterd_args,
387     }
388
389   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
390   if result.failed:
391     msg = "Can't start Ganeti master: %s" % result.output
392     logging.error(msg)
393     _Fail(msg)
394
395
396 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
397                _BuildMasterIpEnv)
398 def DeactivateMasterIp(master_params, use_external_mip_script):
399   """Deactivate the master IP on this node.
400
401   @type master_params: L{objects.MasterNetworkParameters}
402   @param master_params: network parameters of the master
403   @type use_external_mip_script: boolean
404   @param use_external_mip_script: whether to use an external master IP
405     address setup script
406   @raise RPCFail: in case of errors during the IP turndown
407
408   """
409   _RunMasterSetupScript(master_params, _MASTER_STOP,
410                         use_external_mip_script)
411
412
413 def StopMasterDaemons():
414   """Stop the master daemons on this node.
415
416   Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
417
418   @rtype: None
419
420   """
421   # TODO: log and report back to the caller the error failures; we
422   # need to decide in which case we fail the RPC for this
423
424   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
425   if result.failed:
426     logging.error("Could not stop Ganeti master, command %s had exitcode %s"
427                   " and error %s",
428                   result.cmd, result.exit_code, result.output)
429
430
431 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
432   """Change the netmask of the master IP.
433
434   @param old_netmask: the old value of the netmask
435   @param netmask: the new value of the netmask
436   @param master_ip: the master IP
437   @param master_netdev: the master network device
438
439   """
440   if old_netmask == netmask:
441     return
442
443   if not netutils.IPAddress.Own(master_ip):
444     _Fail("The master IP address is not up, not attempting to change its"
445           " netmask")
446
447   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
448                          "%s/%s" % (master_ip, netmask),
449                          "dev", master_netdev, "label",
450                          "%s:0" % master_netdev])
451   if result.failed:
452     _Fail("Could not set the new netmask on the master IP address")
453
454   result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
455                          "%s/%s" % (master_ip, old_netmask),
456                          "dev", master_netdev, "label",
457                          "%s:0" % master_netdev])
458   if result.failed:
459     _Fail("Could not bring down the master IP address with the old netmask")
460
461
462 def EtcHostsModify(mode, host, ip):
463   """Modify a host entry in /etc/hosts.
464
465   @param mode: The mode to operate. Either add or remove entry
466   @param host: The host to operate on
467   @param ip: The ip associated with the entry
468
469   """
470   if mode == constants.ETC_HOSTS_ADD:
471     if not ip:
472       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
473               " present")
474     utils.AddHostToEtcHosts(host, ip)
475   elif mode == constants.ETC_HOSTS_REMOVE:
476     if ip:
477       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
478               " parameter is present")
479     utils.RemoveHostFromEtcHosts(host)
480   else:
481     RPCFail("Mode not supported")
482
483
484 def LeaveCluster(modify_ssh_setup):
485   """Cleans up and remove the current node.
486
487   This function cleans up and prepares the current node to be removed
488   from the cluster.
489
490   If processing is successful, then it raises an
491   L{errors.QuitGanetiException} which is used as a special case to
492   shutdown the node daemon.
493
494   @param modify_ssh_setup: boolean
495
496   """
497   _CleanDirectory(pathutils.DATA_DIR)
498   _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
499   JobQueuePurge()
500
501   if modify_ssh_setup:
502     try:
503       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
504
505       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
506
507       utils.RemoveFile(priv_key)
508       utils.RemoveFile(pub_key)
509     except errors.OpExecError:
510       logging.exception("Error while processing ssh files")
511
512   try:
513     utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
514     utils.RemoveFile(pathutils.RAPI_CERT_FILE)
515     utils.RemoveFile(pathutils.SPICE_CERT_FILE)
516     utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
517     utils.RemoveFile(pathutils.NODED_CERT_FILE)
518   except: # pylint: disable=W0702
519     logging.exception("Error while removing cluster secrets")
520
521   result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
522   if result.failed:
523     logging.error("Command %s failed with exitcode %s and error %s",
524                   result.cmd, result.exit_code, result.output)
525
526   # Raise a custom exception (handled in ganeti-noded)
527   raise errors.QuitGanetiException(True, "Shutdown scheduled")
528
529
530 def _GetVgInfo(name):
531   """Retrieves information about a LVM volume group.
532
533   """
534   # TODO: GetVGInfo supports returning information for multiple VGs at once
535   vginfo = bdev.LogicalVolume.GetVGInfo([name])
536   if vginfo:
537     vg_free = int(round(vginfo[0][0], 0))
538     vg_size = int(round(vginfo[0][1], 0))
539   else:
540     vg_free = None
541     vg_size = None
542
543   return {
544     "name": name,
545     "vg_free": vg_free,
546     "vg_size": vg_size,
547     }
548
549
550 def _GetHvInfo(name):
551   """Retrieves node information from a hypervisor.
552
553   The information returned depends on the hypervisor. Common items:
554
555     - vg_size is the size of the configured volume group in MiB
556     - vg_free is the free size of the volume group in MiB
557     - memory_dom0 is the memory allocated for domain0 in MiB
558     - memory_free is the currently available (free) ram in MiB
559     - memory_total is the total number of ram in MiB
560     - hv_version: the hypervisor version, if available
561
562   """
563   return hypervisor.GetHypervisor(name).GetNodeInfo()
564
565
566 def _GetNamedNodeInfo(names, fn):
567   """Calls C{fn} for all names in C{names} and returns a dictionary.
568
569   @rtype: None or dict
570
571   """
572   if names is None:
573     return None
574   else:
575     return map(fn, names)
576
577
578 def GetNodeInfo(vg_names, hv_names):
579   """Gives back a hash with different information about the node.
580
581   @type vg_names: list of string
582   @param vg_names: Names of the volume groups to ask for disk space information
583   @type hv_names: list of string
584   @param hv_names: Names of the hypervisors to ask for node information
585   @rtype: tuple; (string, None/dict, None/dict)
586   @return: Tuple containing boot ID, volume group information and hypervisor
587     information
588
589   """
590   bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
591   vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
592   hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
593
594   return (bootid, vg_info, hv_info)
595
596
597 def VerifyNode(what, cluster_name):
598   """Verify the status of the local node.
599
600   Based on the input L{what} parameter, various checks are done on the
601   local node.
602
603   If the I{filelist} key is present, this list of
604   files is checksummed and the file/checksum pairs are returned.
605
606   If the I{nodelist} key is present, we check that we have
607   connectivity via ssh with the target nodes (and check the hostname
608   report).
609
610   If the I{node-net-test} key is present, we check that we have
611   connectivity to the given nodes via both primary IP and, if
612   applicable, secondary IPs.
613
614   @type what: C{dict}
615   @param what: a dictionary of things to check:
616       - filelist: list of files for which to compute checksums
617       - nodelist: list of nodes we should check ssh communication with
618       - node-net-test: list of nodes we should check node daemon port
619         connectivity with
620       - hypervisor: list with hypervisors to run the verify for
621   @rtype: dict
622   @return: a dictionary with the same keys as the input dict, and
623       values representing the result of the checks
624
625   """
626   result = {}
627   my_name = netutils.Hostname.GetSysName()
628   port = netutils.GetDaemonPort(constants.NODED)
629   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
630
631   if constants.NV_HYPERVISOR in what and vm_capable:
632     result[constants.NV_HYPERVISOR] = tmp = {}
633     for hv_name in what[constants.NV_HYPERVISOR]:
634       try:
635         val = hypervisor.GetHypervisor(hv_name).Verify()
636       except errors.HypervisorError, err:
637         val = "Error while checking hypervisor: %s" % str(err)
638       tmp[hv_name] = val
639
640   if constants.NV_HVPARAMS in what and vm_capable:
641     result[constants.NV_HVPARAMS] = tmp = []
642     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
643       try:
644         logging.info("Validating hv %s, %s", hv_name, hvparms)
645         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
646       except errors.HypervisorError, err:
647         tmp.append((source, hv_name, str(err)))
648
649   if constants.NV_FILELIST in what:
650     fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
651                                               what[constants.NV_FILELIST]))
652     result[constants.NV_FILELIST] = \
653       dict((vcluster.MakeVirtualPath(key), value)
654            for (key, value) in fingerprints.items())
655
656   if constants.NV_NODELIST in what:
657     (nodes, bynode) = what[constants.NV_NODELIST]
658
659     # Add nodes from other groups (different for each node)
660     try:
661       nodes.extend(bynode[my_name])
662     except KeyError:
663       pass
664
665     # Use a random order
666     random.shuffle(nodes)
667
668     # Try to contact all nodes
669     val = {}
670     for node in nodes:
671       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
672       if not success:
673         val[node] = message
674
675     result[constants.NV_NODELIST] = val
676
677   if constants.NV_NODENETTEST in what:
678     result[constants.NV_NODENETTEST] = tmp = {}
679     my_pip = my_sip = None
680     for name, pip, sip in what[constants.NV_NODENETTEST]:
681       if name == my_name:
682         my_pip = pip
683         my_sip = sip
684         break
685     if not my_pip:
686       tmp[my_name] = ("Can't find my own primary/secondary IP"
687                       " in the node list")
688     else:
689       for name, pip, sip in what[constants.NV_NODENETTEST]:
690         fail = []
691         if not netutils.TcpPing(pip, port, source=my_pip):
692           fail.append("primary")
693         if sip != pip:
694           if not netutils.TcpPing(sip, port, source=my_sip):
695             fail.append("secondary")
696         if fail:
697           tmp[name] = ("failure using the %s interface(s)" %
698                        " and ".join(fail))
699
700   if constants.NV_MASTERIP in what:
701     # FIXME: add checks on incoming data structures (here and in the
702     # rest of the function)
703     master_name, master_ip = what[constants.NV_MASTERIP]
704     if master_name == my_name:
705       source = constants.IP4_ADDRESS_LOCALHOST
706     else:
707       source = None
708     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
709                                                      source=source)
710
711   if constants.NV_USERSCRIPTS in what:
712     result[constants.NV_USERSCRIPTS] = \
713       [script for script in what[constants.NV_USERSCRIPTS]
714        if not (os.path.exists(script) and os.access(script, os.X_OK))]
715
716   if constants.NV_OOB_PATHS in what:
717     result[constants.NV_OOB_PATHS] = tmp = []
718     for path in what[constants.NV_OOB_PATHS]:
719       try:
720         st = os.stat(path)
721       except OSError, err:
722         tmp.append("error stating out of band helper: %s" % err)
723       else:
724         if stat.S_ISREG(st.st_mode):
725           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
726             tmp.append(None)
727           else:
728             tmp.append("out of band helper %s is not executable" % path)
729         else:
730           tmp.append("out of band helper %s is not a file" % path)
731
732   if constants.NV_LVLIST in what and vm_capable:
733     try:
734       val = GetVolumeList(utils.ListVolumeGroups().keys())
735     except RPCFail, err:
736       val = str(err)
737     result[constants.NV_LVLIST] = val
738
739   if constants.NV_INSTANCELIST in what and vm_capable:
740     # GetInstanceList can fail
741     try:
742       val = GetInstanceList(what[constants.NV_INSTANCELIST])
743     except RPCFail, err:
744       val = str(err)
745     result[constants.NV_INSTANCELIST] = val
746
747   if constants.NV_VGLIST in what and vm_capable:
748     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
749
750   if constants.NV_PVLIST in what and vm_capable:
751     result[constants.NV_PVLIST] = \
752       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
753                                    filter_allocatable=False)
754
755   if constants.NV_VERSION in what:
756     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
757                                     constants.RELEASE_VERSION)
758
759   if constants.NV_HVINFO in what and vm_capable:
760     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
761     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
762
763   if constants.NV_DRBDLIST in what and vm_capable:
764     try:
765       used_minors = bdev.DRBD8.GetUsedDevs().keys()
766     except errors.BlockDeviceError, err:
767       logging.warning("Can't get used minors list", exc_info=True)
768       used_minors = str(err)
769     result[constants.NV_DRBDLIST] = used_minors
770
771   if constants.NV_DRBDHELPER in what and vm_capable:
772     status = True
773     try:
774       payload = bdev.BaseDRBD.GetUsermodeHelper()
775     except errors.BlockDeviceError, err:
776       logging.error("Can't get DRBD usermode helper: %s", str(err))
777       status = False
778       payload = str(err)
779     result[constants.NV_DRBDHELPER] = (status, payload)
780
781   if constants.NV_NODESETUP in what:
782     result[constants.NV_NODESETUP] = tmpr = []
783     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
784       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
785                   " under /sys, missing required directories /sys/block"
786                   " and /sys/class/net")
787     if (not os.path.isdir("/proc/sys") or
788         not os.path.isfile("/proc/sysrq-trigger")):
789       tmpr.append("The procfs filesystem doesn't seem to be mounted"
790                   " under /proc, missing required directory /proc/sys and"
791                   " the file /proc/sysrq-trigger")
792
793   if constants.NV_TIME in what:
794     result[constants.NV_TIME] = utils.SplitTime(time.time())
795
796   if constants.NV_OSLIST in what and vm_capable:
797     result[constants.NV_OSLIST] = DiagnoseOS()
798
799   if constants.NV_BRIDGES in what and vm_capable:
800     result[constants.NV_BRIDGES] = [bridge
801                                     for bridge in what[constants.NV_BRIDGES]
802                                     if not utils.BridgeExists(bridge)]
803   return result
804
805
806 def GetBlockDevSizes(devices):
807   """Return the size of the given block devices
808
809   @type devices: list
810   @param devices: list of block device nodes to query
811   @rtype: dict
812   @return:
813     dictionary of all block devices under /dev (key). The value is their
814     size in MiB.
815
816     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
817
818   """
819   DEV_PREFIX = "/dev/"
820   blockdevs = {}
821
822   for devpath in devices:
823     if not utils.IsBelowDir(DEV_PREFIX, devpath):
824       continue
825
826     try:
827       st = os.stat(devpath)
828     except EnvironmentError, err:
829       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
830       continue
831
832     if stat.S_ISBLK(st.st_mode):
833       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
834       if result.failed:
835         # We don't want to fail, just do not list this device as available
836         logging.warning("Cannot get size for block device %s", devpath)
837         continue
838
839       size = int(result.stdout) / (1024 * 1024)
840       blockdevs[devpath] = size
841   return blockdevs
842
843
844 def GetVolumeList(vg_names):
845   """Compute list of logical volumes and their size.
846
847   @type vg_names: list
848   @param vg_names: the volume groups whose LVs we should list, or
849       empty for all volume groups
850   @rtype: dict
851   @return:
852       dictionary of all partions (key) with value being a tuple of
853       their size (in MiB), inactive and online status::
854
855         {'xenvg/test1': ('20.06', True, True)}
856
857       in case of errors, a string is returned with the error
858       details.
859
860   """
861   lvs = {}
862   sep = "|"
863   if not vg_names:
864     vg_names = []
865   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
866                          "--separator=%s" % sep,
867                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
868   if result.failed:
869     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
870
871   for line in result.stdout.splitlines():
872     line = line.strip()
873     match = _LVSLINE_REGEX.match(line)
874     if not match:
875       logging.error("Invalid line returned from lvs output: '%s'", line)
876       continue
877     vg_name, name, size, attr = match.groups()
878     inactive = attr[4] == "-"
879     online = attr[5] == "o"
880     virtual = attr[0] == "v"
881     if virtual:
882       # we don't want to report such volumes as existing, since they
883       # don't really hold data
884       continue
885     lvs[vg_name + "/" + name] = (size, inactive, online)
886
887   return lvs
888
889
890 def ListVolumeGroups():
891   """List the volume groups and their size.
892
893   @rtype: dict
894   @return: dictionary with keys volume name and values the
895       size of the volume
896
897   """
898   return utils.ListVolumeGroups()
899
900
901 def NodeVolumes():
902   """List all volumes on this node.
903
904   @rtype: list
905   @return:
906     A list of dictionaries, each having four keys:
907       - name: the logical volume name,
908       - size: the size of the logical volume
909       - dev: the physical device on which the LV lives
910       - vg: the volume group to which it belongs
911
912     In case of errors, we return an empty list and log the
913     error.
914
915     Note that since a logical volume can live on multiple physical
916     volumes, the resulting list might include a logical volume
917     multiple times.
918
919   """
920   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
921                          "--separator=|",
922                          "--options=lv_name,lv_size,devices,vg_name"])
923   if result.failed:
924     _Fail("Failed to list logical volumes, lvs output: %s",
925           result.output)
926
927   def parse_dev(dev):
928     return dev.split("(")[0]
929
930   def handle_dev(dev):
931     return [parse_dev(x) for x in dev.split(",")]
932
933   def map_line(line):
934     line = [v.strip() for v in line]
935     return [{"name": line[0], "size": line[1],
936              "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
937
938   all_devs = []
939   for line in result.stdout.splitlines():
940     if line.count("|") >= 3:
941       all_devs.extend(map_line(line.split("|")))
942     else:
943       logging.warning("Strange line in the output from lvs: '%s'", line)
944   return all_devs
945
946
947 def BridgesExist(bridges_list):
948   """Check if a list of bridges exist on the current node.
949
950   @rtype: boolean
951   @return: C{True} if all of them exist, C{False} otherwise
952
953   """
954   missing = []
955   for bridge in bridges_list:
956     if not utils.BridgeExists(bridge):
957       missing.append(bridge)
958
959   if missing:
960     _Fail("Missing bridges %s", utils.CommaJoin(missing))
961
962
963 def GetInstanceList(hypervisor_list):
964   """Provides a list of instances.
965
966   @type hypervisor_list: list
967   @param hypervisor_list: the list of hypervisors to query information
968
969   @rtype: list
970   @return: a list of all running instances on the current node
971     - instance1.example.com
972     - instance2.example.com
973
974   """
975   results = []
976   for hname in hypervisor_list:
977     try:
978       names = hypervisor.GetHypervisor(hname).ListInstances()
979       results.extend(names)
980     except errors.HypervisorError, err:
981       _Fail("Error enumerating instances (hypervisor %s): %s",
982             hname, err, exc=True)
983
984   return results
985
986
987 def GetInstanceInfo(instance, hname):
988   """Gives back the information about an instance as a dictionary.
989
990   @type instance: string
991   @param instance: the instance name
992   @type hname: string
993   @param hname: the hypervisor type of the instance
994
995   @rtype: dict
996   @return: dictionary with the following keys:
997       - memory: memory size of instance (int)
998       - state: xen state of instance (string)
999       - time: cpu time of instance (float)
1000       - vcpus: the number of vcpus (int)
1001
1002   """
1003   output = {}
1004
1005   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1006   if iinfo is not None:
1007     output["memory"] = iinfo[2]
1008     output["vcpus"] = iinfo[3]
1009     output["state"] = iinfo[4]
1010     output["time"] = iinfo[5]
1011
1012   return output
1013
1014
1015 def GetInstanceMigratable(instance):
1016   """Gives whether an instance can be migrated.
1017
1018   @type instance: L{objects.Instance}
1019   @param instance: object representing the instance to be checked.
1020
1021   @rtype: tuple
1022   @return: tuple of (result, description) where:
1023       - result: whether the instance can be migrated or not
1024       - description: a description of the issue, if relevant
1025
1026   """
1027   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1028   iname = instance.name
1029   if iname not in hyper.ListInstances():
1030     _Fail("Instance %s is not running", iname)
1031
1032   for idx in range(len(instance.disks)):
1033     link_name = _GetBlockDevSymlinkPath(iname, idx)
1034     if not os.path.islink(link_name):
1035       logging.warning("Instance %s is missing symlink %s for disk %d",
1036                       iname, link_name, idx)
1037
1038
1039 def GetAllInstancesInfo(hypervisor_list):
1040   """Gather data about all instances.
1041
1042   This is the equivalent of L{GetInstanceInfo}, except that it
1043   computes data for all instances at once, thus being faster if one
1044   needs data about more than one instance.
1045
1046   @type hypervisor_list: list
1047   @param hypervisor_list: list of hypervisors to query for instance data
1048
1049   @rtype: dict
1050   @return: dictionary of instance: data, with data having the following keys:
1051       - memory: memory size of instance (int)
1052       - state: xen state of instance (string)
1053       - time: cpu time of instance (float)
1054       - vcpus: the number of vcpus
1055
1056   """
1057   output = {}
1058
1059   for hname in hypervisor_list:
1060     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1061     if iinfo:
1062       for name, _, memory, vcpus, state, times in iinfo:
1063         value = {
1064           "memory": memory,
1065           "vcpus": vcpus,
1066           "state": state,
1067           "time": times,
1068           }
1069         if name in output:
1070           # we only check static parameters, like memory and vcpus,
1071           # and not state and time which can change between the
1072           # invocations of the different hypervisors
1073           for key in "memory", "vcpus":
1074             if value[key] != output[name][key]:
1075               _Fail("Instance %s is running twice"
1076                     " with different parameters", name)
1077         output[name] = value
1078
1079   return output
1080
1081
1082 def _InstanceLogName(kind, os_name, instance, component):
1083   """Compute the OS log filename for a given instance and operation.
1084
1085   The instance name and os name are passed in as strings since not all
1086   operations have these as part of an instance object.
1087
1088   @type kind: string
1089   @param kind: the operation type (e.g. add, import, etc.)
1090   @type os_name: string
1091   @param os_name: the os name
1092   @type instance: string
1093   @param instance: the name of the instance being imported/added/etc.
1094   @type component: string or None
1095   @param component: the name of the component of the instance being
1096       transferred
1097
1098   """
1099   # TODO: Use tempfile.mkstemp to create unique filename
1100   if component:
1101     assert "/" not in component
1102     c_msg = "-%s" % component
1103   else:
1104     c_msg = ""
1105   base = ("%s-%s-%s%s-%s.log" %
1106           (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1107   return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1108
1109
1110 def InstanceOsAdd(instance, reinstall, debug):
1111   """Add an OS to an instance.
1112
1113   @type instance: L{objects.Instance}
1114   @param instance: Instance whose OS is to be installed
1115   @type reinstall: boolean
1116   @param reinstall: whether this is an instance reinstall
1117   @type debug: integer
1118   @param debug: debug level, passed to the OS scripts
1119   @rtype: None
1120
1121   """
1122   inst_os = OSFromDisk(instance.os)
1123
1124   create_env = OSEnvironment(instance, inst_os, debug)
1125   if reinstall:
1126     create_env["INSTANCE_REINSTALL"] = "1"
1127
1128   logfile = _InstanceLogName("add", instance.os, instance.name, None)
1129
1130   result = utils.RunCmd([inst_os.create_script], env=create_env,
1131                         cwd=inst_os.path, output=logfile, reset_env=True)
1132   if result.failed:
1133     logging.error("os create command '%s' returned error: %s, logfile: %s,"
1134                   " output: %s", result.cmd, result.fail_reason, logfile,
1135                   result.output)
1136     lines = [utils.SafeEncode(val)
1137              for val in utils.TailFile(logfile, lines=20)]
1138     _Fail("OS create script failed (%s), last lines in the"
1139           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1140
1141
1142 def RunRenameInstance(instance, old_name, debug):
1143   """Run the OS rename script for an instance.
1144
1145   @type instance: L{objects.Instance}
1146   @param instance: Instance whose OS is to be installed
1147   @type old_name: string
1148   @param old_name: previous instance name
1149   @type debug: integer
1150   @param debug: debug level, passed to the OS scripts
1151   @rtype: boolean
1152   @return: the success of the operation
1153
1154   """
1155   inst_os = OSFromDisk(instance.os)
1156
1157   rename_env = OSEnvironment(instance, inst_os, debug)
1158   rename_env["OLD_INSTANCE_NAME"] = old_name
1159
1160   logfile = _InstanceLogName("rename", instance.os,
1161                              "%s-%s" % (old_name, instance.name), None)
1162
1163   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1164                         cwd=inst_os.path, output=logfile, reset_env=True)
1165
1166   if result.failed:
1167     logging.error("os create command '%s' returned error: %s output: %s",
1168                   result.cmd, result.fail_reason, result.output)
1169     lines = [utils.SafeEncode(val)
1170              for val in utils.TailFile(logfile, lines=20)]
1171     _Fail("OS rename script failed (%s), last lines in the"
1172           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1173
1174
1175 def _GetBlockDevSymlinkPath(instance_name, idx):
1176   return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
1177                         (instance_name, constants.DISK_SEPARATOR, idx))
1178
1179
1180 def _SymlinkBlockDev(instance_name, device_path, idx):
1181   """Set up symlinks to a instance's block device.
1182
1183   This is an auxiliary function run when an instance is start (on the primary
1184   node) or when an instance is migrated (on the target node).
1185
1186
1187   @param instance_name: the name of the target instance
1188   @param device_path: path of the physical block device, on the node
1189   @param idx: the disk index
1190   @return: absolute path to the disk's symlink
1191
1192   """
1193   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1194   try:
1195     os.symlink(device_path, link_name)
1196   except OSError, err:
1197     if err.errno == errno.EEXIST:
1198       if (not os.path.islink(link_name) or
1199           os.readlink(link_name) != device_path):
1200         os.remove(link_name)
1201         os.symlink(device_path, link_name)
1202     else:
1203       raise
1204
1205   return link_name
1206
1207
1208 def _RemoveBlockDevLinks(instance_name, disks):
1209   """Remove the block device symlinks belonging to the given instance.
1210
1211   """
1212   for idx, _ in enumerate(disks):
1213     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1214     if os.path.islink(link_name):
1215       try:
1216         os.remove(link_name)
1217       except OSError:
1218         logging.exception("Can't remove symlink '%s'", link_name)
1219
1220
1221 def _GatherAndLinkBlockDevs(instance):
1222   """Set up an instance's block device(s).
1223
1224   This is run on the primary node at instance startup. The block
1225   devices must be already assembled.
1226
1227   @type instance: L{objects.Instance}
1228   @param instance: the instance whose disks we shoul assemble
1229   @rtype: list
1230   @return: list of (disk_object, device_path)
1231
1232   """
1233   block_devices = []
1234   for idx, disk in enumerate(instance.disks):
1235     device = _RecursiveFindBD(disk)
1236     if device is None:
1237       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1238                                     str(disk))
1239     device.Open()
1240     try:
1241       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1242     except OSError, e:
1243       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1244                                     e.strerror)
1245
1246     block_devices.append((disk, link_name))
1247
1248   return block_devices
1249
1250
1251 def StartInstance(instance, startup_paused):
1252   """Start an instance.
1253
1254   @type instance: L{objects.Instance}
1255   @param instance: the instance object
1256   @type startup_paused: bool
1257   @param instance: pause instance at startup?
1258   @rtype: None
1259
1260   """
1261   running_instances = GetInstanceList([instance.hypervisor])
1262
1263   if instance.name in running_instances:
1264     logging.info("Instance %s already running, not starting", instance.name)
1265     return
1266
1267   try:
1268     block_devices = _GatherAndLinkBlockDevs(instance)
1269     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1270     hyper.StartInstance(instance, block_devices, startup_paused)
1271   except errors.BlockDeviceError, err:
1272     _Fail("Block device error: %s", err, exc=True)
1273   except errors.HypervisorError, err:
1274     _RemoveBlockDevLinks(instance.name, instance.disks)
1275     _Fail("Hypervisor error: %s", err, exc=True)
1276
1277
1278 def InstanceShutdown(instance, timeout):
1279   """Shut an instance down.
1280
1281   @note: this functions uses polling with a hardcoded timeout.
1282
1283   @type instance: L{objects.Instance}
1284   @param instance: the instance object
1285   @type timeout: integer
1286   @param timeout: maximum timeout for soft shutdown
1287   @rtype: None
1288
1289   """
1290   hv_name = instance.hypervisor
1291   hyper = hypervisor.GetHypervisor(hv_name)
1292   iname = instance.name
1293
1294   if instance.name not in hyper.ListInstances():
1295     logging.info("Instance %s not running, doing nothing", iname)
1296     return
1297
1298   class _TryShutdown:
1299     def __init__(self):
1300       self.tried_once = False
1301
1302     def __call__(self):
1303       if iname not in hyper.ListInstances():
1304         return
1305
1306       try:
1307         hyper.StopInstance(instance, retry=self.tried_once)
1308       except errors.HypervisorError, err:
1309         if iname not in hyper.ListInstances():
1310           # if the instance is no longer existing, consider this a
1311           # success and go to cleanup
1312           return
1313
1314         _Fail("Failed to stop instance %s: %s", iname, err)
1315
1316       self.tried_once = True
1317
1318       raise utils.RetryAgain()
1319
1320   try:
1321     utils.Retry(_TryShutdown(), 5, timeout)
1322   except utils.RetryTimeout:
1323     # the shutdown did not succeed
1324     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1325
1326     try:
1327       hyper.StopInstance(instance, force=True)
1328     except errors.HypervisorError, err:
1329       if iname in hyper.ListInstances():
1330         # only raise an error if the instance still exists, otherwise
1331         # the error could simply be "instance ... unknown"!
1332         _Fail("Failed to force stop instance %s: %s", iname, err)
1333
1334     time.sleep(1)
1335
1336     if iname in hyper.ListInstances():
1337       _Fail("Could not shutdown instance %s even by destroy", iname)
1338
1339   try:
1340     hyper.CleanupInstance(instance.name)
1341   except errors.HypervisorError, err:
1342     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1343
1344   _RemoveBlockDevLinks(iname, instance.disks)
1345
1346
1347 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1348   """Reboot an instance.
1349
1350   @type instance: L{objects.Instance}
1351   @param instance: the instance object to reboot
1352   @type reboot_type: str
1353   @param reboot_type: the type of reboot, one the following
1354     constants:
1355       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1356         instance OS, do not recreate the VM
1357       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1358         restart the VM (at the hypervisor level)
1359       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1360         not accepted here, since that mode is handled differently, in
1361         cmdlib, and translates into full stop and start of the
1362         instance (instead of a call_instance_reboot RPC)
1363   @type shutdown_timeout: integer
1364   @param shutdown_timeout: maximum timeout for soft shutdown
1365   @rtype: None
1366
1367   """
1368   running_instances = GetInstanceList([instance.hypervisor])
1369
1370   if instance.name not in running_instances:
1371     _Fail("Cannot reboot instance %s that is not running", instance.name)
1372
1373   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1374   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1375     try:
1376       hyper.RebootInstance(instance)
1377     except errors.HypervisorError, err:
1378       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1379   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1380     try:
1381       InstanceShutdown(instance, shutdown_timeout)
1382       return StartInstance(instance, False)
1383     except errors.HypervisorError, err:
1384       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1385   else:
1386     _Fail("Invalid reboot_type received: %s", reboot_type)
1387
1388
1389 def InstanceBalloonMemory(instance, memory):
1390   """Resize an instance's memory.
1391
1392   @type instance: L{objects.Instance}
1393   @param instance: the instance object
1394   @type memory: int
1395   @param memory: new memory amount in MB
1396   @rtype: None
1397
1398   """
1399   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1400   running = hyper.ListInstances()
1401   if instance.name not in running:
1402     logging.info("Instance %s is not running, cannot balloon", instance.name)
1403     return
1404   try:
1405     hyper.BalloonInstanceMemory(instance, memory)
1406   except errors.HypervisorError, err:
1407     _Fail("Failed to balloon instance memory: %s", err, exc=True)
1408
1409
1410 def MigrationInfo(instance):
1411   """Gather information about an instance to be migrated.
1412
1413   @type instance: L{objects.Instance}
1414   @param instance: the instance definition
1415
1416   """
1417   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1418   try:
1419     info = hyper.MigrationInfo(instance)
1420   except errors.HypervisorError, err:
1421     _Fail("Failed to fetch migration information: %s", err, exc=True)
1422   return info
1423
1424
1425 def AcceptInstance(instance, info, target):
1426   """Prepare the node to accept an instance.
1427
1428   @type instance: L{objects.Instance}
1429   @param instance: the instance definition
1430   @type info: string/data (opaque)
1431   @param info: migration information, from the source node
1432   @type target: string
1433   @param target: target host (usually ip), on this node
1434
1435   """
1436   # TODO: why is this required only for DTS_EXT_MIRROR?
1437   if instance.disk_template in constants.DTS_EXT_MIRROR:
1438     # Create the symlinks, as the disks are not active
1439     # in any way
1440     try:
1441       _GatherAndLinkBlockDevs(instance)
1442     except errors.BlockDeviceError, err:
1443       _Fail("Block device error: %s", err, exc=True)
1444
1445   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1446   try:
1447     hyper.AcceptInstance(instance, info, target)
1448   except errors.HypervisorError, err:
1449     if instance.disk_template in constants.DTS_EXT_MIRROR:
1450       _RemoveBlockDevLinks(instance.name, instance.disks)
1451     _Fail("Failed to accept instance: %s", err, exc=True)
1452
1453
1454 def FinalizeMigrationDst(instance, info, success):
1455   """Finalize any preparation to accept an instance.
1456
1457   @type instance: L{objects.Instance}
1458   @param instance: the instance definition
1459   @type info: string/data (opaque)
1460   @param info: migration information, from the source node
1461   @type success: boolean
1462   @param success: whether the migration was a success or a failure
1463
1464   """
1465   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1466   try:
1467     hyper.FinalizeMigrationDst(instance, info, success)
1468   except errors.HypervisorError, err:
1469     _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1470
1471
1472 def MigrateInstance(instance, target, live):
1473   """Migrates an instance to another node.
1474
1475   @type instance: L{objects.Instance}
1476   @param instance: the instance definition
1477   @type target: string
1478   @param target: the target node name
1479   @type live: boolean
1480   @param live: whether the migration should be done live or not (the
1481       interpretation of this parameter is left to the hypervisor)
1482   @raise RPCFail: if migration fails for some reason
1483
1484   """
1485   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1486
1487   try:
1488     hyper.MigrateInstance(instance, target, live)
1489   except errors.HypervisorError, err:
1490     _Fail("Failed to migrate instance: %s", err, exc=True)
1491
1492
1493 def FinalizeMigrationSource(instance, success, live):
1494   """Finalize the instance migration on the source node.
1495
1496   @type instance: L{objects.Instance}
1497   @param instance: the instance definition of the migrated instance
1498   @type success: bool
1499   @param success: whether the migration succeeded or not
1500   @type live: bool
1501   @param live: whether the user requested a live migration or not
1502   @raise RPCFail: If the execution fails for some reason
1503
1504   """
1505   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1506
1507   try:
1508     hyper.FinalizeMigrationSource(instance, success, live)
1509   except Exception, err:  # pylint: disable=W0703
1510     _Fail("Failed to finalize the migration on the source node: %s", err,
1511           exc=True)
1512
1513
1514 def GetMigrationStatus(instance):
1515   """Get the migration status
1516
1517   @type instance: L{objects.Instance}
1518   @param instance: the instance that is being migrated
1519   @rtype: L{objects.MigrationStatus}
1520   @return: the status of the current migration (one of
1521            L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1522            progress info that can be retrieved from the hypervisor
1523   @raise RPCFail: If the migration status cannot be retrieved
1524
1525   """
1526   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1527   try:
1528     return hyper.GetMigrationStatus(instance)
1529   except Exception, err:  # pylint: disable=W0703
1530     _Fail("Failed to get migration status: %s", err, exc=True)
1531
1532
1533 def BlockdevCreate(disk, size, owner, on_primary, info):
1534   """Creates a block device for an instance.
1535
1536   @type disk: L{objects.Disk}
1537   @param disk: the object describing the disk we should create
1538   @type size: int
1539   @param size: the size of the physical underlying device, in MiB
1540   @type owner: str
1541   @param owner: the name of the instance for which disk is created,
1542       used for device cache data
1543   @type on_primary: boolean
1544   @param on_primary:  indicates if it is the primary node or not
1545   @type info: string
1546   @param info: string that will be sent to the physical device
1547       creation, used for example to set (LVM) tags on LVs
1548
1549   @return: the new unique_id of the device (this can sometime be
1550       computed only after creation), or None. On secondary nodes,
1551       it's not required to return anything.
1552
1553   """
1554   # TODO: remove the obsolete "size" argument
1555   # pylint: disable=W0613
1556   clist = []
1557   if disk.children:
1558     for child in disk.children:
1559       try:
1560         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1561       except errors.BlockDeviceError, err:
1562         _Fail("Can't assemble device %s: %s", child, err)
1563       if on_primary or disk.AssembleOnSecondary():
1564         # we need the children open in case the device itself has to
1565         # be assembled
1566         try:
1567           # pylint: disable=E1103
1568           crdev.Open()
1569         except errors.BlockDeviceError, err:
1570           _Fail("Can't make child '%s' read-write: %s", child, err)
1571       clist.append(crdev)
1572
1573   try:
1574     device = bdev.Create(disk, clist)
1575   except errors.BlockDeviceError, err:
1576     _Fail("Can't create block device: %s", err)
1577
1578   if on_primary or disk.AssembleOnSecondary():
1579     try:
1580       device.Assemble()
1581     except errors.BlockDeviceError, err:
1582       _Fail("Can't assemble device after creation, unusual event: %s", err)
1583     if on_primary or disk.OpenOnSecondary():
1584       try:
1585         device.Open(force=True)
1586       except errors.BlockDeviceError, err:
1587         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1588     DevCacheManager.UpdateCache(device.dev_path, owner,
1589                                 on_primary, disk.iv_name)
1590
1591   device.SetInfo(info)
1592
1593   return device.unique_id
1594
1595
1596 def _WipeDevice(path, offset, size):
1597   """This function actually wipes the device.
1598
1599   @param path: The path to the device to wipe
1600   @param offset: The offset in MiB in the file
1601   @param size: The size in MiB to write
1602
1603   """
1604   # Internal sizes are always in Mebibytes; if the following "dd" command
1605   # should use a different block size the offset and size given to this
1606   # function must be adjusted accordingly before being passed to "dd".
1607   block_size = 1024 * 1024
1608
1609   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1610          "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1611          "count=%d" % size]
1612   result = utils.RunCmd(cmd)
1613
1614   if result.failed:
1615     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1616           result.fail_reason, result.output)
1617
1618
1619 def BlockdevWipe(disk, offset, size):
1620   """Wipes a block device.
1621
1622   @type disk: L{objects.Disk}
1623   @param disk: the disk object we want to wipe
1624   @type offset: int
1625   @param offset: The offset in MiB in the file
1626   @type size: int
1627   @param size: The size in MiB to write
1628
1629   """
1630   try:
1631     rdev = _RecursiveFindBD(disk)
1632   except errors.BlockDeviceError:
1633     rdev = None
1634
1635   if not rdev:
1636     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1637
1638   # Do cross verify some of the parameters
1639   if offset < 0:
1640     _Fail("Negative offset")
1641   if size < 0:
1642     _Fail("Negative size")
1643   if offset > rdev.size:
1644     _Fail("Offset is bigger than device size")
1645   if (offset + size) > rdev.size:
1646     _Fail("The provided offset and size to wipe is bigger than device size")
1647
1648   _WipeDevice(rdev.dev_path, offset, size)
1649
1650
1651 def BlockdevPauseResumeSync(disks, pause):
1652   """Pause or resume the sync of the block device.
1653
1654   @type disks: list of L{objects.Disk}
1655   @param disks: the disks object we want to pause/resume
1656   @type pause: bool
1657   @param pause: Wheater to pause or resume
1658
1659   """
1660   success = []
1661   for disk in disks:
1662     try:
1663       rdev = _RecursiveFindBD(disk)
1664     except errors.BlockDeviceError:
1665       rdev = None
1666
1667     if not rdev:
1668       success.append((False, ("Cannot change sync for device %s:"
1669                               " device not found" % disk.iv_name)))
1670       continue
1671
1672     result = rdev.PauseResumeSync(pause)
1673
1674     if result:
1675       success.append((result, None))
1676     else:
1677       if pause:
1678         msg = "Pause"
1679       else:
1680         msg = "Resume"
1681       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1682
1683   return success
1684
1685
1686 def BlockdevRemove(disk):
1687   """Remove a block device.
1688
1689   @note: This is intended to be called recursively.
1690
1691   @type disk: L{objects.Disk}
1692   @param disk: the disk object we should remove
1693   @rtype: boolean
1694   @return: the success of the operation
1695
1696   """
1697   msgs = []
1698   try:
1699     rdev = _RecursiveFindBD(disk)
1700   except errors.BlockDeviceError, err:
1701     # probably can't attach
1702     logging.info("Can't attach to device %s in remove", disk)
1703     rdev = None
1704   if rdev is not None:
1705     r_path = rdev.dev_path
1706     try:
1707       rdev.Remove()
1708     except errors.BlockDeviceError, err:
1709       msgs.append(str(err))
1710     if not msgs:
1711       DevCacheManager.RemoveCache(r_path)
1712
1713   if disk.children:
1714     for child in disk.children:
1715       try:
1716         BlockdevRemove(child)
1717       except RPCFail, err:
1718         msgs.append(str(err))
1719
1720   if msgs:
1721     _Fail("; ".join(msgs))
1722
1723
1724 def _RecursiveAssembleBD(disk, owner, as_primary):
1725   """Activate a block device for an instance.
1726
1727   This is run on the primary and secondary nodes for an instance.
1728
1729   @note: this function is called recursively.
1730
1731   @type disk: L{objects.Disk}
1732   @param disk: the disk we try to assemble
1733   @type owner: str
1734   @param owner: the name of the instance which owns the disk
1735   @type as_primary: boolean
1736   @param as_primary: if we should make the block device
1737       read/write
1738
1739   @return: the assembled device or None (in case no device
1740       was assembled)
1741   @raise errors.BlockDeviceError: in case there is an error
1742       during the activation of the children or the device
1743       itself
1744
1745   """
1746   children = []
1747   if disk.children:
1748     mcn = disk.ChildrenNeeded()
1749     if mcn == -1:
1750       mcn = 0 # max number of Nones allowed
1751     else:
1752       mcn = len(disk.children) - mcn # max number of Nones
1753     for chld_disk in disk.children:
1754       try:
1755         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1756       except errors.BlockDeviceError, err:
1757         if children.count(None) >= mcn:
1758           raise
1759         cdev = None
1760         logging.error("Error in child activation (but continuing): %s",
1761                       str(err))
1762       children.append(cdev)
1763
1764   if as_primary or disk.AssembleOnSecondary():
1765     r_dev = bdev.Assemble(disk, children)
1766     result = r_dev
1767     if as_primary or disk.OpenOnSecondary():
1768       r_dev.Open()
1769     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1770                                 as_primary, disk.iv_name)
1771
1772   else:
1773     result = True
1774   return result
1775
1776
1777 def BlockdevAssemble(disk, owner, as_primary, idx):
1778   """Activate a block device for an instance.
1779
1780   This is a wrapper over _RecursiveAssembleBD.
1781
1782   @rtype: str or boolean
1783   @return: a C{/dev/...} path for primary nodes, and
1784       C{True} for secondary nodes
1785
1786   """
1787   try:
1788     result = _RecursiveAssembleBD(disk, owner, as_primary)
1789     if isinstance(result, bdev.BlockDev):
1790       # pylint: disable=E1103
1791       result = result.dev_path
1792       if as_primary:
1793         _SymlinkBlockDev(owner, result, idx)
1794   except errors.BlockDeviceError, err:
1795     _Fail("Error while assembling disk: %s", err, exc=True)
1796   except OSError, err:
1797     _Fail("Error while symlinking disk: %s", err, exc=True)
1798
1799   return result
1800
1801
1802 def BlockdevShutdown(disk):
1803   """Shut down a block device.
1804
1805   First, if the device is assembled (Attach() is successful), then
1806   the device is shutdown. Then the children of the device are
1807   shutdown.
1808
1809   This function is called recursively. Note that we don't cache the
1810   children or such, as oppossed to assemble, shutdown of different
1811   devices doesn't require that the upper device was active.
1812
1813   @type disk: L{objects.Disk}
1814   @param disk: the description of the disk we should
1815       shutdown
1816   @rtype: None
1817
1818   """
1819   msgs = []
1820   r_dev = _RecursiveFindBD(disk)
1821   if r_dev is not None:
1822     r_path = r_dev.dev_path
1823     try:
1824       r_dev.Shutdown()
1825       DevCacheManager.RemoveCache(r_path)
1826     except errors.BlockDeviceError, err:
1827       msgs.append(str(err))
1828
1829   if disk.children:
1830     for child in disk.children:
1831       try:
1832         BlockdevShutdown(child)
1833       except RPCFail, err:
1834         msgs.append(str(err))
1835
1836   if msgs:
1837     _Fail("; ".join(msgs))
1838
1839
1840 def BlockdevAddchildren(parent_cdev, new_cdevs):
1841   """Extend a mirrored block device.
1842
1843   @type parent_cdev: L{objects.Disk}
1844   @param parent_cdev: the disk to which we should add children
1845   @type new_cdevs: list of L{objects.Disk}
1846   @param new_cdevs: the list of children which we should add
1847   @rtype: None
1848
1849   """
1850   parent_bdev = _RecursiveFindBD(parent_cdev)
1851   if parent_bdev is None:
1852     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1853   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1854   if new_bdevs.count(None) > 0:
1855     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1856   parent_bdev.AddChildren(new_bdevs)
1857
1858
1859 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1860   """Shrink a mirrored block device.
1861
1862   @type parent_cdev: L{objects.Disk}
1863   @param parent_cdev: the disk from which we should remove children
1864   @type new_cdevs: list of L{objects.Disk}
1865   @param new_cdevs: the list of children which we should remove
1866   @rtype: None
1867
1868   """
1869   parent_bdev = _RecursiveFindBD(parent_cdev)
1870   if parent_bdev is None:
1871     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1872   devs = []
1873   for disk in new_cdevs:
1874     rpath = disk.StaticDevPath()
1875     if rpath is None:
1876       bd = _RecursiveFindBD(disk)
1877       if bd is None:
1878         _Fail("Can't find device %s while removing children", disk)
1879       else:
1880         devs.append(bd.dev_path)
1881     else:
1882       if not utils.IsNormAbsPath(rpath):
1883         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1884       devs.append(rpath)
1885   parent_bdev.RemoveChildren(devs)
1886
1887
1888 def BlockdevGetmirrorstatus(disks):
1889   """Get the mirroring status of a list of devices.
1890
1891   @type disks: list of L{objects.Disk}
1892   @param disks: the list of disks which we should query
1893   @rtype: disk
1894   @return: List of L{objects.BlockDevStatus}, one for each disk
1895   @raise errors.BlockDeviceError: if any of the disks cannot be
1896       found
1897
1898   """
1899   stats = []
1900   for dsk in disks:
1901     rbd = _RecursiveFindBD(dsk)
1902     if rbd is None:
1903       _Fail("Can't find device %s", dsk)
1904
1905     stats.append(rbd.CombinedSyncStatus())
1906
1907   return stats
1908
1909
1910 def BlockdevGetmirrorstatusMulti(disks):
1911   """Get the mirroring status of a list of devices.
1912
1913   @type disks: list of L{objects.Disk}
1914   @param disks: the list of disks which we should query
1915   @rtype: disk
1916   @return: List of tuples, (bool, status), one for each disk; bool denotes
1917     success/failure, status is L{objects.BlockDevStatus} on success, string
1918     otherwise
1919
1920   """
1921   result = []
1922   for disk in disks:
1923     try:
1924       rbd = _RecursiveFindBD(disk)
1925       if rbd is None:
1926         result.append((False, "Can't find device %s" % disk))
1927         continue
1928
1929       status = rbd.CombinedSyncStatus()
1930     except errors.BlockDeviceError, err:
1931       logging.exception("Error while getting disk status")
1932       result.append((False, str(err)))
1933     else:
1934       result.append((True, status))
1935
1936   assert len(disks) == len(result)
1937
1938   return result
1939
1940
1941 def _RecursiveFindBD(disk):
1942   """Check if a device is activated.
1943
1944   If so, return information about the real device.
1945
1946   @type disk: L{objects.Disk}
1947   @param disk: the disk object we need to find
1948
1949   @return: None if the device can't be found,
1950       otherwise the device instance
1951
1952   """
1953   children = []
1954   if disk.children:
1955     for chdisk in disk.children:
1956       children.append(_RecursiveFindBD(chdisk))
1957
1958   return bdev.FindDevice(disk, children)
1959
1960
1961 def _OpenRealBD(disk):
1962   """Opens the underlying block device of a disk.
1963
1964   @type disk: L{objects.Disk}
1965   @param disk: the disk object we want to open
1966
1967   """
1968   real_disk = _RecursiveFindBD(disk)
1969   if real_disk is None:
1970     _Fail("Block device '%s' is not set up", disk)
1971
1972   real_disk.Open()
1973
1974   return real_disk
1975
1976
1977 def BlockdevFind(disk):
1978   """Check if a device is activated.
1979
1980   If it is, return information about the real device.
1981
1982   @type disk: L{objects.Disk}
1983   @param disk: the disk to find
1984   @rtype: None or objects.BlockDevStatus
1985   @return: None if the disk cannot be found, otherwise a the current
1986            information
1987
1988   """
1989   try:
1990     rbd = _RecursiveFindBD(disk)
1991   except errors.BlockDeviceError, err:
1992     _Fail("Failed to find device: %s", err, exc=True)
1993
1994   if rbd is None:
1995     return None
1996
1997   return rbd.GetSyncStatus()
1998
1999
2000 def BlockdevGetsize(disks):
2001   """Computes the size of the given disks.
2002
2003   If a disk is not found, returns None instead.
2004
2005   @type disks: list of L{objects.Disk}
2006   @param disks: the list of disk to compute the size for
2007   @rtype: list
2008   @return: list with elements None if the disk cannot be found,
2009       otherwise the size
2010
2011   """
2012   result = []
2013   for cf in disks:
2014     try:
2015       rbd = _RecursiveFindBD(cf)
2016     except errors.BlockDeviceError:
2017       result.append(None)
2018       continue
2019     if rbd is None:
2020       result.append(None)
2021     else:
2022       result.append(rbd.GetActualSize())
2023   return result
2024
2025
2026 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2027   """Export a block device to a remote node.
2028
2029   @type disk: L{objects.Disk}
2030   @param disk: the description of the disk to export
2031   @type dest_node: str
2032   @param dest_node: the destination node to export to
2033   @type dest_path: str
2034   @param dest_path: the destination path on the target node
2035   @type cluster_name: str
2036   @param cluster_name: the cluster name, needed for SSH hostalias
2037   @rtype: None
2038
2039   """
2040   real_disk = _OpenRealBD(disk)
2041
2042   # the block size on the read dd is 1MiB to match our units
2043   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2044                                "dd if=%s bs=1048576 count=%s",
2045                                real_disk.dev_path, str(disk.size))
2046
2047   # we set here a smaller block size as, due to ssh buffering, more
2048   # than 64-128k will mostly ignored; we use nocreat to fail if the
2049   # device is not already there or we pass a wrong path; we use
2050   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2051   # to not buffer too much memory; this means that at best, we flush
2052   # every 64k, which will not be very fast
2053   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2054                                 " oflag=dsync", dest_path)
2055
2056   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2057                                                    constants.SSH_LOGIN_USER,
2058                                                    destcmd)
2059
2060   # all commands have been checked, so we're safe to combine them
2061   command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2062
2063   result = utils.RunCmd(["bash", "-c", command])
2064
2065   if result.failed:
2066     _Fail("Disk copy command '%s' returned error: %s"
2067           " output: %s", command, result.fail_reason, result.output)
2068
2069
2070 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2071   """Write a file to the filesystem.
2072
2073   This allows the master to overwrite(!) a file. It will only perform
2074   the operation if the file belongs to a list of configuration files.
2075
2076   @type file_name: str
2077   @param file_name: the target file name
2078   @type data: str
2079   @param data: the new contents of the file
2080   @type mode: int
2081   @param mode: the mode to give the file (can be None)
2082   @type uid: string
2083   @param uid: the owner of the file
2084   @type gid: string
2085   @param gid: the group of the file
2086   @type atime: float
2087   @param atime: the atime to set on the file (can be None)
2088   @type mtime: float
2089   @param mtime: the mtime to set on the file (can be None)
2090   @rtype: None
2091
2092   """
2093   file_name = vcluster.LocalizeVirtualPath(file_name)
2094
2095   if not os.path.isabs(file_name):
2096     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2097
2098   if file_name not in _ALLOWED_UPLOAD_FILES:
2099     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2100           file_name)
2101
2102   raw_data = _Decompress(data)
2103
2104   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2105     _Fail("Invalid username/groupname type")
2106
2107   getents = runtime.GetEnts()
2108   uid = getents.LookupUser(uid)
2109   gid = getents.LookupGroup(gid)
2110
2111   utils.SafeWriteFile(file_name, None,
2112                       data=raw_data, mode=mode, uid=uid, gid=gid,
2113                       atime=atime, mtime=mtime)
2114
2115
2116 def RunOob(oob_program, command, node, timeout):
2117   """Executes oob_program with given command on given node.
2118
2119   @param oob_program: The path to the executable oob_program
2120   @param command: The command to invoke on oob_program
2121   @param node: The node given as an argument to the program
2122   @param timeout: Timeout after which we kill the oob program
2123
2124   @return: stdout
2125   @raise RPCFail: If execution fails for some reason
2126
2127   """
2128   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2129
2130   if result.failed:
2131     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2132           result.fail_reason, result.output)
2133
2134   return result.stdout
2135
2136
2137 def _OSOndiskAPIVersion(os_dir):
2138   """Compute and return the API version of a given OS.
2139
2140   This function will try to read the API version of the OS residing in
2141   the 'os_dir' directory.
2142
2143   @type os_dir: str
2144   @param os_dir: the directory in which we should look for the OS
2145   @rtype: tuple
2146   @return: tuple (status, data) with status denoting the validity and
2147       data holding either the vaid versions or an error message
2148
2149   """
2150   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2151
2152   try:
2153     st = os.stat(api_file)
2154   except EnvironmentError, err:
2155     return False, ("Required file '%s' not found under path %s: %s" %
2156                    (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2157
2158   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2159     return False, ("File '%s' in %s is not a regular file" %
2160                    (constants.OS_API_FILE, os_dir))
2161
2162   try:
2163     api_versions = utils.ReadFile(api_file).splitlines()
2164   except EnvironmentError, err:
2165     return False, ("Error while reading the API version file at %s: %s" %
2166                    (api_file, utils.ErrnoOrStr(err)))
2167
2168   try:
2169     api_versions = [int(version.strip()) for version in api_versions]
2170   except (TypeError, ValueError), err:
2171     return False, ("API version(s) can't be converted to integer: %s" %
2172                    str(err))
2173
2174   return True, api_versions
2175
2176
2177 def DiagnoseOS(top_dirs=None):
2178   """Compute the validity for all OSes.
2179
2180   @type top_dirs: list
2181   @param top_dirs: the list of directories in which to
2182       search (if not given defaults to
2183       L{pathutils.OS_SEARCH_PATH})
2184   @rtype: list of L{objects.OS}
2185   @return: a list of tuples (name, path, status, diagnose, variants,
2186       parameters, api_version) for all (potential) OSes under all
2187       search paths, where:
2188           - name is the (potential) OS name
2189           - path is the full path to the OS
2190           - status True/False is the validity of the OS
2191           - diagnose is the error message for an invalid OS, otherwise empty
2192           - variants is a list of supported OS variants, if any
2193           - parameters is a list of (name, help) parameters, if any
2194           - api_version is a list of support OS API versions
2195
2196   """
2197   if top_dirs is None:
2198     top_dirs = pathutils.OS_SEARCH_PATH
2199
2200   result = []
2201   for dir_name in top_dirs:
2202     if os.path.isdir(dir_name):
2203       try:
2204         f_names = utils.ListVisibleFiles(dir_name)
2205       except EnvironmentError, err:
2206         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2207         break
2208       for name in f_names:
2209         os_path = utils.PathJoin(dir_name, name)
2210         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2211         if status:
2212           diagnose = ""
2213           variants = os_inst.supported_variants
2214           parameters = os_inst.supported_parameters
2215           api_versions = os_inst.api_versions
2216         else:
2217           diagnose = os_inst
2218           variants = parameters = api_versions = []
2219         result.append((name, os_path, status, diagnose, variants,
2220                        parameters, api_versions))
2221
2222   return result
2223
2224
2225 def _TryOSFromDisk(name, base_dir=None):
2226   """Create an OS instance from disk.
2227
2228   This function will return an OS instance if the given name is a
2229   valid OS name.
2230
2231   @type base_dir: string
2232   @keyword base_dir: Base directory containing OS installations.
2233                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2234   @rtype: tuple
2235   @return: success and either the OS instance if we find a valid one,
2236       or error message
2237
2238   """
2239   if base_dir is None:
2240     os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2241   else:
2242     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2243
2244   if os_dir is None:
2245     return False, "Directory for OS %s not found in search path" % name
2246
2247   status, api_versions = _OSOndiskAPIVersion(os_dir)
2248   if not status:
2249     # push the error up
2250     return status, api_versions
2251
2252   if not constants.OS_API_VERSIONS.intersection(api_versions):
2253     return False, ("API version mismatch for path '%s': found %s, want %s." %
2254                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2255
2256   # OS Files dictionary, we will populate it with the absolute path
2257   # names; if the value is True, then it is a required file, otherwise
2258   # an optional one
2259   os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2260
2261   if max(api_versions) >= constants.OS_API_V15:
2262     os_files[constants.OS_VARIANTS_FILE] = False
2263
2264   if max(api_versions) >= constants.OS_API_V20:
2265     os_files[constants.OS_PARAMETERS_FILE] = True
2266   else:
2267     del os_files[constants.OS_SCRIPT_VERIFY]
2268
2269   for (filename, required) in os_files.items():
2270     os_files[filename] = utils.PathJoin(os_dir, filename)
2271
2272     try:
2273       st = os.stat(os_files[filename])
2274     except EnvironmentError, err:
2275       if err.errno == errno.ENOENT and not required:
2276         del os_files[filename]
2277         continue
2278       return False, ("File '%s' under path '%s' is missing (%s)" %
2279                      (filename, os_dir, utils.ErrnoOrStr(err)))
2280
2281     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2282       return False, ("File '%s' under path '%s' is not a regular file" %
2283                      (filename, os_dir))
2284
2285     if filename in constants.OS_SCRIPTS:
2286       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2287         return False, ("File '%s' under path '%s' is not executable" %
2288                        (filename, os_dir))
2289
2290   variants = []
2291   if constants.OS_VARIANTS_FILE in os_files:
2292     variants_file = os_files[constants.OS_VARIANTS_FILE]
2293     try:
2294       variants = \
2295         utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2296     except EnvironmentError, err:
2297       # we accept missing files, but not other errors
2298       if err.errno != errno.ENOENT:
2299         return False, ("Error while reading the OS variants file at %s: %s" %
2300                        (variants_file, utils.ErrnoOrStr(err)))
2301
2302   parameters = []
2303   if constants.OS_PARAMETERS_FILE in os_files:
2304     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2305     try:
2306       parameters = utils.ReadFile(parameters_file).splitlines()
2307     except EnvironmentError, err:
2308       return False, ("Error while reading the OS parameters file at %s: %s" %
2309                      (parameters_file, utils.ErrnoOrStr(err)))
2310     parameters = [v.split(None, 1) for v in parameters]
2311
2312   os_obj = objects.OS(name=name, path=os_dir,
2313                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2314                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2315                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2316                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2317                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2318                                                  None),
2319                       supported_variants=variants,
2320                       supported_parameters=parameters,
2321                       api_versions=api_versions)
2322   return True, os_obj
2323
2324
2325 def OSFromDisk(name, base_dir=None):
2326   """Create an OS instance from disk.
2327
2328   This function will return an OS instance if the given name is a
2329   valid OS name. Otherwise, it will raise an appropriate
2330   L{RPCFail} exception, detailing why this is not a valid OS.
2331
2332   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2333   an exception but returns true/false status data.
2334
2335   @type base_dir: string
2336   @keyword base_dir: Base directory containing OS installations.
2337                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2338   @rtype: L{objects.OS}
2339   @return: the OS instance if we find a valid one
2340   @raise RPCFail: if we don't find a valid OS
2341
2342   """
2343   name_only = objects.OS.GetName(name)
2344   status, payload = _TryOSFromDisk(name_only, base_dir)
2345
2346   if not status:
2347     _Fail(payload)
2348
2349   return payload
2350
2351
2352 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2353   """Calculate the basic environment for an os script.
2354
2355   @type os_name: str
2356   @param os_name: full operating system name (including variant)
2357   @type inst_os: L{objects.OS}
2358   @param inst_os: operating system for which the environment is being built
2359   @type os_params: dict
2360   @param os_params: the OS parameters
2361   @type debug: integer
2362   @param debug: debug level (0 or 1, for OS Api 10)
2363   @rtype: dict
2364   @return: dict of environment variables
2365   @raise errors.BlockDeviceError: if the block device
2366       cannot be found
2367
2368   """
2369   result = {}
2370   api_version = \
2371     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2372   result["OS_API_VERSION"] = "%d" % api_version
2373   result["OS_NAME"] = inst_os.name
2374   result["DEBUG_LEVEL"] = "%d" % debug
2375
2376   # OS variants
2377   if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2378     variant = objects.OS.GetVariant(os_name)
2379     if not variant:
2380       variant = inst_os.supported_variants[0]
2381   else:
2382     variant = ""
2383   result["OS_VARIANT"] = variant
2384
2385   # OS params
2386   for pname, pvalue in os_params.items():
2387     result["OSP_%s" % pname.upper()] = pvalue
2388
2389   # Set a default path otherwise programs called by OS scripts (or
2390   # even hooks called from OS scripts) might break, and we don't want
2391   # to have each script require setting a PATH variable
2392   result["PATH"] = constants.HOOKS_PATH
2393
2394   return result
2395
2396
2397 def OSEnvironment(instance, inst_os, debug=0):
2398   """Calculate the environment for an os script.
2399
2400   @type instance: L{objects.Instance}
2401   @param instance: target instance for the os script run
2402   @type inst_os: L{objects.OS}
2403   @param inst_os: operating system for which the environment is being built
2404   @type debug: integer
2405   @param debug: debug level (0 or 1, for OS Api 10)
2406   @rtype: dict
2407   @return: dict of environment variables
2408   @raise errors.BlockDeviceError: if the block device
2409       cannot be found
2410
2411   """
2412   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2413
2414   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2415     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2416
2417   result["HYPERVISOR"] = instance.hypervisor
2418   result["DISK_COUNT"] = "%d" % len(instance.disks)
2419   result["NIC_COUNT"] = "%d" % len(instance.nics)
2420   result["INSTANCE_SECONDARY_NODES"] = \
2421       ("%s" % " ".join(instance.secondary_nodes))
2422
2423   # Disks
2424   for idx, disk in enumerate(instance.disks):
2425     real_disk = _OpenRealBD(disk)
2426     result["DISK_%d_PATH" % idx] = real_disk.dev_path
2427     result["DISK_%d_ACCESS" % idx] = disk.mode
2428     if constants.HV_DISK_TYPE in instance.hvparams:
2429       result["DISK_%d_FRONTEND_TYPE" % idx] = \
2430         instance.hvparams[constants.HV_DISK_TYPE]
2431     if disk.dev_type in constants.LDS_BLOCK:
2432       result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2433     elif disk.dev_type == constants.LD_FILE:
2434       result["DISK_%d_BACKEND_TYPE" % idx] = \
2435         "file:%s" % disk.physical_id[0]
2436
2437   # NICs
2438   for idx, nic in enumerate(instance.nics):
2439     result["NIC_%d_MAC" % idx] = nic.mac
2440     if nic.ip:
2441       result["NIC_%d_IP" % idx] = nic.ip
2442     result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2443     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2444       result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2445     if nic.nicparams[constants.NIC_LINK]:
2446       result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2447     if constants.HV_NIC_TYPE in instance.hvparams:
2448       result["NIC_%d_FRONTEND_TYPE" % idx] = \
2449         instance.hvparams[constants.HV_NIC_TYPE]
2450
2451   # HV/BE params
2452   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2453     for key, value in source.items():
2454       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2455
2456   return result
2457
2458
2459 def BlockdevGrow(disk, amount, dryrun, backingstore):
2460   """Grow a stack of block devices.
2461
2462   This function is called recursively, with the childrens being the
2463   first ones to resize.
2464
2465   @type disk: L{objects.Disk}
2466   @param disk: the disk to be grown
2467   @type amount: integer
2468   @param amount: the amount (in mebibytes) to grow with
2469   @type dryrun: boolean
2470   @param dryrun: whether to execute the operation in simulation mode
2471       only, without actually increasing the size
2472   @param backingstore: whether to execute the operation on backing storage
2473       only, or on "logical" storage only; e.g. DRBD is logical storage,
2474       whereas LVM, file, RBD are backing storage
2475   @rtype: (status, result)
2476   @return: a tuple with the status of the operation (True/False), and
2477       the errors message if status is False
2478
2479   """
2480   r_dev = _RecursiveFindBD(disk)
2481   if r_dev is None:
2482     _Fail("Cannot find block device %s", disk)
2483
2484   try:
2485     r_dev.Grow(amount, dryrun, backingstore)
2486   except errors.BlockDeviceError, err:
2487     _Fail("Failed to grow block device: %s", err, exc=True)
2488
2489
2490 def BlockdevSnapshot(disk):
2491   """Create a snapshot copy of a block device.
2492
2493   This function is called recursively, and the snapshot is actually created
2494   just for the leaf lvm backend device.
2495
2496   @type disk: L{objects.Disk}
2497   @param disk: the disk to be snapshotted
2498   @rtype: string
2499   @return: snapshot disk ID as (vg, lv)
2500
2501   """
2502   if disk.dev_type == constants.LD_DRBD8:
2503     if not disk.children:
2504       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2505             disk.unique_id)
2506     return BlockdevSnapshot(disk.children[0])
2507   elif disk.dev_type == constants.LD_LV:
2508     r_dev = _RecursiveFindBD(disk)
2509     if r_dev is not None:
2510       # FIXME: choose a saner value for the snapshot size
2511       # let's stay on the safe side and ask for the full size, for now
2512       return r_dev.Snapshot(disk.size)
2513     else:
2514       _Fail("Cannot find block device %s", disk)
2515   else:
2516     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2517           disk.unique_id, disk.dev_type)
2518
2519
2520 def FinalizeExport(instance, snap_disks):
2521   """Write out the export configuration information.
2522
2523   @type instance: L{objects.Instance}
2524   @param instance: the instance which we export, used for
2525       saving configuration
2526   @type snap_disks: list of L{objects.Disk}
2527   @param snap_disks: list of snapshot block devices, which
2528       will be used to get the actual name of the dump file
2529
2530   @rtype: None
2531
2532   """
2533   destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
2534   finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
2535
2536   config = objects.SerializableConfigParser()
2537
2538   config.add_section(constants.INISECT_EXP)
2539   config.set(constants.INISECT_EXP, "version", "0")
2540   config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2541   config.set(constants.INISECT_EXP, "source", instance.primary_node)
2542   config.set(constants.INISECT_EXP, "os", instance.os)
2543   config.set(constants.INISECT_EXP, "compression", "none")
2544
2545   config.add_section(constants.INISECT_INS)
2546   config.set(constants.INISECT_INS, "name", instance.name)
2547   config.set(constants.INISECT_INS, "maxmem", "%d" %
2548              instance.beparams[constants.BE_MAXMEM])
2549   config.set(constants.INISECT_INS, "minmem", "%d" %
2550              instance.beparams[constants.BE_MINMEM])
2551   # "memory" is deprecated, but useful for exporting to old ganeti versions
2552   config.set(constants.INISECT_INS, "memory", "%d" %
2553              instance.beparams[constants.BE_MAXMEM])
2554   config.set(constants.INISECT_INS, "vcpus", "%d" %
2555              instance.beparams[constants.BE_VCPUS])
2556   config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2557   config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2558   config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2559
2560   nic_total = 0
2561   for nic_count, nic in enumerate(instance.nics):
2562     nic_total += 1
2563     config.set(constants.INISECT_INS, "nic%d_mac" %
2564                nic_count, "%s" % nic.mac)
2565     config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2566     for param in constants.NICS_PARAMETER_TYPES:
2567       config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2568                  "%s" % nic.nicparams.get(param, None))
2569   # TODO: redundant: on load can read nics until it doesn't exist
2570   config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2571
2572   disk_total = 0
2573   for disk_count, disk in enumerate(snap_disks):
2574     if disk:
2575       disk_total += 1
2576       config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2577                  ("%s" % disk.iv_name))
2578       config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2579                  ("%s" % disk.physical_id[1]))
2580       config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2581                  ("%d" % disk.size))
2582
2583   config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2584
2585   # New-style hypervisor/backend parameters
2586
2587   config.add_section(constants.INISECT_HYP)
2588   for name, value in instance.hvparams.items():
2589     if name not in constants.HVC_GLOBALS:
2590       config.set(constants.INISECT_HYP, name, str(value))
2591
2592   config.add_section(constants.INISECT_BEP)
2593   for name, value in instance.beparams.items():
2594     config.set(constants.INISECT_BEP, name, str(value))
2595
2596   config.add_section(constants.INISECT_OSP)
2597   for name, value in instance.osparams.items():
2598     config.set(constants.INISECT_OSP, name, str(value))
2599
2600   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2601                   data=config.Dumps())
2602   shutil.rmtree(finaldestdir, ignore_errors=True)
2603   shutil.move(destdir, finaldestdir)
2604
2605
2606 def ExportInfo(dest):
2607   """Get export configuration information.
2608
2609   @type dest: str
2610   @param dest: directory containing the export
2611
2612   @rtype: L{objects.SerializableConfigParser}
2613   @return: a serializable config file containing the
2614       export info
2615
2616   """
2617   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2618
2619   config = objects.SerializableConfigParser()
2620   config.read(cff)
2621
2622   if (not config.has_section(constants.INISECT_EXP) or
2623       not config.has_section(constants.INISECT_INS)):
2624     _Fail("Export info file doesn't have the required fields")
2625
2626   return config.Dumps()
2627
2628
2629 def ListExports():
2630   """Return a list of exports currently available on this machine.
2631
2632   @rtype: list
2633   @return: list of the exports
2634
2635   """
2636   if os.path.isdir(pathutils.EXPORT_DIR):
2637     return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
2638   else:
2639     _Fail("No exports directory")
2640
2641
2642 def RemoveExport(export):
2643   """Remove an existing export from the node.
2644
2645   @type export: str
2646   @param export: the name of the export to remove
2647   @rtype: None
2648
2649   """
2650   target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2651
2652   try:
2653     shutil.rmtree(target)
2654   except EnvironmentError, err:
2655     _Fail("Error while removing the export: %s", err, exc=True)
2656
2657
2658 def BlockdevRename(devlist):
2659   """Rename a list of block devices.
2660
2661   @type devlist: list of tuples
2662   @param devlist: list of tuples of the form  (disk,
2663       new_logical_id, new_physical_id); disk is an
2664       L{objects.Disk} object describing the current disk,
2665       and new logical_id/physical_id is the name we
2666       rename it to
2667   @rtype: boolean
2668   @return: True if all renames succeeded, False otherwise
2669
2670   """
2671   msgs = []
2672   result = True
2673   for disk, unique_id in devlist:
2674     dev = _RecursiveFindBD(disk)
2675     if dev is None:
2676       msgs.append("Can't find device %s in rename" % str(disk))
2677       result = False
2678       continue
2679     try:
2680       old_rpath = dev.dev_path
2681       dev.Rename(unique_id)
2682       new_rpath = dev.dev_path
2683       if old_rpath != new_rpath:
2684         DevCacheManager.RemoveCache(old_rpath)
2685         # FIXME: we should add the new cache information here, like:
2686         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2687         # but we don't have the owner here - maybe parse from existing
2688         # cache? for now, we only lose lvm data when we rename, which
2689         # is less critical than DRBD or MD
2690     except errors.BlockDeviceError, err:
2691       msgs.append("Can't rename device '%s' to '%s': %s" %
2692                   (dev, unique_id, err))
2693       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2694       result = False
2695   if not result:
2696     _Fail("; ".join(msgs))
2697
2698
2699 def _TransformFileStorageDir(fs_dir):
2700   """Checks whether given file_storage_dir is valid.
2701
2702   Checks wheter the given fs_dir is within the cluster-wide default
2703   file_storage_dir or the shared_file_storage_dir, which are stored in
2704   SimpleStore. Only paths under those directories are allowed.
2705
2706   @type fs_dir: str
2707   @param fs_dir: the path to check
2708
2709   @return: the normalized path if valid, None otherwise
2710
2711   """
2712   if not (constants.ENABLE_FILE_STORAGE or
2713           constants.ENABLE_SHARED_FILE_STORAGE):
2714     _Fail("File storage disabled at configure time")
2715   cfg = _GetConfig()
2716   fs_dir = os.path.normpath(fs_dir)
2717   base_fstore = cfg.GetFileStorageDir()
2718   base_shared = cfg.GetSharedFileStorageDir()
2719   if not (utils.IsBelowDir(base_fstore, fs_dir) or
2720           utils.IsBelowDir(base_shared, fs_dir)):
2721     _Fail("File storage directory '%s' is not under base file"
2722           " storage directory '%s' or shared storage directory '%s'",
2723           fs_dir, base_fstore, base_shared)
2724   return fs_dir
2725
2726
2727 def CreateFileStorageDir(file_storage_dir):
2728   """Create file storage directory.
2729
2730   @type file_storage_dir: str
2731   @param file_storage_dir: directory to create
2732
2733   @rtype: tuple
2734   @return: tuple with first element a boolean indicating wheter dir
2735       creation was successful or not
2736
2737   """
2738   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2739   if os.path.exists(file_storage_dir):
2740     if not os.path.isdir(file_storage_dir):
2741       _Fail("Specified storage dir '%s' is not a directory",
2742             file_storage_dir)
2743   else:
2744     try:
2745       os.makedirs(file_storage_dir, 0750)
2746     except OSError, err:
2747       _Fail("Cannot create file storage directory '%s': %s",
2748             file_storage_dir, err, exc=True)
2749
2750
2751 def RemoveFileStorageDir(file_storage_dir):
2752   """Remove file storage directory.
2753
2754   Remove it only if it's empty. If not log an error and return.
2755
2756   @type file_storage_dir: str
2757   @param file_storage_dir: the directory we should cleanup
2758   @rtype: tuple (success,)
2759   @return: tuple of one element, C{success}, denoting
2760       whether the operation was successful
2761
2762   """
2763   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2764   if os.path.exists(file_storage_dir):
2765     if not os.path.isdir(file_storage_dir):
2766       _Fail("Specified Storage directory '%s' is not a directory",
2767             file_storage_dir)
2768     # deletes dir only if empty, otherwise we want to fail the rpc call
2769     try:
2770       os.rmdir(file_storage_dir)
2771     except OSError, err:
2772       _Fail("Cannot remove file storage directory '%s': %s",
2773             file_storage_dir, err)
2774
2775
2776 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2777   """Rename the file storage directory.
2778
2779   @type old_file_storage_dir: str
2780   @param old_file_storage_dir: the current path
2781   @type new_file_storage_dir: str
2782   @param new_file_storage_dir: the name we should rename to
2783   @rtype: tuple (success,)
2784   @return: tuple of one element, C{success}, denoting
2785       whether the operation was successful
2786
2787   """
2788   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2789   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2790   if not os.path.exists(new_file_storage_dir):
2791     if os.path.isdir(old_file_storage_dir):
2792       try:
2793         os.rename(old_file_storage_dir, new_file_storage_dir)
2794       except OSError, err:
2795         _Fail("Cannot rename '%s' to '%s': %s",
2796               old_file_storage_dir, new_file_storage_dir, err)
2797     else:
2798       _Fail("Specified storage dir '%s' is not a directory",
2799             old_file_storage_dir)
2800   else:
2801     if os.path.exists(old_file_storage_dir):
2802       _Fail("Cannot rename '%s' to '%s': both locations exist",
2803             old_file_storage_dir, new_file_storage_dir)
2804
2805
2806 def _EnsureJobQueueFile(file_name):
2807   """Checks whether the given filename is in the queue directory.
2808
2809   @type file_name: str
2810   @param file_name: the file name we should check
2811   @rtype: None
2812   @raises RPCFail: if the file is not valid
2813
2814   """
2815   if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
2816     _Fail("Passed job queue file '%s' does not belong to"
2817           " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
2818
2819
2820 def JobQueueUpdate(file_name, content):
2821   """Updates a file in the queue directory.
2822
2823   This is just a wrapper over L{utils.io.WriteFile}, with proper
2824   checking.
2825
2826   @type file_name: str
2827   @param file_name: the job file name
2828   @type content: str
2829   @param content: the new job contents
2830   @rtype: boolean
2831   @return: the success of the operation
2832
2833   """
2834   file_name = vcluster.LocalizeVirtualPath(file_name)
2835
2836   _EnsureJobQueueFile(file_name)
2837   getents = runtime.GetEnts()
2838
2839   # Write and replace the file atomically
2840   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2841                   gid=getents.masterd_gid)
2842
2843
2844 def JobQueueRename(old, new):
2845   """Renames a job queue file.
2846
2847   This is just a wrapper over os.rename with proper checking.
2848
2849   @type old: str
2850   @param old: the old (actual) file name
2851   @type new: str
2852   @param new: the desired file name
2853   @rtype: tuple
2854   @return: the success of the operation and payload
2855
2856   """
2857   old = vcluster.LocalizeVirtualPath(old)
2858   new = vcluster.LocalizeVirtualPath(new)
2859
2860   _EnsureJobQueueFile(old)
2861   _EnsureJobQueueFile(new)
2862
2863   getents = runtime.GetEnts()
2864
2865   utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2866                    dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2867
2868
2869 def BlockdevClose(instance_name, disks):
2870   """Closes the given block devices.
2871
2872   This means they will be switched to secondary mode (in case of
2873   DRBD).
2874
2875   @param instance_name: if the argument is not empty, the symlinks
2876       of this instance will be removed
2877   @type disks: list of L{objects.Disk}
2878   @param disks: the list of disks to be closed
2879   @rtype: tuple (success, message)
2880   @return: a tuple of success and message, where success
2881       indicates the succes of the operation, and message
2882       which will contain the error details in case we
2883       failed
2884
2885   """
2886   bdevs = []
2887   for cf in disks:
2888     rd = _RecursiveFindBD(cf)
2889     if rd is None:
2890       _Fail("Can't find device %s", cf)
2891     bdevs.append(rd)
2892
2893   msg = []
2894   for rd in bdevs:
2895     try:
2896       rd.Close()
2897     except errors.BlockDeviceError, err:
2898       msg.append(str(err))
2899   if msg:
2900     _Fail("Can't make devices secondary: %s", ",".join(msg))
2901   else:
2902     if instance_name:
2903       _RemoveBlockDevLinks(instance_name, disks)
2904
2905
2906 def ValidateHVParams(hvname, hvparams):
2907   """Validates the given hypervisor parameters.
2908
2909   @type hvname: string
2910   @param hvname: the hypervisor name
2911   @type hvparams: dict
2912   @param hvparams: the hypervisor parameters to be validated
2913   @rtype: None
2914
2915   """
2916   try:
2917     hv_type = hypervisor.GetHypervisor(hvname)
2918     hv_type.ValidateParameters(hvparams)
2919   except errors.HypervisorError, err:
2920     _Fail(str(err), log=False)
2921
2922
2923 def _CheckOSPList(os_obj, parameters):
2924   """Check whether a list of parameters is supported by the OS.
2925
2926   @type os_obj: L{objects.OS}
2927   @param os_obj: OS object to check
2928   @type parameters: list
2929   @param parameters: the list of parameters to check
2930
2931   """
2932   supported = [v[0] for v in os_obj.supported_parameters]
2933   delta = frozenset(parameters).difference(supported)
2934   if delta:
2935     _Fail("The following parameters are not supported"
2936           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2937
2938
2939 def ValidateOS(required, osname, checks, osparams):
2940   """Validate the given OS' parameters.
2941
2942   @type required: boolean
2943   @param required: whether absence of the OS should translate into
2944       failure or not
2945   @type osname: string
2946   @param osname: the OS to be validated
2947   @type checks: list
2948   @param checks: list of the checks to run (currently only 'parameters')
2949   @type osparams: dict
2950   @param osparams: dictionary with OS parameters
2951   @rtype: boolean
2952   @return: True if the validation passed, or False if the OS was not
2953       found and L{required} was false
2954
2955   """
2956   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2957     _Fail("Unknown checks required for OS %s: %s", osname,
2958           set(checks).difference(constants.OS_VALIDATE_CALLS))
2959
2960   name_only = objects.OS.GetName(osname)
2961   status, tbv = _TryOSFromDisk(name_only, None)
2962
2963   if not status:
2964     if required:
2965       _Fail(tbv)
2966     else:
2967       return False
2968
2969   if max(tbv.api_versions) < constants.OS_API_V20:
2970     return True
2971
2972   if constants.OS_VALIDATE_PARAMETERS in checks:
2973     _CheckOSPList(tbv, osparams.keys())
2974
2975   validate_env = OSCoreEnv(osname, tbv, osparams)
2976   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2977                         cwd=tbv.path, reset_env=True)
2978   if result.failed:
2979     logging.error("os validate command '%s' returned error: %s output: %s",
2980                   result.cmd, result.fail_reason, result.output)
2981     _Fail("OS validation script failed (%s), output: %s",
2982           result.fail_reason, result.output, log=False)
2983
2984   return True
2985
2986
2987 def DemoteFromMC():
2988   """Demotes the current node from master candidate role.
2989
2990   """
2991   # try to ensure we're not the master by mistake
2992   master, myself = ssconf.GetMasterAndMyself()
2993   if master == myself:
2994     _Fail("ssconf status shows I'm the master node, will not demote")
2995
2996   result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
2997   if not result.failed:
2998     _Fail("The master daemon is running, will not demote")
2999
3000   try:
3001     if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3002       utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3003   except EnvironmentError, err:
3004     if err.errno != errno.ENOENT:
3005       _Fail("Error while backing up cluster file: %s", err, exc=True)
3006
3007   utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3008
3009
3010 def _GetX509Filenames(cryptodir, name):
3011   """Returns the full paths for the private key and certificate.
3012
3013   """
3014   return (utils.PathJoin(cryptodir, name),
3015           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3016           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3017
3018
3019 def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3020   """Creates a new X509 certificate for SSL/TLS.
3021
3022   @type validity: int
3023   @param validity: Validity in seconds
3024   @rtype: tuple; (string, string)
3025   @return: Certificate name and public part
3026
3027   """
3028   (key_pem, cert_pem) = \
3029     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3030                                      min(validity, _MAX_SSL_CERT_VALIDITY))
3031
3032   cert_dir = tempfile.mkdtemp(dir=cryptodir,
3033                               prefix="x509-%s-" % utils.TimestampForFilename())
3034   try:
3035     name = os.path.basename(cert_dir)
3036     assert len(name) > 5
3037
3038     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3039
3040     utils.WriteFile(key_file, mode=0400, data=key_pem)
3041     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3042
3043     # Never return private key as it shouldn't leave the node
3044     return (name, cert_pem)
3045   except Exception:
3046     shutil.rmtree(cert_dir, ignore_errors=True)
3047     raise
3048
3049
3050 def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3051   """Removes a X509 certificate.
3052
3053   @type name: string
3054   @param name: Certificate name
3055
3056   """
3057   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3058
3059   utils.RemoveFile(key_file)
3060   utils.RemoveFile(cert_file)
3061
3062   try:
3063     os.rmdir(cert_dir)
3064   except EnvironmentError, err:
3065     _Fail("Cannot remove certificate directory '%s': %s",
3066           cert_dir, err)
3067
3068
3069 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3070   """Returns the command for the requested input/output.
3071
3072   @type instance: L{objects.Instance}
3073   @param instance: The instance object
3074   @param mode: Import/export mode
3075   @param ieio: Input/output type
3076   @param ieargs: Input/output arguments
3077
3078   """
3079   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3080
3081   env = None
3082   prefix = None
3083   suffix = None
3084   exp_size = None
3085
3086   if ieio == constants.IEIO_FILE:
3087     (filename, ) = ieargs
3088
3089     if not utils.IsNormAbsPath(filename):
3090       _Fail("Path '%s' is not normalized or absolute", filename)
3091
3092     real_filename = os.path.realpath(filename)
3093     directory = os.path.dirname(real_filename)
3094
3095     if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3096       _Fail("File '%s' is not under exports directory '%s': %s",
3097             filename, pathutils.EXPORT_DIR, real_filename)
3098
3099     # Create directory
3100     utils.Makedirs(directory, mode=0750)
3101
3102     quoted_filename = utils.ShellQuote(filename)
3103
3104     if mode == constants.IEM_IMPORT:
3105       suffix = "> %s" % quoted_filename
3106     elif mode == constants.IEM_EXPORT:
3107       suffix = "< %s" % quoted_filename
3108
3109       # Retrieve file size
3110       try:
3111         st = os.stat(filename)
3112       except EnvironmentError, err:
3113         logging.error("Can't stat(2) %s: %s", filename, err)
3114       else:
3115         exp_size = utils.BytesToMebibyte(st.st_size)
3116
3117   elif ieio == constants.IEIO_RAW_DISK:
3118     (disk, ) = ieargs
3119
3120     real_disk = _OpenRealBD(disk)
3121
3122     if mode == constants.IEM_IMPORT:
3123       # we set here a smaller block size as, due to transport buffering, more
3124       # than 64-128k will mostly ignored; we use nocreat to fail if the device
3125       # is not already there or we pass a wrong path; we use notrunc to no
3126       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3127       # much memory; this means that at best, we flush every 64k, which will
3128       # not be very fast
3129       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3130                                     " bs=%s oflag=dsync"),
3131                                     real_disk.dev_path,
3132                                     str(64 * 1024))
3133
3134     elif mode == constants.IEM_EXPORT:
3135       # the block size on the read dd is 1MiB to match our units
3136       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3137                                    real_disk.dev_path,
3138                                    str(1024 * 1024), # 1 MB
3139                                    str(disk.size))
3140       exp_size = disk.size
3141
3142   elif ieio == constants.IEIO_SCRIPT:
3143     (disk, disk_index, ) = ieargs
3144
3145     assert isinstance(disk_index, (int, long))
3146
3147     real_disk = _OpenRealBD(disk)
3148
3149     inst_os = OSFromDisk(instance.os)
3150     env = OSEnvironment(instance, inst_os)
3151
3152     if mode == constants.IEM_IMPORT:
3153       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3154       env["IMPORT_INDEX"] = str(disk_index)
3155       script = inst_os.import_script
3156
3157     elif mode == constants.IEM_EXPORT:
3158       env["EXPORT_DEVICE"] = real_disk.dev_path
3159       env["EXPORT_INDEX"] = str(disk_index)
3160       script = inst_os.export_script
3161
3162     # TODO: Pass special environment only to script
3163     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3164
3165     if mode == constants.IEM_IMPORT:
3166       suffix = "| %s" % script_cmd
3167
3168     elif mode == constants.IEM_EXPORT:
3169       prefix = "%s |" % script_cmd
3170
3171     # Let script predict size
3172     exp_size = constants.IE_CUSTOM_SIZE
3173
3174   else:
3175     _Fail("Invalid %s I/O mode %r", mode, ieio)
3176
3177   return (env, prefix, suffix, exp_size)
3178
3179
3180 def _CreateImportExportStatusDir(prefix):
3181   """Creates status directory for import/export.
3182
3183   """
3184   return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3185                           prefix=("%s-%s-" %
3186                                   (prefix, utils.TimestampForFilename())))
3187
3188
3189 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3190                             ieio, ieioargs):
3191   """Starts an import or export daemon.
3192
3193   @param mode: Import/output mode
3194   @type opts: L{objects.ImportExportOptions}
3195   @param opts: Daemon options
3196   @type host: string
3197   @param host: Remote host for export (None for import)
3198   @type port: int
3199   @param port: Remote port for export (None for import)
3200   @type instance: L{objects.Instance}
3201   @param instance: Instance object
3202   @type component: string
3203   @param component: which part of the instance is transferred now,
3204       e.g. 'disk/0'
3205   @param ieio: Input/output type
3206   @param ieioargs: Input/output arguments
3207
3208   """
3209   if mode == constants.IEM_IMPORT:
3210     prefix = "import"
3211
3212     if not (host is None and port is None):
3213       _Fail("Can not specify host or port on import")
3214
3215   elif mode == constants.IEM_EXPORT:
3216     prefix = "export"
3217
3218     if host is None or port is None:
3219       _Fail("Host and port must be specified for an export")
3220
3221   else:
3222     _Fail("Invalid mode %r", mode)
3223
3224   if (opts.key_name is None) ^ (opts.ca_pem is None):
3225     _Fail("Cluster certificate can only be used for both key and CA")
3226
3227   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3228     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3229
3230   if opts.key_name is None:
3231     # Use server.pem
3232     key_path = pathutils.NODED_CERT_FILE
3233     cert_path = pathutils.NODED_CERT_FILE
3234     assert opts.ca_pem is None
3235   else:
3236     (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3237                                                  opts.key_name)
3238     assert opts.ca_pem is not None
3239
3240   for i in [key_path, cert_path]:
3241     if not os.path.exists(i):
3242       _Fail("File '%s' does not exist" % i)
3243
3244   status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3245   try:
3246     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3247     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3248     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3249
3250     if opts.ca_pem is None:
3251       # Use server.pem
3252       ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3253     else:
3254       ca = opts.ca_pem
3255
3256     # Write CA file
3257     utils.WriteFile(ca_file, data=ca, mode=0400)
3258
3259     cmd = [
3260       pathutils.IMPORT_EXPORT_DAEMON,
3261       status_file, mode,
3262       "--key=%s" % key_path,
3263       "--cert=%s" % cert_path,
3264       "--ca=%s" % ca_file,
3265       ]
3266
3267     if host:
3268       cmd.append("--host=%s" % host)
3269
3270     if port:
3271       cmd.append("--port=%s" % port)
3272
3273     if opts.ipv6:
3274       cmd.append("--ipv6")
3275     else:
3276       cmd.append("--ipv4")
3277
3278     if opts.compress:
3279       cmd.append("--compress=%s" % opts.compress)
3280
3281     if opts.magic:
3282       cmd.append("--magic=%s" % opts.magic)
3283
3284     if exp_size is not None:
3285       cmd.append("--expected-size=%s" % exp_size)
3286
3287     if cmd_prefix:
3288       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3289
3290     if cmd_suffix:
3291       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3292
3293     if mode == constants.IEM_EXPORT:
3294       # Retry connection a few times when connecting to remote peer
3295       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3296       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3297     elif opts.connect_timeout is not None:
3298       assert mode == constants.IEM_IMPORT
3299       # Overall timeout for establishing connection while listening
3300       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3301
3302     logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3303
3304     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3305     # support for receiving a file descriptor for output
3306     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3307                       output=logfile)
3308
3309     # The import/export name is simply the status directory name
3310     return os.path.basename(status_dir)
3311
3312   except Exception:
3313     shutil.rmtree(status_dir, ignore_errors=True)
3314     raise
3315
3316
3317 def GetImportExportStatus(names):
3318   """Returns import/export daemon status.
3319
3320   @type names: sequence
3321   @param names: List of names
3322   @rtype: List of dicts
3323   @return: Returns a list of the state of each named import/export or None if a
3324            status couldn't be read
3325
3326   """
3327   result = []
3328
3329   for name in names:
3330     status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3331                                  _IES_STATUS_FILE)
3332
3333     try:
3334       data = utils.ReadFile(status_file)
3335     except EnvironmentError, err:
3336       if err.errno != errno.ENOENT:
3337         raise
3338       data = None
3339
3340     if not data:
3341       result.append(None)
3342       continue
3343
3344     result.append(serializer.LoadJson(data))
3345
3346   return result
3347
3348
3349 def AbortImportExport(name):
3350   """Sends SIGTERM to a running import/export daemon.
3351
3352   """
3353   logging.info("Abort import/export %s", name)
3354
3355   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3356   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3357
3358   if pid:
3359     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3360                  name, pid)
3361     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3362
3363
3364 def CleanupImportExport(name):
3365   """Cleanup after an import or export.
3366
3367   If the import/export daemon is still running it's killed. Afterwards the
3368   whole status directory is removed.
3369
3370   """
3371   logging.info("Finalizing import/export %s", name)
3372
3373   status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3374
3375   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3376
3377   if pid:
3378     logging.info("Import/export %s is still running with PID %s",
3379                  name, pid)
3380     utils.KillProcess(pid, waitpid=False)
3381
3382   shutil.rmtree(status_dir, ignore_errors=True)
3383
3384
3385 def _FindDisks(nodes_ip, disks):
3386   """Sets the physical ID on disks and returns the block devices.
3387
3388   """
3389   # set the correct physical ID
3390   my_name = netutils.Hostname.GetSysName()
3391   for cf in disks:
3392     cf.SetPhysicalID(my_name, nodes_ip)
3393
3394   bdevs = []
3395
3396   for cf in disks:
3397     rd = _RecursiveFindBD(cf)
3398     if rd is None:
3399       _Fail("Can't find device %s", cf)
3400     bdevs.append(rd)
3401   return bdevs
3402
3403
3404 def DrbdDisconnectNet(nodes_ip, disks):
3405   """Disconnects the network on a list of drbd devices.
3406
3407   """
3408   bdevs = _FindDisks(nodes_ip, disks)
3409
3410   # disconnect disks
3411   for rd in bdevs:
3412     try:
3413       rd.DisconnectNet()
3414     except errors.BlockDeviceError, err:
3415       _Fail("Can't change network configuration to standalone mode: %s",
3416             err, exc=True)
3417
3418
3419 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3420   """Attaches the network on a list of drbd devices.
3421
3422   """
3423   bdevs = _FindDisks(nodes_ip, disks)
3424
3425   if multimaster:
3426     for idx, rd in enumerate(bdevs):
3427       try:
3428         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3429       except EnvironmentError, err:
3430         _Fail("Can't create symlink: %s", err)
3431   # reconnect disks, switch to new master configuration and if
3432   # needed primary mode
3433   for rd in bdevs:
3434     try:
3435       rd.AttachNet(multimaster)
3436     except errors.BlockDeviceError, err:
3437       _Fail("Can't change network configuration: %s", err)
3438
3439   # wait until the disks are connected; we need to retry the re-attach
3440   # if the device becomes standalone, as this might happen if the one
3441   # node disconnects and reconnects in a different mode before the
3442   # other node reconnects; in this case, one or both of the nodes will
3443   # decide it has wrong configuration and switch to standalone
3444
3445   def _Attach():
3446     all_connected = True
3447
3448     for rd in bdevs:
3449       stats = rd.GetProcStatus()
3450
3451       all_connected = (all_connected and
3452                        (stats.is_connected or stats.is_in_resync))
3453
3454       if stats.is_standalone:
3455         # peer had different config info and this node became
3456         # standalone, even though this should not happen with the
3457         # new staged way of changing disk configs
3458         try:
3459           rd.AttachNet(multimaster)
3460         except errors.BlockDeviceError, err:
3461           _Fail("Can't change network configuration: %s", err)
3462
3463     if not all_connected:
3464       raise utils.RetryAgain()
3465
3466   try:
3467     # Start with a delay of 100 miliseconds and go up to 5 seconds
3468     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3469   except utils.RetryTimeout:
3470     _Fail("Timeout in disk reconnecting")
3471
3472   if multimaster:
3473     # change to primary mode
3474     for rd in bdevs:
3475       try:
3476         rd.Open()
3477       except errors.BlockDeviceError, err:
3478         _Fail("Can't change to primary mode: %s", err)
3479
3480
3481 def DrbdWaitSync(nodes_ip, disks):
3482   """Wait until DRBDs have synchronized.
3483
3484   """
3485   def _helper(rd):
3486     stats = rd.GetProcStatus()
3487     if not (stats.is_connected or stats.is_in_resync):
3488       raise utils.RetryAgain()
3489     return stats
3490
3491   bdevs = _FindDisks(nodes_ip, disks)
3492
3493   min_resync = 100
3494   alldone = True
3495   for rd in bdevs:
3496     try:
3497       # poll each second for 15 seconds
3498       stats = utils.Retry(_helper, 1, 15, args=[rd])
3499     except utils.RetryTimeout:
3500       stats = rd.GetProcStatus()
3501       # last check
3502       if not (stats.is_connected or stats.is_in_resync):
3503         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3504     alldone = alldone and (not stats.is_in_resync)
3505     if stats.sync_percent is not None:
3506       min_resync = min(min_resync, stats.sync_percent)
3507
3508   return (alldone, min_resync)
3509
3510
3511 def GetDrbdUsermodeHelper():
3512   """Returns DRBD usermode helper currently configured.
3513
3514   """
3515   try:
3516     return bdev.BaseDRBD.GetUsermodeHelper()
3517   except errors.BlockDeviceError, err:
3518     _Fail(str(err))
3519
3520
3521 def PowercycleNode(hypervisor_type):
3522   """Hard-powercycle the node.
3523
3524   Because we need to return first, and schedule the powercycle in the
3525   background, we won't be able to report failures nicely.
3526
3527   """
3528   hyper = hypervisor.GetHypervisor(hypervisor_type)
3529   try:
3530     pid = os.fork()
3531   except OSError:
3532     # if we can't fork, we'll pretend that we're in the child process
3533     pid = 0
3534   if pid > 0:
3535     return "Reboot scheduled in 5 seconds"
3536   # ensure the child is running on ram
3537   try:
3538     utils.Mlockall()
3539   except Exception: # pylint: disable=W0703
3540     pass
3541   time.sleep(5)
3542   hyper.PowercycleNode()
3543
3544
3545 class HooksRunner(object):
3546   """Hook runner.
3547
3548   This class is instantiated on the node side (ganeti-noded) and not
3549   on the master side.
3550
3551   """
3552   def __init__(self, hooks_base_dir=None):
3553     """Constructor for hooks runner.
3554
3555     @type hooks_base_dir: str or None
3556     @param hooks_base_dir: if not None, this overrides the
3557         L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3558
3559     """
3560     if hooks_base_dir is None:
3561       hooks_base_dir = pathutils.HOOKS_BASE_DIR
3562     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3563     # constant
3564     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3565
3566   def RunLocalHooks(self, node_list, hpath, phase, env):
3567     """Check that the hooks will be run only locally and then run them.
3568
3569     """
3570     assert len(node_list) == 1
3571     node = node_list[0]
3572     _, myself = ssconf.GetMasterAndMyself()
3573     assert node == myself
3574
3575     results = self.RunHooks(hpath, phase, env)
3576
3577     # Return values in the form expected by HooksMaster
3578     return {node: (None, False, results)}
3579
3580   def RunHooks(self, hpath, phase, env):
3581     """Run the scripts in the hooks directory.
3582
3583     @type hpath: str
3584     @param hpath: the path to the hooks directory which
3585         holds the scripts
3586     @type phase: str
3587     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3588         L{constants.HOOKS_PHASE_POST}
3589     @type env: dict
3590     @param env: dictionary with the environment for the hook
3591     @rtype: list
3592     @return: list of 3-element tuples:
3593       - script path
3594       - script result, either L{constants.HKR_SUCCESS} or
3595         L{constants.HKR_FAIL}
3596       - output of the script
3597
3598     @raise errors.ProgrammerError: for invalid input
3599         parameters
3600
3601     """
3602     if phase == constants.HOOKS_PHASE_PRE:
3603       suffix = "pre"
3604     elif phase == constants.HOOKS_PHASE_POST:
3605       suffix = "post"
3606     else:
3607       _Fail("Unknown hooks phase '%s'", phase)
3608
3609     subdir = "%s-%s.d" % (hpath, suffix)
3610     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3611
3612     results = []
3613
3614     if not os.path.isdir(dir_name):
3615       # for non-existing/non-dirs, we simply exit instead of logging a
3616       # warning at every operation
3617       return results
3618
3619     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3620
3621     for (relname, relstatus, runresult) in runparts_results:
3622       if relstatus == constants.RUNPARTS_SKIP:
3623         rrval = constants.HKR_SKIP
3624         output = ""
3625       elif relstatus == constants.RUNPARTS_ERR:
3626         rrval = constants.HKR_FAIL
3627         output = "Hook script execution error: %s" % runresult
3628       elif relstatus == constants.RUNPARTS_RUN:
3629         if runresult.failed:
3630           rrval = constants.HKR_FAIL
3631         else:
3632           rrval = constants.HKR_SUCCESS
3633         output = utils.SafeEncode(runresult.output.strip())
3634       results.append(("%s/%s" % (subdir, relname), rrval, output))
3635
3636     return results
3637
3638
3639 class IAllocatorRunner(object):
3640   """IAllocator runner.
3641
3642   This class is instantiated on the node side (ganeti-noded) and not on
3643   the master side.
3644
3645   """
3646   @staticmethod
3647   def Run(name, idata):
3648     """Run an iallocator script.
3649
3650     @type name: str
3651     @param name: the iallocator script name
3652     @type idata: str
3653     @param idata: the allocator input data
3654
3655     @rtype: tuple
3656     @return: two element tuple of:
3657        - status
3658        - either error message or stdout of allocator (for success)
3659
3660     """
3661     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3662                                   os.path.isfile)
3663     if alloc_script is None:
3664       _Fail("iallocator module '%s' not found in the search path", name)
3665
3666     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3667     try:
3668       os.write(fd, idata)
3669       os.close(fd)
3670       result = utils.RunCmd([alloc_script, fin_name])
3671       if result.failed:
3672         _Fail("iallocator module '%s' failed: %s, output '%s'",
3673               name, result.fail_reason, result.output)
3674     finally:
3675       os.unlink(fin_name)
3676
3677     return result.stdout
3678
3679
3680 class DevCacheManager(object):
3681   """Simple class for managing a cache of block device information.
3682
3683   """
3684   _DEV_PREFIX = "/dev/"
3685   _ROOT_DIR = pathutils.BDEV_CACHE_DIR
3686
3687   @classmethod
3688   def _ConvertPath(cls, dev_path):
3689     """Converts a /dev/name path to the cache file name.
3690
3691     This replaces slashes with underscores and strips the /dev
3692     prefix. It then returns the full path to the cache file.
3693
3694     @type dev_path: str
3695     @param dev_path: the C{/dev/} path name
3696     @rtype: str
3697     @return: the converted path name
3698
3699     """
3700     if dev_path.startswith(cls._DEV_PREFIX):
3701       dev_path = dev_path[len(cls._DEV_PREFIX):]
3702     dev_path = dev_path.replace("/", "_")
3703     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3704     return fpath
3705
3706   @classmethod
3707   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3708     """Updates the cache information for a given device.
3709
3710     @type dev_path: str
3711     @param dev_path: the pathname of the device
3712     @type owner: str
3713     @param owner: the owner (instance name) of the device
3714     @type on_primary: bool
3715     @param on_primary: whether this is the primary
3716         node nor not
3717     @type iv_name: str
3718     @param iv_name: the instance-visible name of the
3719         device, as in objects.Disk.iv_name
3720
3721     @rtype: None
3722
3723     """
3724     if dev_path is None:
3725       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3726       return
3727     fpath = cls._ConvertPath(dev_path)
3728     if on_primary:
3729       state = "primary"
3730     else:
3731       state = "secondary"
3732     if iv_name is None:
3733       iv_name = "not_visible"
3734     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3735     try:
3736       utils.WriteFile(fpath, data=fdata)
3737     except EnvironmentError, err:
3738       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3739
3740   @classmethod
3741   def RemoveCache(cls, dev_path):
3742     """Remove data for a dev_path.
3743
3744     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3745     path name and logging.
3746
3747     @type dev_path: str
3748     @param dev_path: the pathname of the device
3749
3750     @rtype: None
3751
3752     """
3753     if dev_path is None:
3754       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3755       return
3756     fpath = cls._ConvertPath(dev_path)
3757     try:
3758       utils.RemoveFile(fpath)
3759     except EnvironmentError, err:
3760       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)