RPC/Backend: Make UploadFile uid and gid agnostic
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.RAPI_USERS_FILE,
200     constants.CONFD_HMAC_KEY,
201     constants.CLUSTER_DOMAIN_SECRET_FILE,
202     ])
203
204   for hv_name in constants.HYPER_TYPES:
205     hv_class = hypervisor.GetHypervisorClass(hv_name)
206     allowed_files.update(hv_class.GetAncillaryFiles())
207
208   return frozenset(allowed_files)
209
210
211 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212
213
214 def JobQueuePurge():
215   """Removes job queue files and archived jobs.
216
217   @rtype: tuple
218   @return: True, None
219
220   """
221   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
222   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223
224
225 def GetMasterInfo():
226   """Returns master information.
227
228   This is an utility function to compute master information, either
229   for consumption here or from the node daemon.
230
231   @rtype: tuple
232   @return: master_netdev, master_ip, master_name, primary_ip_family
233   @raise RPCFail: in case of errors
234
235   """
236   try:
237     cfg = _GetConfig()
238     master_netdev = cfg.GetMasterNetdev()
239     master_ip = cfg.GetMasterIP()
240     master_node = cfg.GetMasterNode()
241     primary_ip_family = cfg.GetPrimaryIPFamily()
242   except errors.ConfigurationError, err:
243     _Fail("Cluster configuration incomplete: %s", err, exc=True)
244   return (master_netdev, master_ip, master_node, primary_ip_family)
245
246
247 def StartMaster(start_daemons, no_voting):
248   """Activate local node as master node.
249
250   The function will either try activate the IP address of the master
251   (unless someone else has it) or also start the master daemons, based
252   on the start_daemons parameter.
253
254   @type start_daemons: boolean
255   @param start_daemons: whether to start the master daemons
256       (ganeti-masterd and ganeti-rapi), or (if false) activate the
257       master ip
258   @type no_voting: boolean
259   @param no_voting: whether to start ganeti-masterd without a node vote
260       (if start_daemons is True), but still non-interactively
261   @rtype: None
262
263   """
264   # GetMasterInfo will raise an exception if not able to return data
265   master_netdev, master_ip, _, family = GetMasterInfo()
266
267   err_msgs = []
268   # either start the master and rapi daemons
269   if start_daemons:
270     if no_voting:
271       masterd_args = "--no-voting --yes-do-it"
272     else:
273       masterd_args = ""
274
275     env = {
276       "EXTRA_MASTERD_ARGS": masterd_args,
277       }
278
279     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
280     if result.failed:
281       msg = "Can't start Ganeti master: %s" % result.output
282       logging.error(msg)
283       err_msgs.append(msg)
284   # or activate the IP
285   else:
286     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
287       if netutils.IPAddress.Own(master_ip):
288         # we already have the ip:
289         logging.debug("Master IP already configured, doing nothing")
290       else:
291         msg = "Someone else has the master ip, not activating"
292         logging.error(msg)
293         err_msgs.append(msg)
294     else:
295       ipcls = netutils.IP4Address
296       if family == netutils.IP6Address.family:
297         ipcls = netutils.IP6Address
298
299       result = utils.RunCmd(["ip", "address", "add",
300                              "%s/%d" % (master_ip, ipcls.iplen),
301                              "dev", master_netdev, "label",
302                              "%s:0" % master_netdev])
303       if result.failed:
304         msg = "Can't activate master IP: %s" % result.output
305         logging.error(msg)
306         err_msgs.append(msg)
307
308       # we ignore the exit code of the following cmds
309       if ipcls == netutils.IP4Address:
310         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
311                       master_ip, master_ip])
312       elif ipcls == netutils.IP6Address:
313         try:
314           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
315         except errors.OpExecError:
316           # TODO: Better error reporting
317           logging.warning("Can't execute ndisc6, please install if missing")
318
319   if err_msgs:
320     _Fail("; ".join(err_msgs))
321
322
323 def StopMaster(stop_daemons):
324   """Deactivate this node as master.
325
326   The function will always try to deactivate the IP address of the
327   master. It will also stop the master daemons depending on the
328   stop_daemons parameter.
329
330   @type stop_daemons: boolean
331   @param stop_daemons: whether to also stop the master daemons
332       (ganeti-masterd and ganeti-rapi)
333   @rtype: None
334
335   """
336   # TODO: log and report back to the caller the error failures; we
337   # need to decide in which case we fail the RPC for this
338
339   # GetMasterInfo will raise an exception if not able to return data
340   master_netdev, master_ip, _, family = GetMasterInfo()
341
342   ipcls = netutils.IP4Address
343   if family == netutils.IP6Address.family:
344     ipcls = netutils.IP6Address
345
346   result = utils.RunCmd(["ip", "address", "del",
347                          "%s/%d" % (master_ip, ipcls.iplen),
348                          "dev", master_netdev])
349   if result.failed:
350     logging.error("Can't remove the master IP, error: %s", result.output)
351     # but otherwise ignore the failure
352
353   if stop_daemons:
354     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
355     if result.failed:
356       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
357                     " and error %s",
358                     result.cmd, result.exit_code, result.output)
359
360
361 def EtcHostsModify(mode, host, ip):
362   """Modify a host entry in /etc/hosts.
363
364   @param mode: The mode to operate. Either add or remove entry
365   @param host: The host to operate on
366   @param ip: The ip associated with the entry
367
368   """
369   if mode == constants.ETC_HOSTS_ADD:
370     if not ip:
371       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
372               " present")
373     utils.AddHostToEtcHosts(host, ip)
374   elif mode == constants.ETC_HOSTS_REMOVE:
375     if ip:
376       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
377               " parameter is present")
378     utils.RemoveHostFromEtcHosts(host)
379   else:
380     RPCFail("Mode not supported")
381
382
383 def LeaveCluster(modify_ssh_setup):
384   """Cleans up and remove the current node.
385
386   This function cleans up and prepares the current node to be removed
387   from the cluster.
388
389   If processing is successful, then it raises an
390   L{errors.QuitGanetiException} which is used as a special case to
391   shutdown the node daemon.
392
393   @param modify_ssh_setup: boolean
394
395   """
396   _CleanDirectory(constants.DATA_DIR)
397   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
398   JobQueuePurge()
399
400   if modify_ssh_setup:
401     try:
402       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
403
404       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
405
406       utils.RemoveFile(priv_key)
407       utils.RemoveFile(pub_key)
408     except errors.OpExecError:
409       logging.exception("Error while processing ssh files")
410
411   try:
412     utils.RemoveFile(constants.CONFD_HMAC_KEY)
413     utils.RemoveFile(constants.RAPI_CERT_FILE)
414     utils.RemoveFile(constants.NODED_CERT_FILE)
415   except: # pylint: disable-msg=W0702
416     logging.exception("Error while removing cluster secrets")
417
418   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
419   if result.failed:
420     logging.error("Command %s failed with exitcode %s and error %s",
421                   result.cmd, result.exit_code, result.output)
422
423   # Raise a custom exception (handled in ganeti-noded)
424   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
425
426
427 def GetNodeInfo(vgname, hypervisor_type):
428   """Gives back a hash with different information about the node.
429
430   @type vgname: C{string}
431   @param vgname: the name of the volume group to ask for disk space information
432   @type hypervisor_type: C{str}
433   @param hypervisor_type: the name of the hypervisor to ask for
434       memory information
435   @rtype: C{dict}
436   @return: dictionary with the following keys:
437       - vg_size is the size of the configured volume group in MiB
438       - vg_free is the free size of the volume group in MiB
439       - memory_dom0 is the memory allocated for domain0 in MiB
440       - memory_free is the currently available (free) ram in MiB
441       - memory_total is the total number of ram in MiB
442
443   """
444   outputarray = {}
445
446   if vgname is not None:
447     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
448     vg_free = vg_size = None
449     if vginfo:
450       vg_free = int(round(vginfo[0][0], 0))
451       vg_size = int(round(vginfo[0][1], 0))
452     outputarray['vg_size'] = vg_size
453     outputarray['vg_free'] = vg_free
454
455   if hypervisor_type is not None:
456     hyper = hypervisor.GetHypervisor(hypervisor_type)
457     hyp_info = hyper.GetNodeInfo()
458     if hyp_info is not None:
459       outputarray.update(hyp_info)
460
461   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
462
463   return outputarray
464
465
466 def VerifyNode(what, cluster_name):
467   """Verify the status of the local node.
468
469   Based on the input L{what} parameter, various checks are done on the
470   local node.
471
472   If the I{filelist} key is present, this list of
473   files is checksummed and the file/checksum pairs are returned.
474
475   If the I{nodelist} key is present, we check that we have
476   connectivity via ssh with the target nodes (and check the hostname
477   report).
478
479   If the I{node-net-test} key is present, we check that we have
480   connectivity to the given nodes via both primary IP and, if
481   applicable, secondary IPs.
482
483   @type what: C{dict}
484   @param what: a dictionary of things to check:
485       - filelist: list of files for which to compute checksums
486       - nodelist: list of nodes we should check ssh communication with
487       - node-net-test: list of nodes we should check node daemon port
488         connectivity with
489       - hypervisor: list with hypervisors to run the verify for
490   @rtype: dict
491   @return: a dictionary with the same keys as the input dict, and
492       values representing the result of the checks
493
494   """
495   result = {}
496   my_name = netutils.Hostname.GetSysName()
497   port = netutils.GetDaemonPort(constants.NODED)
498   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
499
500   if constants.NV_HYPERVISOR in what and vm_capable:
501     result[constants.NV_HYPERVISOR] = tmp = {}
502     for hv_name in what[constants.NV_HYPERVISOR]:
503       try:
504         val = hypervisor.GetHypervisor(hv_name).Verify()
505       except errors.HypervisorError, err:
506         val = "Error while checking hypervisor: %s" % str(err)
507       tmp[hv_name] = val
508
509   if constants.NV_HVPARAMS in what and vm_capable:
510     result[constants.NV_HVPARAMS] = tmp = []
511     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
512       try:
513         logging.info("Validating hv %s, %s", hv_name, hvparms)
514         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
515       except errors.HypervisorError, err:
516         tmp.append((source, hv_name, str(err)))
517
518   if constants.NV_FILELIST in what:
519     result[constants.NV_FILELIST] = utils.FingerprintFiles(
520       what[constants.NV_FILELIST])
521
522   if constants.NV_NODELIST in what:
523     result[constants.NV_NODELIST] = tmp = {}
524     random.shuffle(what[constants.NV_NODELIST])
525     for node in what[constants.NV_NODELIST]:
526       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
527       if not success:
528         tmp[node] = message
529
530   if constants.NV_NODENETTEST in what:
531     result[constants.NV_NODENETTEST] = tmp = {}
532     my_pip = my_sip = None
533     for name, pip, sip in what[constants.NV_NODENETTEST]:
534       if name == my_name:
535         my_pip = pip
536         my_sip = sip
537         break
538     if not my_pip:
539       tmp[my_name] = ("Can't find my own primary/secondary IP"
540                       " in the node list")
541     else:
542       for name, pip, sip in what[constants.NV_NODENETTEST]:
543         fail = []
544         if not netutils.TcpPing(pip, port, source=my_pip):
545           fail.append("primary")
546         if sip != pip:
547           if not netutils.TcpPing(sip, port, source=my_sip):
548             fail.append("secondary")
549         if fail:
550           tmp[name] = ("failure using the %s interface(s)" %
551                        " and ".join(fail))
552
553   if constants.NV_MASTERIP in what:
554     # FIXME: add checks on incoming data structures (here and in the
555     # rest of the function)
556     master_name, master_ip = what[constants.NV_MASTERIP]
557     if master_name == my_name:
558       source = constants.IP4_ADDRESS_LOCALHOST
559     else:
560       source = None
561     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
562                                                   source=source)
563
564   if constants.NV_OOB_PATHS in what:
565     result[constants.NV_OOB_PATHS] = tmp = []
566     for path in what[constants.NV_OOB_PATHS]:
567       try:
568         st = os.stat(path)
569       except OSError, err:
570         tmp.append("error stating out of band helper: %s" % err)
571       else:
572         if stat.S_ISREG(st.st_mode):
573           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
574             tmp.append(None)
575           else:
576             tmp.append("out of band helper %s is not executable" % path)
577         else:
578           tmp.append("out of band helper %s is not a file" % path)
579
580   if constants.NV_LVLIST in what and vm_capable:
581     try:
582       val = GetVolumeList(utils.ListVolumeGroups().keys())
583     except RPCFail, err:
584       val = str(err)
585     result[constants.NV_LVLIST] = val
586
587   if constants.NV_INSTANCELIST in what and vm_capable:
588     # GetInstanceList can fail
589     try:
590       val = GetInstanceList(what[constants.NV_INSTANCELIST])
591     except RPCFail, err:
592       val = str(err)
593     result[constants.NV_INSTANCELIST] = val
594
595   if constants.NV_VGLIST in what and vm_capable:
596     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
597
598   if constants.NV_PVLIST in what and vm_capable:
599     result[constants.NV_PVLIST] = \
600       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
601                                    filter_allocatable=False)
602
603   if constants.NV_VERSION in what:
604     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
605                                     constants.RELEASE_VERSION)
606
607   if constants.NV_HVINFO in what and vm_capable:
608     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
609     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
610
611   if constants.NV_DRBDLIST in what and vm_capable:
612     try:
613       used_minors = bdev.DRBD8.GetUsedDevs().keys()
614     except errors.BlockDeviceError, err:
615       logging.warning("Can't get used minors list", exc_info=True)
616       used_minors = str(err)
617     result[constants.NV_DRBDLIST] = used_minors
618
619   if constants.NV_DRBDHELPER in what and vm_capable:
620     status = True
621     try:
622       payload = bdev.BaseDRBD.GetUsermodeHelper()
623     except errors.BlockDeviceError, err:
624       logging.error("Can't get DRBD usermode helper: %s", str(err))
625       status = False
626       payload = str(err)
627     result[constants.NV_DRBDHELPER] = (status, payload)
628
629   if constants.NV_NODESETUP in what:
630     result[constants.NV_NODESETUP] = tmpr = []
631     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
632       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
633                   " under /sys, missing required directories /sys/block"
634                   " and /sys/class/net")
635     if (not os.path.isdir("/proc/sys") or
636         not os.path.isfile("/proc/sysrq-trigger")):
637       tmpr.append("The procfs filesystem doesn't seem to be mounted"
638                   " under /proc, missing required directory /proc/sys and"
639                   " the file /proc/sysrq-trigger")
640
641   if constants.NV_TIME in what:
642     result[constants.NV_TIME] = utils.SplitTime(time.time())
643
644   if constants.NV_OSLIST in what and vm_capable:
645     result[constants.NV_OSLIST] = DiagnoseOS()
646
647   if constants.NV_BRIDGES in what and vm_capable:
648     result[constants.NV_BRIDGES] = [bridge
649                                     for bridge in what[constants.NV_BRIDGES]
650                                     if not utils.BridgeExists(bridge)]
651   return result
652
653
654 def GetVolumeList(vg_names):
655   """Compute list of logical volumes and their size.
656
657   @type vg_names: list
658   @param vg_names: the volume groups whose LVs we should list, or
659       empty for all volume groups
660   @rtype: dict
661   @return:
662       dictionary of all partions (key) with value being a tuple of
663       their size (in MiB), inactive and online status::
664
665         {'xenvg/test1': ('20.06', True, True)}
666
667       in case of errors, a string is returned with the error
668       details.
669
670   """
671   lvs = {}
672   sep = '|'
673   if not vg_names:
674     vg_names = []
675   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
676                          "--separator=%s" % sep,
677                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
678   if result.failed:
679     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
680
681   for line in result.stdout.splitlines():
682     line = line.strip()
683     match = _LVSLINE_REGEX.match(line)
684     if not match:
685       logging.error("Invalid line returned from lvs output: '%s'", line)
686       continue
687     vg_name, name, size, attr = match.groups()
688     inactive = attr[4] == '-'
689     online = attr[5] == 'o'
690     virtual = attr[0] == 'v'
691     if virtual:
692       # we don't want to report such volumes as existing, since they
693       # don't really hold data
694       continue
695     lvs[vg_name+"/"+name] = (size, inactive, online)
696
697   return lvs
698
699
700 def ListVolumeGroups():
701   """List the volume groups and their size.
702
703   @rtype: dict
704   @return: dictionary with keys volume name and values the
705       size of the volume
706
707   """
708   return utils.ListVolumeGroups()
709
710
711 def NodeVolumes():
712   """List all volumes on this node.
713
714   @rtype: list
715   @return:
716     A list of dictionaries, each having four keys:
717       - name: the logical volume name,
718       - size: the size of the logical volume
719       - dev: the physical device on which the LV lives
720       - vg: the volume group to which it belongs
721
722     In case of errors, we return an empty list and log the
723     error.
724
725     Note that since a logical volume can live on multiple physical
726     volumes, the resulting list might include a logical volume
727     multiple times.
728
729   """
730   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
731                          "--separator=|",
732                          "--options=lv_name,lv_size,devices,vg_name"])
733   if result.failed:
734     _Fail("Failed to list logical volumes, lvs output: %s",
735           result.output)
736
737   def parse_dev(dev):
738     return dev.split('(')[0]
739
740   def handle_dev(dev):
741     return [parse_dev(x) for x in dev.split(",")]
742
743   def map_line(line):
744     line = [v.strip() for v in line]
745     return [{'name': line[0], 'size': line[1],
746              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
747
748   all_devs = []
749   for line in result.stdout.splitlines():
750     if line.count('|') >= 3:
751       all_devs.extend(map_line(line.split('|')))
752     else:
753       logging.warning("Strange line in the output from lvs: '%s'", line)
754   return all_devs
755
756
757 def BridgesExist(bridges_list):
758   """Check if a list of bridges exist on the current node.
759
760   @rtype: boolean
761   @return: C{True} if all of them exist, C{False} otherwise
762
763   """
764   missing = []
765   for bridge in bridges_list:
766     if not utils.BridgeExists(bridge):
767       missing.append(bridge)
768
769   if missing:
770     _Fail("Missing bridges %s", utils.CommaJoin(missing))
771
772
773 def GetInstanceList(hypervisor_list):
774   """Provides a list of instances.
775
776   @type hypervisor_list: list
777   @param hypervisor_list: the list of hypervisors to query information
778
779   @rtype: list
780   @return: a list of all running instances on the current node
781     - instance1.example.com
782     - instance2.example.com
783
784   """
785   results = []
786   for hname in hypervisor_list:
787     try:
788       names = hypervisor.GetHypervisor(hname).ListInstances()
789       results.extend(names)
790     except errors.HypervisorError, err:
791       _Fail("Error enumerating instances (hypervisor %s): %s",
792             hname, err, exc=True)
793
794   return results
795
796
797 def GetInstanceInfo(instance, hname):
798   """Gives back the information about an instance as a dictionary.
799
800   @type instance: string
801   @param instance: the instance name
802   @type hname: string
803   @param hname: the hypervisor type of the instance
804
805   @rtype: dict
806   @return: dictionary with the following keys:
807       - memory: memory size of instance (int)
808       - state: xen state of instance (string)
809       - time: cpu time of instance (float)
810
811   """
812   output = {}
813
814   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
815   if iinfo is not None:
816     output['memory'] = iinfo[2]
817     output['state'] = iinfo[4]
818     output['time'] = iinfo[5]
819
820   return output
821
822
823 def GetInstanceMigratable(instance):
824   """Gives whether an instance can be migrated.
825
826   @type instance: L{objects.Instance}
827   @param instance: object representing the instance to be checked.
828
829   @rtype: tuple
830   @return: tuple of (result, description) where:
831       - result: whether the instance can be migrated or not
832       - description: a description of the issue, if relevant
833
834   """
835   hyper = hypervisor.GetHypervisor(instance.hypervisor)
836   iname = instance.name
837   if iname not in hyper.ListInstances():
838     _Fail("Instance %s is not running", iname)
839
840   for idx in range(len(instance.disks)):
841     link_name = _GetBlockDevSymlinkPath(iname, idx)
842     if not os.path.islink(link_name):
843       logging.warning("Instance %s is missing symlink %s for disk %d",
844                       iname, link_name, idx)
845
846
847 def GetAllInstancesInfo(hypervisor_list):
848   """Gather data about all instances.
849
850   This is the equivalent of L{GetInstanceInfo}, except that it
851   computes data for all instances at once, thus being faster if one
852   needs data about more than one instance.
853
854   @type hypervisor_list: list
855   @param hypervisor_list: list of hypervisors to query for instance data
856
857   @rtype: dict
858   @return: dictionary of instance: data, with data having the following keys:
859       - memory: memory size of instance (int)
860       - state: xen state of instance (string)
861       - time: cpu time of instance (float)
862       - vcpus: the number of vcpus
863
864   """
865   output = {}
866
867   for hname in hypervisor_list:
868     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
869     if iinfo:
870       for name, _, memory, vcpus, state, times in iinfo:
871         value = {
872           'memory': memory,
873           'vcpus': vcpus,
874           'state': state,
875           'time': times,
876           }
877         if name in output:
878           # we only check static parameters, like memory and vcpus,
879           # and not state and time which can change between the
880           # invocations of the different hypervisors
881           for key in 'memory', 'vcpus':
882             if value[key] != output[name][key]:
883               _Fail("Instance %s is running twice"
884                     " with different parameters", name)
885         output[name] = value
886
887   return output
888
889
890 def _InstanceLogName(kind, os_name, instance):
891   """Compute the OS log filename for a given instance and operation.
892
893   The instance name and os name are passed in as strings since not all
894   operations have these as part of an instance object.
895
896   @type kind: string
897   @param kind: the operation type (e.g. add, import, etc.)
898   @type os_name: string
899   @param os_name: the os name
900   @type instance: string
901   @param instance: the name of the instance being imported/added/etc.
902
903   """
904   # TODO: Use tempfile.mkstemp to create unique filename
905   base = ("%s-%s-%s-%s.log" %
906           (kind, os_name, instance, utils.TimestampForFilename()))
907   return utils.PathJoin(constants.LOG_OS_DIR, base)
908
909
910 def InstanceOsAdd(instance, reinstall, debug):
911   """Add an OS to an instance.
912
913   @type instance: L{objects.Instance}
914   @param instance: Instance whose OS is to be installed
915   @type reinstall: boolean
916   @param reinstall: whether this is an instance reinstall
917   @type debug: integer
918   @param debug: debug level, passed to the OS scripts
919   @rtype: None
920
921   """
922   inst_os = OSFromDisk(instance.os)
923
924   create_env = OSEnvironment(instance, inst_os, debug)
925   if reinstall:
926     create_env['INSTANCE_REINSTALL'] = "1"
927
928   logfile = _InstanceLogName("add", instance.os, instance.name)
929
930   result = utils.RunCmd([inst_os.create_script], env=create_env,
931                         cwd=inst_os.path, output=logfile,)
932   if result.failed:
933     logging.error("os create command '%s' returned error: %s, logfile: %s,"
934                   " output: %s", result.cmd, result.fail_reason, logfile,
935                   result.output)
936     lines = [utils.SafeEncode(val)
937              for val in utils.TailFile(logfile, lines=20)]
938     _Fail("OS create script failed (%s), last lines in the"
939           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
940
941
942 def RunRenameInstance(instance, old_name, debug):
943   """Run the OS rename script for an instance.
944
945   @type instance: L{objects.Instance}
946   @param instance: Instance whose OS is to be installed
947   @type old_name: string
948   @param old_name: previous instance name
949   @type debug: integer
950   @param debug: debug level, passed to the OS scripts
951   @rtype: boolean
952   @return: the success of the operation
953
954   """
955   inst_os = OSFromDisk(instance.os)
956
957   rename_env = OSEnvironment(instance, inst_os, debug)
958   rename_env['OLD_INSTANCE_NAME'] = old_name
959
960   logfile = _InstanceLogName("rename", instance.os,
961                              "%s-%s" % (old_name, instance.name))
962
963   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
964                         cwd=inst_os.path, output=logfile)
965
966   if result.failed:
967     logging.error("os create command '%s' returned error: %s output: %s",
968                   result.cmd, result.fail_reason, result.output)
969     lines = [utils.SafeEncode(val)
970              for val in utils.TailFile(logfile, lines=20)]
971     _Fail("OS rename script failed (%s), last lines in the"
972           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
973
974
975 def _GetBlockDevSymlinkPath(instance_name, idx):
976   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
977                         (instance_name, constants.DISK_SEPARATOR, idx))
978
979
980 def _SymlinkBlockDev(instance_name, device_path, idx):
981   """Set up symlinks to a instance's block device.
982
983   This is an auxiliary function run when an instance is start (on the primary
984   node) or when an instance is migrated (on the target node).
985
986
987   @param instance_name: the name of the target instance
988   @param device_path: path of the physical block device, on the node
989   @param idx: the disk index
990   @return: absolute path to the disk's symlink
991
992   """
993   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
994   try:
995     os.symlink(device_path, link_name)
996   except OSError, err:
997     if err.errno == errno.EEXIST:
998       if (not os.path.islink(link_name) or
999           os.readlink(link_name) != device_path):
1000         os.remove(link_name)
1001         os.symlink(device_path, link_name)
1002     else:
1003       raise
1004
1005   return link_name
1006
1007
1008 def _RemoveBlockDevLinks(instance_name, disks):
1009   """Remove the block device symlinks belonging to the given instance.
1010
1011   """
1012   for idx, _ in enumerate(disks):
1013     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1014     if os.path.islink(link_name):
1015       try:
1016         os.remove(link_name)
1017       except OSError:
1018         logging.exception("Can't remove symlink '%s'", link_name)
1019
1020
1021 def _GatherAndLinkBlockDevs(instance):
1022   """Set up an instance's block device(s).
1023
1024   This is run on the primary node at instance startup. The block
1025   devices must be already assembled.
1026
1027   @type instance: L{objects.Instance}
1028   @param instance: the instance whose disks we shoul assemble
1029   @rtype: list
1030   @return: list of (disk_object, device_path)
1031
1032   """
1033   block_devices = []
1034   for idx, disk in enumerate(instance.disks):
1035     device = _RecursiveFindBD(disk)
1036     if device is None:
1037       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1038                                     str(disk))
1039     device.Open()
1040     try:
1041       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1042     except OSError, e:
1043       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1044                                     e.strerror)
1045
1046     block_devices.append((disk, link_name))
1047
1048   return block_devices
1049
1050
1051 def StartInstance(instance):
1052   """Start an instance.
1053
1054   @type instance: L{objects.Instance}
1055   @param instance: the instance object
1056   @rtype: None
1057
1058   """
1059   running_instances = GetInstanceList([instance.hypervisor])
1060
1061   if instance.name in running_instances:
1062     logging.info("Instance %s already running, not starting", instance.name)
1063     return
1064
1065   try:
1066     block_devices = _GatherAndLinkBlockDevs(instance)
1067     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1068     hyper.StartInstance(instance, block_devices)
1069   except errors.BlockDeviceError, err:
1070     _Fail("Block device error: %s", err, exc=True)
1071   except errors.HypervisorError, err:
1072     _RemoveBlockDevLinks(instance.name, instance.disks)
1073     _Fail("Hypervisor error: %s", err, exc=True)
1074
1075
1076 def InstanceShutdown(instance, timeout):
1077   """Shut an instance down.
1078
1079   @note: this functions uses polling with a hardcoded timeout.
1080
1081   @type instance: L{objects.Instance}
1082   @param instance: the instance object
1083   @type timeout: integer
1084   @param timeout: maximum timeout for soft shutdown
1085   @rtype: None
1086
1087   """
1088   hv_name = instance.hypervisor
1089   hyper = hypervisor.GetHypervisor(hv_name)
1090   iname = instance.name
1091
1092   if instance.name not in hyper.ListInstances():
1093     logging.info("Instance %s not running, doing nothing", iname)
1094     return
1095
1096   class _TryShutdown:
1097     def __init__(self):
1098       self.tried_once = False
1099
1100     def __call__(self):
1101       if iname not in hyper.ListInstances():
1102         return
1103
1104       try:
1105         hyper.StopInstance(instance, retry=self.tried_once)
1106       except errors.HypervisorError, err:
1107         if iname not in hyper.ListInstances():
1108           # if the instance is no longer existing, consider this a
1109           # success and go to cleanup
1110           return
1111
1112         _Fail("Failed to stop instance %s: %s", iname, err)
1113
1114       self.tried_once = True
1115
1116       raise utils.RetryAgain()
1117
1118   try:
1119     utils.Retry(_TryShutdown(), 5, timeout)
1120   except utils.RetryTimeout:
1121     # the shutdown did not succeed
1122     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1123
1124     try:
1125       hyper.StopInstance(instance, force=True)
1126     except errors.HypervisorError, err:
1127       if iname in hyper.ListInstances():
1128         # only raise an error if the instance still exists, otherwise
1129         # the error could simply be "instance ... unknown"!
1130         _Fail("Failed to force stop instance %s: %s", iname, err)
1131
1132     time.sleep(1)
1133
1134     if iname in hyper.ListInstances():
1135       _Fail("Could not shutdown instance %s even by destroy", iname)
1136
1137   try:
1138     hyper.CleanupInstance(instance.name)
1139   except errors.HypervisorError, err:
1140     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1141
1142   _RemoveBlockDevLinks(iname, instance.disks)
1143
1144
1145 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1146   """Reboot an instance.
1147
1148   @type instance: L{objects.Instance}
1149   @param instance: the instance object to reboot
1150   @type reboot_type: str
1151   @param reboot_type: the type of reboot, one the following
1152     constants:
1153       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1154         instance OS, do not recreate the VM
1155       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1156         restart the VM (at the hypervisor level)
1157       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1158         not accepted here, since that mode is handled differently, in
1159         cmdlib, and translates into full stop and start of the
1160         instance (instead of a call_instance_reboot RPC)
1161   @type shutdown_timeout: integer
1162   @param shutdown_timeout: maximum timeout for soft shutdown
1163   @rtype: None
1164
1165   """
1166   running_instances = GetInstanceList([instance.hypervisor])
1167
1168   if instance.name not in running_instances:
1169     _Fail("Cannot reboot instance %s that is not running", instance.name)
1170
1171   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1172   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1173     try:
1174       hyper.RebootInstance(instance)
1175     except errors.HypervisorError, err:
1176       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1177   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1178     try:
1179       InstanceShutdown(instance, shutdown_timeout)
1180       return StartInstance(instance)
1181     except errors.HypervisorError, err:
1182       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1183   else:
1184     _Fail("Invalid reboot_type received: %s", reboot_type)
1185
1186
1187 def MigrationInfo(instance):
1188   """Gather information about an instance to be migrated.
1189
1190   @type instance: L{objects.Instance}
1191   @param instance: the instance definition
1192
1193   """
1194   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1195   try:
1196     info = hyper.MigrationInfo(instance)
1197   except errors.HypervisorError, err:
1198     _Fail("Failed to fetch migration information: %s", err, exc=True)
1199   return info
1200
1201
1202 def AcceptInstance(instance, info, target):
1203   """Prepare the node to accept an instance.
1204
1205   @type instance: L{objects.Instance}
1206   @param instance: the instance definition
1207   @type info: string/data (opaque)
1208   @param info: migration information, from the source node
1209   @type target: string
1210   @param target: target host (usually ip), on this node
1211
1212   """
1213   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1214   try:
1215     hyper.AcceptInstance(instance, info, target)
1216   except errors.HypervisorError, err:
1217     _Fail("Failed to accept instance: %s", err, exc=True)
1218
1219
1220 def FinalizeMigration(instance, info, success):
1221   """Finalize any preparation to accept an instance.
1222
1223   @type instance: L{objects.Instance}
1224   @param instance: the instance definition
1225   @type info: string/data (opaque)
1226   @param info: migration information, from the source node
1227   @type success: boolean
1228   @param success: whether the migration was a success or a failure
1229
1230   """
1231   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1232   try:
1233     hyper.FinalizeMigration(instance, info, success)
1234   except errors.HypervisorError, err:
1235     _Fail("Failed to finalize migration: %s", err, exc=True)
1236
1237
1238 def MigrateInstance(instance, target, live):
1239   """Migrates an instance to another node.
1240
1241   @type instance: L{objects.Instance}
1242   @param instance: the instance definition
1243   @type target: string
1244   @param target: the target node name
1245   @type live: boolean
1246   @param live: whether the migration should be done live or not (the
1247       interpretation of this parameter is left to the hypervisor)
1248   @rtype: tuple
1249   @return: a tuple of (success, msg) where:
1250       - succes is a boolean denoting the success/failure of the operation
1251       - msg is a string with details in case of failure
1252
1253   """
1254   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1255
1256   try:
1257     hyper.MigrateInstance(instance, target, live)
1258   except errors.HypervisorError, err:
1259     _Fail("Failed to migrate instance: %s", err, exc=True)
1260
1261
1262 def BlockdevCreate(disk, size, owner, on_primary, info):
1263   """Creates a block device for an instance.
1264
1265   @type disk: L{objects.Disk}
1266   @param disk: the object describing the disk we should create
1267   @type size: int
1268   @param size: the size of the physical underlying device, in MiB
1269   @type owner: str
1270   @param owner: the name of the instance for which disk is created,
1271       used for device cache data
1272   @type on_primary: boolean
1273   @param on_primary:  indicates if it is the primary node or not
1274   @type info: string
1275   @param info: string that will be sent to the physical device
1276       creation, used for example to set (LVM) tags on LVs
1277
1278   @return: the new unique_id of the device (this can sometime be
1279       computed only after creation), or None. On secondary nodes,
1280       it's not required to return anything.
1281
1282   """
1283   # TODO: remove the obsolete 'size' argument
1284   # pylint: disable-msg=W0613
1285   clist = []
1286   if disk.children:
1287     for child in disk.children:
1288       try:
1289         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1290       except errors.BlockDeviceError, err:
1291         _Fail("Can't assemble device %s: %s", child, err)
1292       if on_primary or disk.AssembleOnSecondary():
1293         # we need the children open in case the device itself has to
1294         # be assembled
1295         try:
1296           # pylint: disable-msg=E1103
1297           crdev.Open()
1298         except errors.BlockDeviceError, err:
1299           _Fail("Can't make child '%s' read-write: %s", child, err)
1300       clist.append(crdev)
1301
1302   try:
1303     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1304   except errors.BlockDeviceError, err:
1305     _Fail("Can't create block device: %s", err)
1306
1307   if on_primary or disk.AssembleOnSecondary():
1308     try:
1309       device.Assemble()
1310     except errors.BlockDeviceError, err:
1311       _Fail("Can't assemble device after creation, unusual event: %s", err)
1312     device.SetSyncSpeed(constants.SYNC_SPEED)
1313     if on_primary or disk.OpenOnSecondary():
1314       try:
1315         device.Open(force=True)
1316       except errors.BlockDeviceError, err:
1317         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1318     DevCacheManager.UpdateCache(device.dev_path, owner,
1319                                 on_primary, disk.iv_name)
1320
1321   device.SetInfo(info)
1322
1323   return device.unique_id
1324
1325
1326 def _WipeDevice(path, offset, size):
1327   """This function actually wipes the device.
1328
1329   @param path: The path to the device to wipe
1330   @param offset: The offset in MiB in the file
1331   @param size: The size in MiB to write
1332
1333   """
1334   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1335          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1336          "count=%d" % size]
1337   result = utils.RunCmd(cmd)
1338
1339   if result.failed:
1340     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1341           result.fail_reason, result.output)
1342
1343
1344 def BlockdevWipe(disk, offset, size):
1345   """Wipes a block device.
1346
1347   @type disk: L{objects.Disk}
1348   @param disk: the disk object we want to wipe
1349   @type offset: int
1350   @param offset: The offset in MiB in the file
1351   @type size: int
1352   @param size: The size in MiB to write
1353
1354   """
1355   try:
1356     rdev = _RecursiveFindBD(disk)
1357   except errors.BlockDeviceError:
1358     rdev = None
1359
1360   if not rdev:
1361     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1362
1363   # Do cross verify some of the parameters
1364   if offset > rdev.size:
1365     _Fail("Offset is bigger than device size")
1366   if (offset + size) > rdev.size:
1367     _Fail("The provided offset and size to wipe is bigger than device size")
1368
1369   _WipeDevice(rdev.dev_path, offset, size)
1370
1371
1372 def BlockdevPauseResumeSync(disks, pause):
1373   """Pause or resume the sync of the block device.
1374
1375   @type disks: list of L{objects.Disk}
1376   @param disks: the disks object we want to pause/resume
1377   @type pause: bool
1378   @param pause: Wheater to pause or resume
1379
1380   """
1381   success = []
1382   for disk in disks:
1383     try:
1384       rdev = _RecursiveFindBD(disk)
1385     except errors.BlockDeviceError:
1386       rdev = None
1387
1388     if not rdev:
1389       success.append((False, ("Cannot change sync for device %s:"
1390                               " device not found" % disk.iv_name)))
1391       continue
1392
1393     result = rdev.PauseResumeSync(pause)
1394
1395     if result:
1396       success.append((result, None))
1397     else:
1398       if pause:
1399         msg = "Pause"
1400       else:
1401         msg = "Resume"
1402       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1403
1404   return success
1405
1406
1407 def BlockdevRemove(disk):
1408   """Remove a block device.
1409
1410   @note: This is intended to be called recursively.
1411
1412   @type disk: L{objects.Disk}
1413   @param disk: the disk object we should remove
1414   @rtype: boolean
1415   @return: the success of the operation
1416
1417   """
1418   msgs = []
1419   try:
1420     rdev = _RecursiveFindBD(disk)
1421   except errors.BlockDeviceError, err:
1422     # probably can't attach
1423     logging.info("Can't attach to device %s in remove", disk)
1424     rdev = None
1425   if rdev is not None:
1426     r_path = rdev.dev_path
1427     try:
1428       rdev.Remove()
1429     except errors.BlockDeviceError, err:
1430       msgs.append(str(err))
1431     if not msgs:
1432       DevCacheManager.RemoveCache(r_path)
1433
1434   if disk.children:
1435     for child in disk.children:
1436       try:
1437         BlockdevRemove(child)
1438       except RPCFail, err:
1439         msgs.append(str(err))
1440
1441   if msgs:
1442     _Fail("; ".join(msgs))
1443
1444
1445 def _RecursiveAssembleBD(disk, owner, as_primary):
1446   """Activate a block device for an instance.
1447
1448   This is run on the primary and secondary nodes for an instance.
1449
1450   @note: this function is called recursively.
1451
1452   @type disk: L{objects.Disk}
1453   @param disk: the disk we try to assemble
1454   @type owner: str
1455   @param owner: the name of the instance which owns the disk
1456   @type as_primary: boolean
1457   @param as_primary: if we should make the block device
1458       read/write
1459
1460   @return: the assembled device or None (in case no device
1461       was assembled)
1462   @raise errors.BlockDeviceError: in case there is an error
1463       during the activation of the children or the device
1464       itself
1465
1466   """
1467   children = []
1468   if disk.children:
1469     mcn = disk.ChildrenNeeded()
1470     if mcn == -1:
1471       mcn = 0 # max number of Nones allowed
1472     else:
1473       mcn = len(disk.children) - mcn # max number of Nones
1474     for chld_disk in disk.children:
1475       try:
1476         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1477       except errors.BlockDeviceError, err:
1478         if children.count(None) >= mcn:
1479           raise
1480         cdev = None
1481         logging.error("Error in child activation (but continuing): %s",
1482                       str(err))
1483       children.append(cdev)
1484
1485   if as_primary or disk.AssembleOnSecondary():
1486     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1487     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1488     result = r_dev
1489     if as_primary or disk.OpenOnSecondary():
1490       r_dev.Open()
1491     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1492                                 as_primary, disk.iv_name)
1493
1494   else:
1495     result = True
1496   return result
1497
1498
1499 def BlockdevAssemble(disk, owner, as_primary, idx):
1500   """Activate a block device for an instance.
1501
1502   This is a wrapper over _RecursiveAssembleBD.
1503
1504   @rtype: str or boolean
1505   @return: a C{/dev/...} path for primary nodes, and
1506       C{True} for secondary nodes
1507
1508   """
1509   try:
1510     result = _RecursiveAssembleBD(disk, owner, as_primary)
1511     if isinstance(result, bdev.BlockDev):
1512       # pylint: disable-msg=E1103
1513       result = result.dev_path
1514       if as_primary:
1515         _SymlinkBlockDev(owner, result, idx)
1516   except errors.BlockDeviceError, err:
1517     _Fail("Error while assembling disk: %s", err, exc=True)
1518   except OSError, err:
1519     _Fail("Error while symlinking disk: %s", err, exc=True)
1520
1521   return result
1522
1523
1524 def BlockdevShutdown(disk):
1525   """Shut down a block device.
1526
1527   First, if the device is assembled (Attach() is successful), then
1528   the device is shutdown. Then the children of the device are
1529   shutdown.
1530
1531   This function is called recursively. Note that we don't cache the
1532   children or such, as oppossed to assemble, shutdown of different
1533   devices doesn't require that the upper device was active.
1534
1535   @type disk: L{objects.Disk}
1536   @param disk: the description of the disk we should
1537       shutdown
1538   @rtype: None
1539
1540   """
1541   msgs = []
1542   r_dev = _RecursiveFindBD(disk)
1543   if r_dev is not None:
1544     r_path = r_dev.dev_path
1545     try:
1546       r_dev.Shutdown()
1547       DevCacheManager.RemoveCache(r_path)
1548     except errors.BlockDeviceError, err:
1549       msgs.append(str(err))
1550
1551   if disk.children:
1552     for child in disk.children:
1553       try:
1554         BlockdevShutdown(child)
1555       except RPCFail, err:
1556         msgs.append(str(err))
1557
1558   if msgs:
1559     _Fail("; ".join(msgs))
1560
1561
1562 def BlockdevAddchildren(parent_cdev, new_cdevs):
1563   """Extend a mirrored block device.
1564
1565   @type parent_cdev: L{objects.Disk}
1566   @param parent_cdev: the disk to which we should add children
1567   @type new_cdevs: list of L{objects.Disk}
1568   @param new_cdevs: the list of children which we should add
1569   @rtype: None
1570
1571   """
1572   parent_bdev = _RecursiveFindBD(parent_cdev)
1573   if parent_bdev is None:
1574     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1575   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1576   if new_bdevs.count(None) > 0:
1577     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1578   parent_bdev.AddChildren(new_bdevs)
1579
1580
1581 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1582   """Shrink a mirrored block device.
1583
1584   @type parent_cdev: L{objects.Disk}
1585   @param parent_cdev: the disk from which we should remove children
1586   @type new_cdevs: list of L{objects.Disk}
1587   @param new_cdevs: the list of children which we should remove
1588   @rtype: None
1589
1590   """
1591   parent_bdev = _RecursiveFindBD(parent_cdev)
1592   if parent_bdev is None:
1593     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1594   devs = []
1595   for disk in new_cdevs:
1596     rpath = disk.StaticDevPath()
1597     if rpath is None:
1598       bd = _RecursiveFindBD(disk)
1599       if bd is None:
1600         _Fail("Can't find device %s while removing children", disk)
1601       else:
1602         devs.append(bd.dev_path)
1603     else:
1604       if not utils.IsNormAbsPath(rpath):
1605         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1606       devs.append(rpath)
1607   parent_bdev.RemoveChildren(devs)
1608
1609
1610 def BlockdevGetmirrorstatus(disks):
1611   """Get the mirroring status of a list of devices.
1612
1613   @type disks: list of L{objects.Disk}
1614   @param disks: the list of disks which we should query
1615   @rtype: disk
1616   @return: List of L{objects.BlockDevStatus}, one for each disk
1617   @raise errors.BlockDeviceError: if any of the disks cannot be
1618       found
1619
1620   """
1621   stats = []
1622   for dsk in disks:
1623     rbd = _RecursiveFindBD(dsk)
1624     if rbd is None:
1625       _Fail("Can't find device %s", dsk)
1626
1627     stats.append(rbd.CombinedSyncStatus())
1628
1629   return stats
1630
1631
1632 def BlockdevGetmirrorstatusMulti(disks):
1633   """Get the mirroring status of a list of devices.
1634
1635   @type disks: list of L{objects.Disk}
1636   @param disks: the list of disks which we should query
1637   @rtype: disk
1638   @return: List of tuples, (bool, status), one for each disk; bool denotes
1639     success/failure, status is L{objects.BlockDevStatus} on success, string
1640     otherwise
1641
1642   """
1643   result = []
1644   for disk in disks:
1645     try:
1646       rbd = _RecursiveFindBD(disk)
1647       if rbd is None:
1648         result.append((False, "Can't find device %s" % disk))
1649         continue
1650
1651       status = rbd.CombinedSyncStatus()
1652     except errors.BlockDeviceError, err:
1653       logging.exception("Error while getting disk status")
1654       result.append((False, str(err)))
1655     else:
1656       result.append((True, status))
1657
1658   assert len(disks) == len(result)
1659
1660   return result
1661
1662
1663 def _RecursiveFindBD(disk):
1664   """Check if a device is activated.
1665
1666   If so, return information about the real device.
1667
1668   @type disk: L{objects.Disk}
1669   @param disk: the disk object we need to find
1670
1671   @return: None if the device can't be found,
1672       otherwise the device instance
1673
1674   """
1675   children = []
1676   if disk.children:
1677     for chdisk in disk.children:
1678       children.append(_RecursiveFindBD(chdisk))
1679
1680   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1681
1682
1683 def _OpenRealBD(disk):
1684   """Opens the underlying block device of a disk.
1685
1686   @type disk: L{objects.Disk}
1687   @param disk: the disk object we want to open
1688
1689   """
1690   real_disk = _RecursiveFindBD(disk)
1691   if real_disk is None:
1692     _Fail("Block device '%s' is not set up", disk)
1693
1694   real_disk.Open()
1695
1696   return real_disk
1697
1698
1699 def BlockdevFind(disk):
1700   """Check if a device is activated.
1701
1702   If it is, return information about the real device.
1703
1704   @type disk: L{objects.Disk}
1705   @param disk: the disk to find
1706   @rtype: None or objects.BlockDevStatus
1707   @return: None if the disk cannot be found, otherwise a the current
1708            information
1709
1710   """
1711   try:
1712     rbd = _RecursiveFindBD(disk)
1713   except errors.BlockDeviceError, err:
1714     _Fail("Failed to find device: %s", err, exc=True)
1715
1716   if rbd is None:
1717     return None
1718
1719   return rbd.GetSyncStatus()
1720
1721
1722 def BlockdevGetsize(disks):
1723   """Computes the size of the given disks.
1724
1725   If a disk is not found, returns None instead.
1726
1727   @type disks: list of L{objects.Disk}
1728   @param disks: the list of disk to compute the size for
1729   @rtype: list
1730   @return: list with elements None if the disk cannot be found,
1731       otherwise the size
1732
1733   """
1734   result = []
1735   for cf in disks:
1736     try:
1737       rbd = _RecursiveFindBD(cf)
1738     except errors.BlockDeviceError:
1739       result.append(None)
1740       continue
1741     if rbd is None:
1742       result.append(None)
1743     else:
1744       result.append(rbd.GetActualSize())
1745   return result
1746
1747
1748 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1749   """Export a block device to a remote node.
1750
1751   @type disk: L{objects.Disk}
1752   @param disk: the description of the disk to export
1753   @type dest_node: str
1754   @param dest_node: the destination node to export to
1755   @type dest_path: str
1756   @param dest_path: the destination path on the target node
1757   @type cluster_name: str
1758   @param cluster_name: the cluster name, needed for SSH hostalias
1759   @rtype: None
1760
1761   """
1762   real_disk = _OpenRealBD(disk)
1763
1764   # the block size on the read dd is 1MiB to match our units
1765   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1766                                "dd if=%s bs=1048576 count=%s",
1767                                real_disk.dev_path, str(disk.size))
1768
1769   # we set here a smaller block size as, due to ssh buffering, more
1770   # than 64-128k will mostly ignored; we use nocreat to fail if the
1771   # device is not already there or we pass a wrong path; we use
1772   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1773   # to not buffer too much memory; this means that at best, we flush
1774   # every 64k, which will not be very fast
1775   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1776                                 " oflag=dsync", dest_path)
1777
1778   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1779                                                    constants.GANETI_RUNAS,
1780                                                    destcmd)
1781
1782   # all commands have been checked, so we're safe to combine them
1783   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1784
1785   result = utils.RunCmd(["bash", "-c", command])
1786
1787   if result.failed:
1788     _Fail("Disk copy command '%s' returned error: %s"
1789           " output: %s", command, result.fail_reason, result.output)
1790
1791
1792 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1793   """Write a file to the filesystem.
1794
1795   This allows the master to overwrite(!) a file. It will only perform
1796   the operation if the file belongs to a list of configuration files.
1797
1798   @type file_name: str
1799   @param file_name: the target file name
1800   @type data: str
1801   @param data: the new contents of the file
1802   @type mode: int
1803   @param mode: the mode to give the file (can be None)
1804   @type uid: string
1805   @param uid: the owner of the file
1806   @type gid: string
1807   @param gid: the group of the file
1808   @type atime: float
1809   @param atime: the atime to set on the file (can be None)
1810   @type mtime: float
1811   @param mtime: the mtime to set on the file (can be None)
1812   @rtype: None
1813
1814   """
1815   if not os.path.isabs(file_name):
1816     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1817
1818   if file_name not in _ALLOWED_UPLOAD_FILES:
1819     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1820           file_name)
1821
1822   raw_data = _Decompress(data)
1823
1824   if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
1825     _Fail("Invalid username/groupname type")
1826
1827   getents = runtime.GetEnts()
1828   uid = getents.LookupUser(uid)
1829   gid = getents.LookupGroup(gid)
1830
1831   utils.SafeWriteFile(file_name, None,
1832                       data=raw_data, mode=mode, uid=uid, gid=gid,
1833                       atime=atime, mtime=mtime)
1834
1835
1836 def RunOob(oob_program, command, node, timeout):
1837   """Executes oob_program with given command on given node.
1838
1839   @param oob_program: The path to the executable oob_program
1840   @param command: The command to invoke on oob_program
1841   @param node: The node given as an argument to the program
1842   @param timeout: Timeout after which we kill the oob program
1843
1844   @return: stdout
1845   @raise RPCFail: If execution fails for some reason
1846
1847   """
1848   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1849
1850   if result.failed:
1851     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1852           result.fail_reason, result.output)
1853
1854   return result.stdout
1855
1856
1857 def WriteSsconfFiles(values):
1858   """Update all ssconf files.
1859
1860   Wrapper around the SimpleStore.WriteFiles.
1861
1862   """
1863   ssconf.SimpleStore().WriteFiles(values)
1864
1865
1866 def _ErrnoOrStr(err):
1867   """Format an EnvironmentError exception.
1868
1869   If the L{err} argument has an errno attribute, it will be looked up
1870   and converted into a textual C{E...} description. Otherwise the
1871   string representation of the error will be returned.
1872
1873   @type err: L{EnvironmentError}
1874   @param err: the exception to format
1875
1876   """
1877   if hasattr(err, 'errno'):
1878     detail = errno.errorcode[err.errno]
1879   else:
1880     detail = str(err)
1881   return detail
1882
1883
1884 def _OSOndiskAPIVersion(os_dir):
1885   """Compute and return the API version of a given OS.
1886
1887   This function will try to read the API version of the OS residing in
1888   the 'os_dir' directory.
1889
1890   @type os_dir: str
1891   @param os_dir: the directory in which we should look for the OS
1892   @rtype: tuple
1893   @return: tuple (status, data) with status denoting the validity and
1894       data holding either the vaid versions or an error message
1895
1896   """
1897   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1898
1899   try:
1900     st = os.stat(api_file)
1901   except EnvironmentError, err:
1902     return False, ("Required file '%s' not found under path %s: %s" %
1903                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1904
1905   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1906     return False, ("File '%s' in %s is not a regular file" %
1907                    (constants.OS_API_FILE, os_dir))
1908
1909   try:
1910     api_versions = utils.ReadFile(api_file).splitlines()
1911   except EnvironmentError, err:
1912     return False, ("Error while reading the API version file at %s: %s" %
1913                    (api_file, _ErrnoOrStr(err)))
1914
1915   try:
1916     api_versions = [int(version.strip()) for version in api_versions]
1917   except (TypeError, ValueError), err:
1918     return False, ("API version(s) can't be converted to integer: %s" %
1919                    str(err))
1920
1921   return True, api_versions
1922
1923
1924 def DiagnoseOS(top_dirs=None):
1925   """Compute the validity for all OSes.
1926
1927   @type top_dirs: list
1928   @param top_dirs: the list of directories in which to
1929       search (if not given defaults to
1930       L{constants.OS_SEARCH_PATH})
1931   @rtype: list of L{objects.OS}
1932   @return: a list of tuples (name, path, status, diagnose, variants,
1933       parameters, api_version) for all (potential) OSes under all
1934       search paths, where:
1935           - name is the (potential) OS name
1936           - path is the full path to the OS
1937           - status True/False is the validity of the OS
1938           - diagnose is the error message for an invalid OS, otherwise empty
1939           - variants is a list of supported OS variants, if any
1940           - parameters is a list of (name, help) parameters, if any
1941           - api_version is a list of support OS API versions
1942
1943   """
1944   if top_dirs is None:
1945     top_dirs = constants.OS_SEARCH_PATH
1946
1947   result = []
1948   for dir_name in top_dirs:
1949     if os.path.isdir(dir_name):
1950       try:
1951         f_names = utils.ListVisibleFiles(dir_name)
1952       except EnvironmentError, err:
1953         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1954         break
1955       for name in f_names:
1956         os_path = utils.PathJoin(dir_name, name)
1957         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1958         if status:
1959           diagnose = ""
1960           variants = os_inst.supported_variants
1961           parameters = os_inst.supported_parameters
1962           api_versions = os_inst.api_versions
1963         else:
1964           diagnose = os_inst
1965           variants = parameters = api_versions = []
1966         result.append((name, os_path, status, diagnose, variants,
1967                        parameters, api_versions))
1968
1969   return result
1970
1971
1972 def _TryOSFromDisk(name, base_dir=None):
1973   """Create an OS instance from disk.
1974
1975   This function will return an OS instance if the given name is a
1976   valid OS name.
1977
1978   @type base_dir: string
1979   @keyword base_dir: Base directory containing OS installations.
1980                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1981   @rtype: tuple
1982   @return: success and either the OS instance if we find a valid one,
1983       or error message
1984
1985   """
1986   if base_dir is None:
1987     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1988   else:
1989     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1990
1991   if os_dir is None:
1992     return False, "Directory for OS %s not found in search path" % name
1993
1994   status, api_versions = _OSOndiskAPIVersion(os_dir)
1995   if not status:
1996     # push the error up
1997     return status, api_versions
1998
1999   if not constants.OS_API_VERSIONS.intersection(api_versions):
2000     return False, ("API version mismatch for path '%s': found %s, want %s." %
2001                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2002
2003   # OS Files dictionary, we will populate it with the absolute path names
2004   os_files = dict.fromkeys(constants.OS_SCRIPTS)
2005
2006   if max(api_versions) >= constants.OS_API_V15:
2007     os_files[constants.OS_VARIANTS_FILE] = ''
2008
2009   if max(api_versions) >= constants.OS_API_V20:
2010     os_files[constants.OS_PARAMETERS_FILE] = ''
2011   else:
2012     del os_files[constants.OS_SCRIPT_VERIFY]
2013
2014   for filename in os_files:
2015     os_files[filename] = utils.PathJoin(os_dir, filename)
2016
2017     try:
2018       st = os.stat(os_files[filename])
2019     except EnvironmentError, err:
2020       return False, ("File '%s' under path '%s' is missing (%s)" %
2021                      (filename, os_dir, _ErrnoOrStr(err)))
2022
2023     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2024       return False, ("File '%s' under path '%s' is not a regular file" %
2025                      (filename, os_dir))
2026
2027     if filename in constants.OS_SCRIPTS:
2028       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2029         return False, ("File '%s' under path '%s' is not executable" %
2030                        (filename, os_dir))
2031
2032   variants = []
2033   if constants.OS_VARIANTS_FILE in os_files:
2034     variants_file = os_files[constants.OS_VARIANTS_FILE]
2035     try:
2036       variants = utils.ReadFile(variants_file).splitlines()
2037     except EnvironmentError, err:
2038       return False, ("Error while reading the OS variants file at %s: %s" %
2039                      (variants_file, _ErrnoOrStr(err)))
2040     if not variants:
2041       return False, ("No supported os variant found")
2042
2043   parameters = []
2044   if constants.OS_PARAMETERS_FILE in os_files:
2045     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2046     try:
2047       parameters = utils.ReadFile(parameters_file).splitlines()
2048     except EnvironmentError, err:
2049       return False, ("Error while reading the OS parameters file at %s: %s" %
2050                      (parameters_file, _ErrnoOrStr(err)))
2051     parameters = [v.split(None, 1) for v in parameters]
2052
2053   os_obj = objects.OS(name=name, path=os_dir,
2054                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2055                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2056                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2057                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2058                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2059                                                  None),
2060                       supported_variants=variants,
2061                       supported_parameters=parameters,
2062                       api_versions=api_versions)
2063   return True, os_obj
2064
2065
2066 def OSFromDisk(name, base_dir=None):
2067   """Create an OS instance from disk.
2068
2069   This function will return an OS instance if the given name is a
2070   valid OS name. Otherwise, it will raise an appropriate
2071   L{RPCFail} exception, detailing why this is not a valid OS.
2072
2073   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2074   an exception but returns true/false status data.
2075
2076   @type base_dir: string
2077   @keyword base_dir: Base directory containing OS installations.
2078                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2079   @rtype: L{objects.OS}
2080   @return: the OS instance if we find a valid one
2081   @raise RPCFail: if we don't find a valid OS
2082
2083   """
2084   name_only = objects.OS.GetName(name)
2085   status, payload = _TryOSFromDisk(name_only, base_dir)
2086
2087   if not status:
2088     _Fail(payload)
2089
2090   return payload
2091
2092
2093 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2094   """Calculate the basic environment for an os script.
2095
2096   @type os_name: str
2097   @param os_name: full operating system name (including variant)
2098   @type inst_os: L{objects.OS}
2099   @param inst_os: operating system for which the environment is being built
2100   @type os_params: dict
2101   @param os_params: the OS parameters
2102   @type debug: integer
2103   @param debug: debug level (0 or 1, for OS Api 10)
2104   @rtype: dict
2105   @return: dict of environment variables
2106   @raise errors.BlockDeviceError: if the block device
2107       cannot be found
2108
2109   """
2110   result = {}
2111   api_version = \
2112     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2113   result['OS_API_VERSION'] = '%d' % api_version
2114   result['OS_NAME'] = inst_os.name
2115   result['DEBUG_LEVEL'] = '%d' % debug
2116
2117   # OS variants
2118   if api_version >= constants.OS_API_V15:
2119     variant = objects.OS.GetVariant(os_name)
2120     if not variant:
2121       variant = inst_os.supported_variants[0]
2122     result['OS_VARIANT'] = variant
2123
2124   # OS params
2125   for pname, pvalue in os_params.items():
2126     result['OSP_%s' % pname.upper()] = pvalue
2127
2128   return result
2129
2130
2131 def OSEnvironment(instance, inst_os, debug=0):
2132   """Calculate the environment for an os script.
2133
2134   @type instance: L{objects.Instance}
2135   @param instance: target instance for the os script run
2136   @type inst_os: L{objects.OS}
2137   @param inst_os: operating system for which the environment is being built
2138   @type debug: integer
2139   @param debug: debug level (0 or 1, for OS Api 10)
2140   @rtype: dict
2141   @return: dict of environment variables
2142   @raise errors.BlockDeviceError: if the block device
2143       cannot be found
2144
2145   """
2146   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2147
2148   for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2149     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2150
2151   result['HYPERVISOR'] = instance.hypervisor
2152   result['DISK_COUNT'] = '%d' % len(instance.disks)
2153   result['NIC_COUNT'] = '%d' % len(instance.nics)
2154   result['INSTANCE_SECONDARY_NODES'] = \
2155       ('%s' % " ".join(instance.secondary_nodes))
2156
2157   # Disks
2158   for idx, disk in enumerate(instance.disks):
2159     real_disk = _OpenRealBD(disk)
2160     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2161     result['DISK_%d_ACCESS' % idx] = disk.mode
2162     if constants.HV_DISK_TYPE in instance.hvparams:
2163       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2164         instance.hvparams[constants.HV_DISK_TYPE]
2165     if disk.dev_type in constants.LDS_BLOCK:
2166       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2167     elif disk.dev_type == constants.LD_FILE:
2168       result['DISK_%d_BACKEND_TYPE' % idx] = \
2169         'file:%s' % disk.physical_id[0]
2170
2171   # NICs
2172   for idx, nic in enumerate(instance.nics):
2173     result['NIC_%d_MAC' % idx] = nic.mac
2174     if nic.ip:
2175       result['NIC_%d_IP' % idx] = nic.ip
2176     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2177     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2178       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2179     if nic.nicparams[constants.NIC_LINK]:
2180       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2181     if constants.HV_NIC_TYPE in instance.hvparams:
2182       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2183         instance.hvparams[constants.HV_NIC_TYPE]
2184
2185   # HV/BE params
2186   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2187     for key, value in source.items():
2188       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2189
2190   return result
2191
2192
2193 def BlockdevGrow(disk, amount):
2194   """Grow a stack of block devices.
2195
2196   This function is called recursively, with the childrens being the
2197   first ones to resize.
2198
2199   @type disk: L{objects.Disk}
2200   @param disk: the disk to be grown
2201   @rtype: (status, result)
2202   @return: a tuple with the status of the operation
2203       (True/False), and the errors message if status
2204       is False
2205
2206   """
2207   r_dev = _RecursiveFindBD(disk)
2208   if r_dev is None:
2209     _Fail("Cannot find block device %s", disk)
2210
2211   try:
2212     r_dev.Grow(amount)
2213   except errors.BlockDeviceError, err:
2214     _Fail("Failed to grow block device: %s", err, exc=True)
2215
2216
2217 def BlockdevSnapshot(disk):
2218   """Create a snapshot copy of a block device.
2219
2220   This function is called recursively, and the snapshot is actually created
2221   just for the leaf lvm backend device.
2222
2223   @type disk: L{objects.Disk}
2224   @param disk: the disk to be snapshotted
2225   @rtype: string
2226   @return: snapshot disk ID as (vg, lv)
2227
2228   """
2229   if disk.dev_type == constants.LD_DRBD8:
2230     if not disk.children:
2231       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2232             disk.unique_id)
2233     return BlockdevSnapshot(disk.children[0])
2234   elif disk.dev_type == constants.LD_LV:
2235     r_dev = _RecursiveFindBD(disk)
2236     if r_dev is not None:
2237       # FIXME: choose a saner value for the snapshot size
2238       # let's stay on the safe side and ask for the full size, for now
2239       return r_dev.Snapshot(disk.size)
2240     else:
2241       _Fail("Cannot find block device %s", disk)
2242   else:
2243     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2244           disk.unique_id, disk.dev_type)
2245
2246
2247 def FinalizeExport(instance, snap_disks):
2248   """Write out the export configuration information.
2249
2250   @type instance: L{objects.Instance}
2251   @param instance: the instance which we export, used for
2252       saving configuration
2253   @type snap_disks: list of L{objects.Disk}
2254   @param snap_disks: list of snapshot block devices, which
2255       will be used to get the actual name of the dump file
2256
2257   @rtype: None
2258
2259   """
2260   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2261   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2262
2263   config = objects.SerializableConfigParser()
2264
2265   config.add_section(constants.INISECT_EXP)
2266   config.set(constants.INISECT_EXP, 'version', '0')
2267   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2268   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2269   config.set(constants.INISECT_EXP, 'os', instance.os)
2270   config.set(constants.INISECT_EXP, "compression", "none")
2271
2272   config.add_section(constants.INISECT_INS)
2273   config.set(constants.INISECT_INS, 'name', instance.name)
2274   config.set(constants.INISECT_INS, 'memory', '%d' %
2275              instance.beparams[constants.BE_MEMORY])
2276   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2277              instance.beparams[constants.BE_VCPUS])
2278   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2279   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2280
2281   nic_total = 0
2282   for nic_count, nic in enumerate(instance.nics):
2283     nic_total += 1
2284     config.set(constants.INISECT_INS, 'nic%d_mac' %
2285                nic_count, '%s' % nic.mac)
2286     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2287     for param in constants.NICS_PARAMETER_TYPES:
2288       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2289                  '%s' % nic.nicparams.get(param, None))
2290   # TODO: redundant: on load can read nics until it doesn't exist
2291   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2292
2293   disk_total = 0
2294   for disk_count, disk in enumerate(snap_disks):
2295     if disk:
2296       disk_total += 1
2297       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2298                  ('%s' % disk.iv_name))
2299       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2300                  ('%s' % disk.physical_id[1]))
2301       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2302                  ('%d' % disk.size))
2303
2304   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2305
2306   # New-style hypervisor/backend parameters
2307
2308   config.add_section(constants.INISECT_HYP)
2309   for name, value in instance.hvparams.items():
2310     if name not in constants.HVC_GLOBALS:
2311       config.set(constants.INISECT_HYP, name, str(value))
2312
2313   config.add_section(constants.INISECT_BEP)
2314   for name, value in instance.beparams.items():
2315     config.set(constants.INISECT_BEP, name, str(value))
2316
2317   config.add_section(constants.INISECT_OSP)
2318   for name, value in instance.osparams.items():
2319     config.set(constants.INISECT_OSP, name, str(value))
2320
2321   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2322                   data=config.Dumps())
2323   shutil.rmtree(finaldestdir, ignore_errors=True)
2324   shutil.move(destdir, finaldestdir)
2325
2326
2327 def ExportInfo(dest):
2328   """Get export configuration information.
2329
2330   @type dest: str
2331   @param dest: directory containing the export
2332
2333   @rtype: L{objects.SerializableConfigParser}
2334   @return: a serializable config file containing the
2335       export info
2336
2337   """
2338   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2339
2340   config = objects.SerializableConfigParser()
2341   config.read(cff)
2342
2343   if (not config.has_section(constants.INISECT_EXP) or
2344       not config.has_section(constants.INISECT_INS)):
2345     _Fail("Export info file doesn't have the required fields")
2346
2347   return config.Dumps()
2348
2349
2350 def ListExports():
2351   """Return a list of exports currently available on this machine.
2352
2353   @rtype: list
2354   @return: list of the exports
2355
2356   """
2357   if os.path.isdir(constants.EXPORT_DIR):
2358     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2359   else:
2360     _Fail("No exports directory")
2361
2362
2363 def RemoveExport(export):
2364   """Remove an existing export from the node.
2365
2366   @type export: str
2367   @param export: the name of the export to remove
2368   @rtype: None
2369
2370   """
2371   target = utils.PathJoin(constants.EXPORT_DIR, export)
2372
2373   try:
2374     shutil.rmtree(target)
2375   except EnvironmentError, err:
2376     _Fail("Error while removing the export: %s", err, exc=True)
2377
2378
2379 def BlockdevRename(devlist):
2380   """Rename a list of block devices.
2381
2382   @type devlist: list of tuples
2383   @param devlist: list of tuples of the form  (disk,
2384       new_logical_id, new_physical_id); disk is an
2385       L{objects.Disk} object describing the current disk,
2386       and new logical_id/physical_id is the name we
2387       rename it to
2388   @rtype: boolean
2389   @return: True if all renames succeeded, False otherwise
2390
2391   """
2392   msgs = []
2393   result = True
2394   for disk, unique_id in devlist:
2395     dev = _RecursiveFindBD(disk)
2396     if dev is None:
2397       msgs.append("Can't find device %s in rename" % str(disk))
2398       result = False
2399       continue
2400     try:
2401       old_rpath = dev.dev_path
2402       dev.Rename(unique_id)
2403       new_rpath = dev.dev_path
2404       if old_rpath != new_rpath:
2405         DevCacheManager.RemoveCache(old_rpath)
2406         # FIXME: we should add the new cache information here, like:
2407         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2408         # but we don't have the owner here - maybe parse from existing
2409         # cache? for now, we only lose lvm data when we rename, which
2410         # is less critical than DRBD or MD
2411     except errors.BlockDeviceError, err:
2412       msgs.append("Can't rename device '%s' to '%s': %s" %
2413                   (dev, unique_id, err))
2414       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2415       result = False
2416   if not result:
2417     _Fail("; ".join(msgs))
2418
2419
2420 def _TransformFileStorageDir(file_storage_dir):
2421   """Checks whether given file_storage_dir is valid.
2422
2423   Checks wheter the given file_storage_dir is within the cluster-wide
2424   default file_storage_dir stored in SimpleStore. Only paths under that
2425   directory are allowed.
2426
2427   @type file_storage_dir: str
2428   @param file_storage_dir: the path to check
2429
2430   @return: the normalized path if valid, None otherwise
2431
2432   """
2433   if not constants.ENABLE_FILE_STORAGE:
2434     _Fail("File storage disabled at configure time")
2435   cfg = _GetConfig()
2436   file_storage_dir = os.path.normpath(file_storage_dir)
2437   base_file_storage_dir = cfg.GetFileStorageDir()
2438   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2439       base_file_storage_dir):
2440     _Fail("File storage directory '%s' is not under base file"
2441           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2442   return file_storage_dir
2443
2444
2445 def CreateFileStorageDir(file_storage_dir):
2446   """Create file storage directory.
2447
2448   @type file_storage_dir: str
2449   @param file_storage_dir: directory to create
2450
2451   @rtype: tuple
2452   @return: tuple with first element a boolean indicating wheter dir
2453       creation was successful or not
2454
2455   """
2456   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2457   if os.path.exists(file_storage_dir):
2458     if not os.path.isdir(file_storage_dir):
2459       _Fail("Specified storage dir '%s' is not a directory",
2460             file_storage_dir)
2461   else:
2462     try:
2463       os.makedirs(file_storage_dir, 0750)
2464     except OSError, err:
2465       _Fail("Cannot create file storage directory '%s': %s",
2466             file_storage_dir, err, exc=True)
2467
2468
2469 def RemoveFileStorageDir(file_storage_dir):
2470   """Remove file storage directory.
2471
2472   Remove it only if it's empty. If not log an error and return.
2473
2474   @type file_storage_dir: str
2475   @param file_storage_dir: the directory we should cleanup
2476   @rtype: tuple (success,)
2477   @return: tuple of one element, C{success}, denoting
2478       whether the operation was successful
2479
2480   """
2481   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2482   if os.path.exists(file_storage_dir):
2483     if not os.path.isdir(file_storage_dir):
2484       _Fail("Specified Storage directory '%s' is not a directory",
2485             file_storage_dir)
2486     # deletes dir only if empty, otherwise we want to fail the rpc call
2487     try:
2488       os.rmdir(file_storage_dir)
2489     except OSError, err:
2490       _Fail("Cannot remove file storage directory '%s': %s",
2491             file_storage_dir, err)
2492
2493
2494 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2495   """Rename the file storage directory.
2496
2497   @type old_file_storage_dir: str
2498   @param old_file_storage_dir: the current path
2499   @type new_file_storage_dir: str
2500   @param new_file_storage_dir: the name we should rename to
2501   @rtype: tuple (success,)
2502   @return: tuple of one element, C{success}, denoting
2503       whether the operation was successful
2504
2505   """
2506   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2507   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2508   if not os.path.exists(new_file_storage_dir):
2509     if os.path.isdir(old_file_storage_dir):
2510       try:
2511         os.rename(old_file_storage_dir, new_file_storage_dir)
2512       except OSError, err:
2513         _Fail("Cannot rename '%s' to '%s': %s",
2514               old_file_storage_dir, new_file_storage_dir, err)
2515     else:
2516       _Fail("Specified storage dir '%s' is not a directory",
2517             old_file_storage_dir)
2518   else:
2519     if os.path.exists(old_file_storage_dir):
2520       _Fail("Cannot rename '%s' to '%s': both locations exist",
2521             old_file_storage_dir, new_file_storage_dir)
2522
2523
2524 def _EnsureJobQueueFile(file_name):
2525   """Checks whether the given filename is in the queue directory.
2526
2527   @type file_name: str
2528   @param file_name: the file name we should check
2529   @rtype: None
2530   @raises RPCFail: if the file is not valid
2531
2532   """
2533   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2534   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2535
2536   if not result:
2537     _Fail("Passed job queue file '%s' does not belong to"
2538           " the queue directory '%s'", file_name, queue_dir)
2539
2540
2541 def JobQueueUpdate(file_name, content):
2542   """Updates a file in the queue directory.
2543
2544   This is just a wrapper over L{utils.io.WriteFile}, with proper
2545   checking.
2546
2547   @type file_name: str
2548   @param file_name: the job file name
2549   @type content: str
2550   @param content: the new job contents
2551   @rtype: boolean
2552   @return: the success of the operation
2553
2554   """
2555   _EnsureJobQueueFile(file_name)
2556   getents = runtime.GetEnts()
2557
2558   # Write and replace the file atomically
2559   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2560                   gid=getents.masterd_gid)
2561
2562
2563 def JobQueueRename(old, new):
2564   """Renames a job queue file.
2565
2566   This is just a wrapper over os.rename with proper checking.
2567
2568   @type old: str
2569   @param old: the old (actual) file name
2570   @type new: str
2571   @param new: the desired file name
2572   @rtype: tuple
2573   @return: the success of the operation and payload
2574
2575   """
2576   _EnsureJobQueueFile(old)
2577   _EnsureJobQueueFile(new)
2578
2579   utils.RenameFile(old, new, mkdir=True)
2580
2581
2582 def BlockdevClose(instance_name, disks):
2583   """Closes the given block devices.
2584
2585   This means they will be switched to secondary mode (in case of
2586   DRBD).
2587
2588   @param instance_name: if the argument is not empty, the symlinks
2589       of this instance will be removed
2590   @type disks: list of L{objects.Disk}
2591   @param disks: the list of disks to be closed
2592   @rtype: tuple (success, message)
2593   @return: a tuple of success and message, where success
2594       indicates the succes of the operation, and message
2595       which will contain the error details in case we
2596       failed
2597
2598   """
2599   bdevs = []
2600   for cf in disks:
2601     rd = _RecursiveFindBD(cf)
2602     if rd is None:
2603       _Fail("Can't find device %s", cf)
2604     bdevs.append(rd)
2605
2606   msg = []
2607   for rd in bdevs:
2608     try:
2609       rd.Close()
2610     except errors.BlockDeviceError, err:
2611       msg.append(str(err))
2612   if msg:
2613     _Fail("Can't make devices secondary: %s", ",".join(msg))
2614   else:
2615     if instance_name:
2616       _RemoveBlockDevLinks(instance_name, disks)
2617
2618
2619 def ValidateHVParams(hvname, hvparams):
2620   """Validates the given hypervisor parameters.
2621
2622   @type hvname: string
2623   @param hvname: the hypervisor name
2624   @type hvparams: dict
2625   @param hvparams: the hypervisor parameters to be validated
2626   @rtype: None
2627
2628   """
2629   try:
2630     hv_type = hypervisor.GetHypervisor(hvname)
2631     hv_type.ValidateParameters(hvparams)
2632   except errors.HypervisorError, err:
2633     _Fail(str(err), log=False)
2634
2635
2636 def _CheckOSPList(os_obj, parameters):
2637   """Check whether a list of parameters is supported by the OS.
2638
2639   @type os_obj: L{objects.OS}
2640   @param os_obj: OS object to check
2641   @type parameters: list
2642   @param parameters: the list of parameters to check
2643
2644   """
2645   supported = [v[0] for v in os_obj.supported_parameters]
2646   delta = frozenset(parameters).difference(supported)
2647   if delta:
2648     _Fail("The following parameters are not supported"
2649           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2650
2651
2652 def ValidateOS(required, osname, checks, osparams):
2653   """Validate the given OS' parameters.
2654
2655   @type required: boolean
2656   @param required: whether absence of the OS should translate into
2657       failure or not
2658   @type osname: string
2659   @param osname: the OS to be validated
2660   @type checks: list
2661   @param checks: list of the checks to run (currently only 'parameters')
2662   @type osparams: dict
2663   @param osparams: dictionary with OS parameters
2664   @rtype: boolean
2665   @return: True if the validation passed, or False if the OS was not
2666       found and L{required} was false
2667
2668   """
2669   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2670     _Fail("Unknown checks required for OS %s: %s", osname,
2671           set(checks).difference(constants.OS_VALIDATE_CALLS))
2672
2673   name_only = objects.OS.GetName(osname)
2674   status, tbv = _TryOSFromDisk(name_only, None)
2675
2676   if not status:
2677     if required:
2678       _Fail(tbv)
2679     else:
2680       return False
2681
2682   if max(tbv.api_versions) < constants.OS_API_V20:
2683     return True
2684
2685   if constants.OS_VALIDATE_PARAMETERS in checks:
2686     _CheckOSPList(tbv, osparams.keys())
2687
2688   validate_env = OSCoreEnv(osname, tbv, osparams)
2689   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2690                         cwd=tbv.path)
2691   if result.failed:
2692     logging.error("os validate command '%s' returned error: %s output: %s",
2693                   result.cmd, result.fail_reason, result.output)
2694     _Fail("OS validation script failed (%s), output: %s",
2695           result.fail_reason, result.output, log=False)
2696
2697   return True
2698
2699
2700 def DemoteFromMC():
2701   """Demotes the current node from master candidate role.
2702
2703   """
2704   # try to ensure we're not the master by mistake
2705   master, myself = ssconf.GetMasterAndMyself()
2706   if master == myself:
2707     _Fail("ssconf status shows I'm the master node, will not demote")
2708
2709   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2710   if not result.failed:
2711     _Fail("The master daemon is running, will not demote")
2712
2713   try:
2714     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2715       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2716   except EnvironmentError, err:
2717     if err.errno != errno.ENOENT:
2718       _Fail("Error while backing up cluster file: %s", err, exc=True)
2719
2720   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2721
2722
2723 def _GetX509Filenames(cryptodir, name):
2724   """Returns the full paths for the private key and certificate.
2725
2726   """
2727   return (utils.PathJoin(cryptodir, name),
2728           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2729           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2730
2731
2732 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2733   """Creates a new X509 certificate for SSL/TLS.
2734
2735   @type validity: int
2736   @param validity: Validity in seconds
2737   @rtype: tuple; (string, string)
2738   @return: Certificate name and public part
2739
2740   """
2741   (key_pem, cert_pem) = \
2742     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2743                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2744
2745   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2746                               prefix="x509-%s-" % utils.TimestampForFilename())
2747   try:
2748     name = os.path.basename(cert_dir)
2749     assert len(name) > 5
2750
2751     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2752
2753     utils.WriteFile(key_file, mode=0400, data=key_pem)
2754     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2755
2756     # Never return private key as it shouldn't leave the node
2757     return (name, cert_pem)
2758   except Exception:
2759     shutil.rmtree(cert_dir, ignore_errors=True)
2760     raise
2761
2762
2763 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2764   """Removes a X509 certificate.
2765
2766   @type name: string
2767   @param name: Certificate name
2768
2769   """
2770   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2771
2772   utils.RemoveFile(key_file)
2773   utils.RemoveFile(cert_file)
2774
2775   try:
2776     os.rmdir(cert_dir)
2777   except EnvironmentError, err:
2778     _Fail("Cannot remove certificate directory '%s': %s",
2779           cert_dir, err)
2780
2781
2782 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2783   """Returns the command for the requested input/output.
2784
2785   @type instance: L{objects.Instance}
2786   @param instance: The instance object
2787   @param mode: Import/export mode
2788   @param ieio: Input/output type
2789   @param ieargs: Input/output arguments
2790
2791   """
2792   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2793
2794   env = None
2795   prefix = None
2796   suffix = None
2797   exp_size = None
2798
2799   if ieio == constants.IEIO_FILE:
2800     (filename, ) = ieargs
2801
2802     if not utils.IsNormAbsPath(filename):
2803       _Fail("Path '%s' is not normalized or absolute", filename)
2804
2805     directory = os.path.normpath(os.path.dirname(filename))
2806
2807     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2808         constants.EXPORT_DIR):
2809       _Fail("File '%s' is not under exports directory '%s'",
2810             filename, constants.EXPORT_DIR)
2811
2812     # Create directory
2813     utils.Makedirs(directory, mode=0750)
2814
2815     quoted_filename = utils.ShellQuote(filename)
2816
2817     if mode == constants.IEM_IMPORT:
2818       suffix = "> %s" % quoted_filename
2819     elif mode == constants.IEM_EXPORT:
2820       suffix = "< %s" % quoted_filename
2821
2822       # Retrieve file size
2823       try:
2824         st = os.stat(filename)
2825       except EnvironmentError, err:
2826         logging.error("Can't stat(2) %s: %s", filename, err)
2827       else:
2828         exp_size = utils.BytesToMebibyte(st.st_size)
2829
2830   elif ieio == constants.IEIO_RAW_DISK:
2831     (disk, ) = ieargs
2832
2833     real_disk = _OpenRealBD(disk)
2834
2835     if mode == constants.IEM_IMPORT:
2836       # we set here a smaller block size as, due to transport buffering, more
2837       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2838       # is not already there or we pass a wrong path; we use notrunc to no
2839       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2840       # much memory; this means that at best, we flush every 64k, which will
2841       # not be very fast
2842       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2843                                     " bs=%s oflag=dsync"),
2844                                     real_disk.dev_path,
2845                                     str(64 * 1024))
2846
2847     elif mode == constants.IEM_EXPORT:
2848       # the block size on the read dd is 1MiB to match our units
2849       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2850                                    real_disk.dev_path,
2851                                    str(1024 * 1024), # 1 MB
2852                                    str(disk.size))
2853       exp_size = disk.size
2854
2855   elif ieio == constants.IEIO_SCRIPT:
2856     (disk, disk_index, ) = ieargs
2857
2858     assert isinstance(disk_index, (int, long))
2859
2860     real_disk = _OpenRealBD(disk)
2861
2862     inst_os = OSFromDisk(instance.os)
2863     env = OSEnvironment(instance, inst_os)
2864
2865     if mode == constants.IEM_IMPORT:
2866       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2867       env["IMPORT_INDEX"] = str(disk_index)
2868       script = inst_os.import_script
2869
2870     elif mode == constants.IEM_EXPORT:
2871       env["EXPORT_DEVICE"] = real_disk.dev_path
2872       env["EXPORT_INDEX"] = str(disk_index)
2873       script = inst_os.export_script
2874
2875     # TODO: Pass special environment only to script
2876     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2877
2878     if mode == constants.IEM_IMPORT:
2879       suffix = "| %s" % script_cmd
2880
2881     elif mode == constants.IEM_EXPORT:
2882       prefix = "%s |" % script_cmd
2883
2884     # Let script predict size
2885     exp_size = constants.IE_CUSTOM_SIZE
2886
2887   else:
2888     _Fail("Invalid %s I/O mode %r", mode, ieio)
2889
2890   return (env, prefix, suffix, exp_size)
2891
2892
2893 def _CreateImportExportStatusDir(prefix):
2894   """Creates status directory for import/export.
2895
2896   """
2897   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2898                           prefix=("%s-%s-" %
2899                                   (prefix, utils.TimestampForFilename())))
2900
2901
2902 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2903   """Starts an import or export daemon.
2904
2905   @param mode: Import/output mode
2906   @type opts: L{objects.ImportExportOptions}
2907   @param opts: Daemon options
2908   @type host: string
2909   @param host: Remote host for export (None for import)
2910   @type port: int
2911   @param port: Remote port for export (None for import)
2912   @type instance: L{objects.Instance}
2913   @param instance: Instance object
2914   @param ieio: Input/output type
2915   @param ieioargs: Input/output arguments
2916
2917   """
2918   if mode == constants.IEM_IMPORT:
2919     prefix = "import"
2920
2921     if not (host is None and port is None):
2922       _Fail("Can not specify host or port on import")
2923
2924   elif mode == constants.IEM_EXPORT:
2925     prefix = "export"
2926
2927     if host is None or port is None:
2928       _Fail("Host and port must be specified for an export")
2929
2930   else:
2931     _Fail("Invalid mode %r", mode)
2932
2933   if (opts.key_name is None) ^ (opts.ca_pem is None):
2934     _Fail("Cluster certificate can only be used for both key and CA")
2935
2936   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2937     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2938
2939   if opts.key_name is None:
2940     # Use server.pem
2941     key_path = constants.NODED_CERT_FILE
2942     cert_path = constants.NODED_CERT_FILE
2943     assert opts.ca_pem is None
2944   else:
2945     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2946                                                  opts.key_name)
2947     assert opts.ca_pem is not None
2948
2949   for i in [key_path, cert_path]:
2950     if not os.path.exists(i):
2951       _Fail("File '%s' does not exist" % i)
2952
2953   status_dir = _CreateImportExportStatusDir(prefix)
2954   try:
2955     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2956     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2957     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2958
2959     if opts.ca_pem is None:
2960       # Use server.pem
2961       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2962     else:
2963       ca = opts.ca_pem
2964
2965     # Write CA file
2966     utils.WriteFile(ca_file, data=ca, mode=0400)
2967
2968     cmd = [
2969       constants.IMPORT_EXPORT_DAEMON,
2970       status_file, mode,
2971       "--key=%s" % key_path,
2972       "--cert=%s" % cert_path,
2973       "--ca=%s" % ca_file,
2974       ]
2975
2976     if host:
2977       cmd.append("--host=%s" % host)
2978
2979     if port:
2980       cmd.append("--port=%s" % port)
2981
2982     if opts.ipv6:
2983       cmd.append("--ipv6")
2984     else:
2985       cmd.append("--ipv4")
2986
2987     if opts.compress:
2988       cmd.append("--compress=%s" % opts.compress)
2989
2990     if opts.magic:
2991       cmd.append("--magic=%s" % opts.magic)
2992
2993     if exp_size is not None:
2994       cmd.append("--expected-size=%s" % exp_size)
2995
2996     if cmd_prefix:
2997       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2998
2999     if cmd_suffix:
3000       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3001
3002     if mode == constants.IEM_EXPORT:
3003       # Retry connection a few times when connecting to remote peer
3004       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3005       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3006     elif opts.connect_timeout is not None:
3007       assert mode == constants.IEM_IMPORT
3008       # Overall timeout for establishing connection while listening
3009       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3010
3011     logfile = _InstanceLogName(prefix, instance.os, instance.name)
3012
3013     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3014     # support for receiving a file descriptor for output
3015     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3016                       output=logfile)
3017
3018     # The import/export name is simply the status directory name
3019     return os.path.basename(status_dir)
3020
3021   except Exception:
3022     shutil.rmtree(status_dir, ignore_errors=True)
3023     raise
3024
3025
3026 def GetImportExportStatus(names):
3027   """Returns import/export daemon status.
3028
3029   @type names: sequence
3030   @param names: List of names
3031   @rtype: List of dicts
3032   @return: Returns a list of the state of each named import/export or None if a
3033            status couldn't be read
3034
3035   """
3036   result = []
3037
3038   for name in names:
3039     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3040                                  _IES_STATUS_FILE)
3041
3042     try:
3043       data = utils.ReadFile(status_file)
3044     except EnvironmentError, err:
3045       if err.errno != errno.ENOENT:
3046         raise
3047       data = None
3048
3049     if not data:
3050       result.append(None)
3051       continue
3052
3053     result.append(serializer.LoadJson(data))
3054
3055   return result
3056
3057
3058 def AbortImportExport(name):
3059   """Sends SIGTERM to a running import/export daemon.
3060
3061   """
3062   logging.info("Abort import/export %s", name)
3063
3064   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3065   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3066
3067   if pid:
3068     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3069                  name, pid)
3070     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3071
3072
3073 def CleanupImportExport(name):
3074   """Cleanup after an import or export.
3075
3076   If the import/export daemon is still running it's killed. Afterwards the
3077   whole status directory is removed.
3078
3079   """
3080   logging.info("Finalizing import/export %s", name)
3081
3082   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3083
3084   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3085
3086   if pid:
3087     logging.info("Import/export %s is still running with PID %s",
3088                  name, pid)
3089     utils.KillProcess(pid, waitpid=False)
3090
3091   shutil.rmtree(status_dir, ignore_errors=True)
3092
3093
3094 def _FindDisks(nodes_ip, disks):
3095   """Sets the physical ID on disks and returns the block devices.
3096
3097   """
3098   # set the correct physical ID
3099   my_name = netutils.Hostname.GetSysName()
3100   for cf in disks:
3101     cf.SetPhysicalID(my_name, nodes_ip)
3102
3103   bdevs = []
3104
3105   for cf in disks:
3106     rd = _RecursiveFindBD(cf)
3107     if rd is None:
3108       _Fail("Can't find device %s", cf)
3109     bdevs.append(rd)
3110   return bdevs
3111
3112
3113 def DrbdDisconnectNet(nodes_ip, disks):
3114   """Disconnects the network on a list of drbd devices.
3115
3116   """
3117   bdevs = _FindDisks(nodes_ip, disks)
3118
3119   # disconnect disks
3120   for rd in bdevs:
3121     try:
3122       rd.DisconnectNet()
3123     except errors.BlockDeviceError, err:
3124       _Fail("Can't change network configuration to standalone mode: %s",
3125             err, exc=True)
3126
3127
3128 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3129   """Attaches the network on a list of drbd devices.
3130
3131   """
3132   bdevs = _FindDisks(nodes_ip, disks)
3133
3134   if multimaster:
3135     for idx, rd in enumerate(bdevs):
3136       try:
3137         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3138       except EnvironmentError, err:
3139         _Fail("Can't create symlink: %s", err)
3140   # reconnect disks, switch to new master configuration and if
3141   # needed primary mode
3142   for rd in bdevs:
3143     try:
3144       rd.AttachNet(multimaster)
3145     except errors.BlockDeviceError, err:
3146       _Fail("Can't change network configuration: %s", err)
3147
3148   # wait until the disks are connected; we need to retry the re-attach
3149   # if the device becomes standalone, as this might happen if the one
3150   # node disconnects and reconnects in a different mode before the
3151   # other node reconnects; in this case, one or both of the nodes will
3152   # decide it has wrong configuration and switch to standalone
3153
3154   def _Attach():
3155     all_connected = True
3156
3157     for rd in bdevs:
3158       stats = rd.GetProcStatus()
3159
3160       all_connected = (all_connected and
3161                        (stats.is_connected or stats.is_in_resync))
3162
3163       if stats.is_standalone:
3164         # peer had different config info and this node became
3165         # standalone, even though this should not happen with the
3166         # new staged way of changing disk configs
3167         try:
3168           rd.AttachNet(multimaster)
3169         except errors.BlockDeviceError, err:
3170           _Fail("Can't change network configuration: %s", err)
3171
3172     if not all_connected:
3173       raise utils.RetryAgain()
3174
3175   try:
3176     # Start with a delay of 100 miliseconds and go up to 5 seconds
3177     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3178   except utils.RetryTimeout:
3179     _Fail("Timeout in disk reconnecting")
3180
3181   if multimaster:
3182     # change to primary mode
3183     for rd in bdevs:
3184       try:
3185         rd.Open()
3186       except errors.BlockDeviceError, err:
3187         _Fail("Can't change to primary mode: %s", err)
3188
3189
3190 def DrbdWaitSync(nodes_ip, disks):
3191   """Wait until DRBDs have synchronized.
3192
3193   """
3194   def _helper(rd):
3195     stats = rd.GetProcStatus()
3196     if not (stats.is_connected or stats.is_in_resync):
3197       raise utils.RetryAgain()
3198     return stats
3199
3200   bdevs = _FindDisks(nodes_ip, disks)
3201
3202   min_resync = 100
3203   alldone = True
3204   for rd in bdevs:
3205     try:
3206       # poll each second for 15 seconds
3207       stats = utils.Retry(_helper, 1, 15, args=[rd])
3208     except utils.RetryTimeout:
3209       stats = rd.GetProcStatus()
3210       # last check
3211       if not (stats.is_connected or stats.is_in_resync):
3212         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3213     alldone = alldone and (not stats.is_in_resync)
3214     if stats.sync_percent is not None:
3215       min_resync = min(min_resync, stats.sync_percent)
3216
3217   return (alldone, min_resync)
3218
3219
3220 def GetDrbdUsermodeHelper():
3221   """Returns DRBD usermode helper currently configured.
3222
3223   """
3224   try:
3225     return bdev.BaseDRBD.GetUsermodeHelper()
3226   except errors.BlockDeviceError, err:
3227     _Fail(str(err))
3228
3229
3230 def PowercycleNode(hypervisor_type):
3231   """Hard-powercycle the node.
3232
3233   Because we need to return first, and schedule the powercycle in the
3234   background, we won't be able to report failures nicely.
3235
3236   """
3237   hyper = hypervisor.GetHypervisor(hypervisor_type)
3238   try:
3239     pid = os.fork()
3240   except OSError:
3241     # if we can't fork, we'll pretend that we're in the child process
3242     pid = 0
3243   if pid > 0:
3244     return "Reboot scheduled in 5 seconds"
3245   # ensure the child is running on ram
3246   try:
3247     utils.Mlockall()
3248   except Exception: # pylint: disable-msg=W0703
3249     pass
3250   time.sleep(5)
3251   hyper.PowercycleNode()
3252
3253
3254 class HooksRunner(object):
3255   """Hook runner.
3256
3257   This class is instantiated on the node side (ganeti-noded) and not
3258   on the master side.
3259
3260   """
3261   def __init__(self, hooks_base_dir=None):
3262     """Constructor for hooks runner.
3263
3264     @type hooks_base_dir: str or None
3265     @param hooks_base_dir: if not None, this overrides the
3266         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3267
3268     """
3269     if hooks_base_dir is None:
3270       hooks_base_dir = constants.HOOKS_BASE_DIR
3271     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3272     # constant
3273     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3274
3275   def RunHooks(self, hpath, phase, env):
3276     """Run the scripts in the hooks directory.
3277
3278     @type hpath: str
3279     @param hpath: the path to the hooks directory which
3280         holds the scripts
3281     @type phase: str
3282     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3283         L{constants.HOOKS_PHASE_POST}
3284     @type env: dict
3285     @param env: dictionary with the environment for the hook
3286     @rtype: list
3287     @return: list of 3-element tuples:
3288       - script path
3289       - script result, either L{constants.HKR_SUCCESS} or
3290         L{constants.HKR_FAIL}
3291       - output of the script
3292
3293     @raise errors.ProgrammerError: for invalid input
3294         parameters
3295
3296     """
3297     if phase == constants.HOOKS_PHASE_PRE:
3298       suffix = "pre"
3299     elif phase == constants.HOOKS_PHASE_POST:
3300       suffix = "post"
3301     else:
3302       _Fail("Unknown hooks phase '%s'", phase)
3303
3304
3305     subdir = "%s-%s.d" % (hpath, suffix)
3306     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3307
3308     results = []
3309
3310     if not os.path.isdir(dir_name):
3311       # for non-existing/non-dirs, we simply exit instead of logging a
3312       # warning at every operation
3313       return results
3314
3315     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3316
3317     for (relname, relstatus, runresult)  in runparts_results:
3318       if relstatus == constants.RUNPARTS_SKIP:
3319         rrval = constants.HKR_SKIP
3320         output = ""
3321       elif relstatus == constants.RUNPARTS_ERR:
3322         rrval = constants.HKR_FAIL
3323         output = "Hook script execution error: %s" % runresult
3324       elif relstatus == constants.RUNPARTS_RUN:
3325         if runresult.failed:
3326           rrval = constants.HKR_FAIL
3327         else:
3328           rrval = constants.HKR_SUCCESS
3329         output = utils.SafeEncode(runresult.output.strip())
3330       results.append(("%s/%s" % (subdir, relname), rrval, output))
3331
3332     return results
3333
3334
3335 class IAllocatorRunner(object):
3336   """IAllocator runner.
3337
3338   This class is instantiated on the node side (ganeti-noded) and not on
3339   the master side.
3340
3341   """
3342   @staticmethod
3343   def Run(name, idata):
3344     """Run an iallocator script.
3345
3346     @type name: str
3347     @param name: the iallocator script name
3348     @type idata: str
3349     @param idata: the allocator input data
3350
3351     @rtype: tuple
3352     @return: two element tuple of:
3353        - status
3354        - either error message or stdout of allocator (for success)
3355
3356     """
3357     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3358                                   os.path.isfile)
3359     if alloc_script is None:
3360       _Fail("iallocator module '%s' not found in the search path", name)
3361
3362     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3363     try:
3364       os.write(fd, idata)
3365       os.close(fd)
3366       result = utils.RunCmd([alloc_script, fin_name])
3367       if result.failed:
3368         _Fail("iallocator module '%s' failed: %s, output '%s'",
3369               name, result.fail_reason, result.output)
3370     finally:
3371       os.unlink(fin_name)
3372
3373     return result.stdout
3374
3375
3376 class DevCacheManager(object):
3377   """Simple class for managing a cache of block device information.
3378
3379   """
3380   _DEV_PREFIX = "/dev/"
3381   _ROOT_DIR = constants.BDEV_CACHE_DIR
3382
3383   @classmethod
3384   def _ConvertPath(cls, dev_path):
3385     """Converts a /dev/name path to the cache file name.
3386
3387     This replaces slashes with underscores and strips the /dev
3388     prefix. It then returns the full path to the cache file.
3389
3390     @type dev_path: str
3391     @param dev_path: the C{/dev/} path name
3392     @rtype: str
3393     @return: the converted path name
3394
3395     """
3396     if dev_path.startswith(cls._DEV_PREFIX):
3397       dev_path = dev_path[len(cls._DEV_PREFIX):]
3398     dev_path = dev_path.replace("/", "_")
3399     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3400     return fpath
3401
3402   @classmethod
3403   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3404     """Updates the cache information for a given device.
3405
3406     @type dev_path: str
3407     @param dev_path: the pathname of the device
3408     @type owner: str
3409     @param owner: the owner (instance name) of the device
3410     @type on_primary: bool
3411     @param on_primary: whether this is the primary
3412         node nor not
3413     @type iv_name: str
3414     @param iv_name: the instance-visible name of the
3415         device, as in objects.Disk.iv_name
3416
3417     @rtype: None
3418
3419     """
3420     if dev_path is None:
3421       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3422       return
3423     fpath = cls._ConvertPath(dev_path)
3424     if on_primary:
3425       state = "primary"
3426     else:
3427       state = "secondary"
3428     if iv_name is None:
3429       iv_name = "not_visible"
3430     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3431     try:
3432       utils.WriteFile(fpath, data=fdata)
3433     except EnvironmentError, err:
3434       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3435
3436   @classmethod
3437   def RemoveCache(cls, dev_path):
3438     """Remove data for a dev_path.
3439
3440     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3441     path name and logging.
3442
3443     @type dev_path: str
3444     @param dev_path: the pathname of the device
3445
3446     @rtype: None
3447
3448     """
3449     if dev_path is None:
3450       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3451       return
3452     fpath = cls._ConvertPath(dev_path)
3453     try:
3454       utils.RemoveFile(fpath)
3455     except EnvironmentError, err:
3456       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)