Re-create instance disk symlinks on activate
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.RAPI_USERS_FILE,
200     constants.CONFD_HMAC_KEY,
201     constants.CLUSTER_DOMAIN_SECRET_FILE,
202     ])
203
204   for hv_name in constants.HYPER_TYPES:
205     hv_class = hypervisor.GetHypervisorClass(hv_name)
206     allowed_files.update(hv_class.GetAncillaryFiles())
207
208   return frozenset(allowed_files)
209
210
211 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212
213
214 def JobQueuePurge():
215   """Removes job queue files and archived jobs.
216
217   @rtype: tuple
218   @return: True, None
219
220   """
221   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
222   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223
224
225 def GetMasterInfo():
226   """Returns master information.
227
228   This is an utility function to compute master information, either
229   for consumption here or from the node daemon.
230
231   @rtype: tuple
232   @return: master_netdev, master_ip, master_name, primary_ip_family
233   @raise RPCFail: in case of errors
234
235   """
236   try:
237     cfg = _GetConfig()
238     master_netdev = cfg.GetMasterNetdev()
239     master_ip = cfg.GetMasterIP()
240     master_node = cfg.GetMasterNode()
241     primary_ip_family = cfg.GetPrimaryIPFamily()
242   except errors.ConfigurationError, err:
243     _Fail("Cluster configuration incomplete: %s", err, exc=True)
244   return (master_netdev, master_ip, master_node, primary_ip_family)
245
246
247 def StartMaster(start_daemons, no_voting):
248   """Activate local node as master node.
249
250   The function will either try activate the IP address of the master
251   (unless someone else has it) or also start the master daemons, based
252   on the start_daemons parameter.
253
254   @type start_daemons: boolean
255   @param start_daemons: whether to start the master daemons
256       (ganeti-masterd and ganeti-rapi), or (if false) activate the
257       master ip
258   @type no_voting: boolean
259   @param no_voting: whether to start ganeti-masterd without a node vote
260       (if start_daemons is True), but still non-interactively
261   @rtype: None
262
263   """
264   # GetMasterInfo will raise an exception if not able to return data
265   master_netdev, master_ip, _, family = GetMasterInfo()
266
267   err_msgs = []
268   # either start the master and rapi daemons
269   if start_daemons:
270     if no_voting:
271       masterd_args = "--no-voting --yes-do-it"
272     else:
273       masterd_args = ""
274
275     env = {
276       "EXTRA_MASTERD_ARGS": masterd_args,
277       }
278
279     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
280     if result.failed:
281       msg = "Can't start Ganeti master: %s" % result.output
282       logging.error(msg)
283       err_msgs.append(msg)
284   # or activate the IP
285   else:
286     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
287       if netutils.IPAddress.Own(master_ip):
288         # we already have the ip:
289         logging.debug("Master IP already configured, doing nothing")
290       else:
291         msg = "Someone else has the master ip, not activating"
292         logging.error(msg)
293         err_msgs.append(msg)
294     else:
295       ipcls = netutils.IP4Address
296       if family == netutils.IP6Address.family:
297         ipcls = netutils.IP6Address
298
299       result = utils.RunCmd(["ip", "address", "add",
300                              "%s/%d" % (master_ip, ipcls.iplen),
301                              "dev", master_netdev, "label",
302                              "%s:0" % master_netdev])
303       if result.failed:
304         msg = "Can't activate master IP: %s" % result.output
305         logging.error(msg)
306         err_msgs.append(msg)
307
308       # we ignore the exit code of the following cmds
309       if ipcls == netutils.IP4Address:
310         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
311                       master_ip, master_ip])
312       elif ipcls == netutils.IP6Address:
313         try:
314           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
315         except errors.OpExecError:
316           # TODO: Better error reporting
317           logging.warning("Can't execute ndisc6, please install if missing")
318
319   if err_msgs:
320     _Fail("; ".join(err_msgs))
321
322
323 def StopMaster(stop_daemons):
324   """Deactivate this node as master.
325
326   The function will always try to deactivate the IP address of the
327   master. It will also stop the master daemons depending on the
328   stop_daemons parameter.
329
330   @type stop_daemons: boolean
331   @param stop_daemons: whether to also stop the master daemons
332       (ganeti-masterd and ganeti-rapi)
333   @rtype: None
334
335   """
336   # TODO: log and report back to the caller the error failures; we
337   # need to decide in which case we fail the RPC for this
338
339   # GetMasterInfo will raise an exception if not able to return data
340   master_netdev, master_ip, _, family = GetMasterInfo()
341
342   ipcls = netutils.IP4Address
343   if family == netutils.IP6Address.family:
344     ipcls = netutils.IP6Address
345
346   result = utils.RunCmd(["ip", "address", "del",
347                          "%s/%d" % (master_ip, ipcls.iplen),
348                          "dev", master_netdev])
349   if result.failed:
350     logging.error("Can't remove the master IP, error: %s", result.output)
351     # but otherwise ignore the failure
352
353   if stop_daemons:
354     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
355     if result.failed:
356       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
357                     " and error %s",
358                     result.cmd, result.exit_code, result.output)
359
360
361 def EtcHostsModify(mode, host, ip):
362   """Modify a host entry in /etc/hosts.
363
364   @param mode: The mode to operate. Either add or remove entry
365   @param host: The host to operate on
366   @param ip: The ip associated with the entry
367
368   """
369   if mode == constants.ETC_HOSTS_ADD:
370     if not ip:
371       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
372               " present")
373     utils.AddHostToEtcHosts(host, ip)
374   elif mode == constants.ETC_HOSTS_REMOVE:
375     if ip:
376       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
377               " parameter is present")
378     utils.RemoveHostFromEtcHosts(host)
379   else:
380     RPCFail("Mode not supported")
381
382
383 def LeaveCluster(modify_ssh_setup):
384   """Cleans up and remove the current node.
385
386   This function cleans up and prepares the current node to be removed
387   from the cluster.
388
389   If processing is successful, then it raises an
390   L{errors.QuitGanetiException} which is used as a special case to
391   shutdown the node daemon.
392
393   @param modify_ssh_setup: boolean
394
395   """
396   _CleanDirectory(constants.DATA_DIR)
397   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
398   JobQueuePurge()
399
400   if modify_ssh_setup:
401     try:
402       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
403
404       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
405
406       utils.RemoveFile(priv_key)
407       utils.RemoveFile(pub_key)
408     except errors.OpExecError:
409       logging.exception("Error while processing ssh files")
410
411   try:
412     utils.RemoveFile(constants.CONFD_HMAC_KEY)
413     utils.RemoveFile(constants.RAPI_CERT_FILE)
414     utils.RemoveFile(constants.NODED_CERT_FILE)
415   except: # pylint: disable-msg=W0702
416     logging.exception("Error while removing cluster secrets")
417
418   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
419   if result.failed:
420     logging.error("Command %s failed with exitcode %s and error %s",
421                   result.cmd, result.exit_code, result.output)
422
423   # Raise a custom exception (handled in ganeti-noded)
424   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
425
426
427 def GetNodeInfo(vgname, hypervisor_type):
428   """Gives back a hash with different information about the node.
429
430   @type vgname: C{string}
431   @param vgname: the name of the volume group to ask for disk space information
432   @type hypervisor_type: C{str}
433   @param hypervisor_type: the name of the hypervisor to ask for
434       memory information
435   @rtype: C{dict}
436   @return: dictionary with the following keys:
437       - vg_size is the size of the configured volume group in MiB
438       - vg_free is the free size of the volume group in MiB
439       - memory_dom0 is the memory allocated for domain0 in MiB
440       - memory_free is the currently available (free) ram in MiB
441       - memory_total is the total number of ram in MiB
442
443   """
444   outputarray = {}
445
446   if vgname is not None:
447     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
448     vg_free = vg_size = None
449     if vginfo:
450       vg_free = int(round(vginfo[0][0], 0))
451       vg_size = int(round(vginfo[0][1], 0))
452     outputarray['vg_size'] = vg_size
453     outputarray['vg_free'] = vg_free
454
455   if hypervisor_type is not None:
456     hyper = hypervisor.GetHypervisor(hypervisor_type)
457     hyp_info = hyper.GetNodeInfo()
458     if hyp_info is not None:
459       outputarray.update(hyp_info)
460
461   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
462
463   return outputarray
464
465
466 def VerifyNode(what, cluster_name):
467   """Verify the status of the local node.
468
469   Based on the input L{what} parameter, various checks are done on the
470   local node.
471
472   If the I{filelist} key is present, this list of
473   files is checksummed and the file/checksum pairs are returned.
474
475   If the I{nodelist} key is present, we check that we have
476   connectivity via ssh with the target nodes (and check the hostname
477   report).
478
479   If the I{node-net-test} key is present, we check that we have
480   connectivity to the given nodes via both primary IP and, if
481   applicable, secondary IPs.
482
483   @type what: C{dict}
484   @param what: a dictionary of things to check:
485       - filelist: list of files for which to compute checksums
486       - nodelist: list of nodes we should check ssh communication with
487       - node-net-test: list of nodes we should check node daemon port
488         connectivity with
489       - hypervisor: list with hypervisors to run the verify for
490   @rtype: dict
491   @return: a dictionary with the same keys as the input dict, and
492       values representing the result of the checks
493
494   """
495   result = {}
496   my_name = netutils.Hostname.GetSysName()
497   port = netutils.GetDaemonPort(constants.NODED)
498   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
499
500   if constants.NV_HYPERVISOR in what and vm_capable:
501     result[constants.NV_HYPERVISOR] = tmp = {}
502     for hv_name in what[constants.NV_HYPERVISOR]:
503       try:
504         val = hypervisor.GetHypervisor(hv_name).Verify()
505       except errors.HypervisorError, err:
506         val = "Error while checking hypervisor: %s" % str(err)
507       tmp[hv_name] = val
508
509   if constants.NV_HVPARAMS in what and vm_capable:
510     result[constants.NV_HVPARAMS] = tmp = []
511     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
512       try:
513         logging.info("Validating hv %s, %s", hv_name, hvparms)
514         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
515       except errors.HypervisorError, err:
516         tmp.append((source, hv_name, str(err)))
517
518   if constants.NV_FILELIST in what:
519     result[constants.NV_FILELIST] = utils.FingerprintFiles(
520       what[constants.NV_FILELIST])
521
522   if constants.NV_NODELIST in what:
523     result[constants.NV_NODELIST] = tmp = {}
524     random.shuffle(what[constants.NV_NODELIST])
525     for node in what[constants.NV_NODELIST]:
526       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
527       if not success:
528         tmp[node] = message
529
530   if constants.NV_NODENETTEST in what:
531     result[constants.NV_NODENETTEST] = tmp = {}
532     my_pip = my_sip = None
533     for name, pip, sip in what[constants.NV_NODENETTEST]:
534       if name == my_name:
535         my_pip = pip
536         my_sip = sip
537         break
538     if not my_pip:
539       tmp[my_name] = ("Can't find my own primary/secondary IP"
540                       " in the node list")
541     else:
542       for name, pip, sip in what[constants.NV_NODENETTEST]:
543         fail = []
544         if not netutils.TcpPing(pip, port, source=my_pip):
545           fail.append("primary")
546         if sip != pip:
547           if not netutils.TcpPing(sip, port, source=my_sip):
548             fail.append("secondary")
549         if fail:
550           tmp[name] = ("failure using the %s interface(s)" %
551                        " and ".join(fail))
552
553   if constants.NV_MASTERIP in what:
554     # FIXME: add checks on incoming data structures (here and in the
555     # rest of the function)
556     master_name, master_ip = what[constants.NV_MASTERIP]
557     if master_name == my_name:
558       source = constants.IP4_ADDRESS_LOCALHOST
559     else:
560       source = None
561     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
562                                                   source=source)
563
564   if constants.NV_OOB_PATHS in what:
565     result[constants.NV_OOB_PATHS] = tmp = []
566     for path in what[constants.NV_OOB_PATHS]:
567       try:
568         st = os.stat(path)
569       except OSError, err:
570         tmp.append("error stating out of band helper: %s" % err)
571       else:
572         if stat.S_ISREG(st.st_mode):
573           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
574             tmp.append(None)
575           else:
576             tmp.append("out of band helper %s is not executable" % path)
577         else:
578           tmp.append("out of band helper %s is not a file" % path)
579
580   if constants.NV_LVLIST in what and vm_capable:
581     try:
582       val = GetVolumeList(utils.ListVolumeGroups().keys())
583     except RPCFail, err:
584       val = str(err)
585     result[constants.NV_LVLIST] = val
586
587   if constants.NV_INSTANCELIST in what and vm_capable:
588     # GetInstanceList can fail
589     try:
590       val = GetInstanceList(what[constants.NV_INSTANCELIST])
591     except RPCFail, err:
592       val = str(err)
593     result[constants.NV_INSTANCELIST] = val
594
595   if constants.NV_VGLIST in what and vm_capable:
596     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
597
598   if constants.NV_PVLIST in what and vm_capable:
599     result[constants.NV_PVLIST] = \
600       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
601                                    filter_allocatable=False)
602
603   if constants.NV_VERSION in what:
604     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
605                                     constants.RELEASE_VERSION)
606
607   if constants.NV_HVINFO in what and vm_capable:
608     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
609     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
610
611   if constants.NV_DRBDLIST in what and vm_capable:
612     try:
613       used_minors = bdev.DRBD8.GetUsedDevs().keys()
614     except errors.BlockDeviceError, err:
615       logging.warning("Can't get used minors list", exc_info=True)
616       used_minors = str(err)
617     result[constants.NV_DRBDLIST] = used_minors
618
619   if constants.NV_DRBDHELPER in what and vm_capable:
620     status = True
621     try:
622       payload = bdev.BaseDRBD.GetUsermodeHelper()
623     except errors.BlockDeviceError, err:
624       logging.error("Can't get DRBD usermode helper: %s", str(err))
625       status = False
626       payload = str(err)
627     result[constants.NV_DRBDHELPER] = (status, payload)
628
629   if constants.NV_NODESETUP in what:
630     result[constants.NV_NODESETUP] = tmpr = []
631     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
632       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
633                   " under /sys, missing required directories /sys/block"
634                   " and /sys/class/net")
635     if (not os.path.isdir("/proc/sys") or
636         not os.path.isfile("/proc/sysrq-trigger")):
637       tmpr.append("The procfs filesystem doesn't seem to be mounted"
638                   " under /proc, missing required directory /proc/sys and"
639                   " the file /proc/sysrq-trigger")
640
641   if constants.NV_TIME in what:
642     result[constants.NV_TIME] = utils.SplitTime(time.time())
643
644   if constants.NV_OSLIST in what and vm_capable:
645     result[constants.NV_OSLIST] = DiagnoseOS()
646
647   return result
648
649
650 def GetVolumeList(vg_names):
651   """Compute list of logical volumes and their size.
652
653   @type vg_names: list
654   @param vg_names: the volume groups whose LVs we should list, or
655       empty for all volume groups
656   @rtype: dict
657   @return:
658       dictionary of all partions (key) with value being a tuple of
659       their size (in MiB), inactive and online status::
660
661         {'xenvg/test1': ('20.06', True, True)}
662
663       in case of errors, a string is returned with the error
664       details.
665
666   """
667   lvs = {}
668   sep = '|'
669   if not vg_names:
670     vg_names = []
671   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
672                          "--separator=%s" % sep,
673                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
674   if result.failed:
675     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
676
677   for line in result.stdout.splitlines():
678     line = line.strip()
679     match = _LVSLINE_REGEX.match(line)
680     if not match:
681       logging.error("Invalid line returned from lvs output: '%s'", line)
682       continue
683     vg_name, name, size, attr = match.groups()
684     inactive = attr[4] == '-'
685     online = attr[5] == 'o'
686     virtual = attr[0] == 'v'
687     if virtual:
688       # we don't want to report such volumes as existing, since they
689       # don't really hold data
690       continue
691     lvs[vg_name+"/"+name] = (size, inactive, online)
692
693   return lvs
694
695
696 def ListVolumeGroups():
697   """List the volume groups and their size.
698
699   @rtype: dict
700   @return: dictionary with keys volume name and values the
701       size of the volume
702
703   """
704   return utils.ListVolumeGroups()
705
706
707 def NodeVolumes():
708   """List all volumes on this node.
709
710   @rtype: list
711   @return:
712     A list of dictionaries, each having four keys:
713       - name: the logical volume name,
714       - size: the size of the logical volume
715       - dev: the physical device on which the LV lives
716       - vg: the volume group to which it belongs
717
718     In case of errors, we return an empty list and log the
719     error.
720
721     Note that since a logical volume can live on multiple physical
722     volumes, the resulting list might include a logical volume
723     multiple times.
724
725   """
726   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
727                          "--separator=|",
728                          "--options=lv_name,lv_size,devices,vg_name"])
729   if result.failed:
730     _Fail("Failed to list logical volumes, lvs output: %s",
731           result.output)
732
733   def parse_dev(dev):
734     return dev.split('(')[0]
735
736   def handle_dev(dev):
737     return [parse_dev(x) for x in dev.split(",")]
738
739   def map_line(line):
740     line = [v.strip() for v in line]
741     return [{'name': line[0], 'size': line[1],
742              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
743
744   all_devs = []
745   for line in result.stdout.splitlines():
746     if line.count('|') >= 3:
747       all_devs.extend(map_line(line.split('|')))
748     else:
749       logging.warning("Strange line in the output from lvs: '%s'", line)
750   return all_devs
751
752
753 def BridgesExist(bridges_list):
754   """Check if a list of bridges exist on the current node.
755
756   @rtype: boolean
757   @return: C{True} if all of them exist, C{False} otherwise
758
759   """
760   missing = []
761   for bridge in bridges_list:
762     if not utils.BridgeExists(bridge):
763       missing.append(bridge)
764
765   if missing:
766     _Fail("Missing bridges %s", utils.CommaJoin(missing))
767
768
769 def GetInstanceList(hypervisor_list):
770   """Provides a list of instances.
771
772   @type hypervisor_list: list
773   @param hypervisor_list: the list of hypervisors to query information
774
775   @rtype: list
776   @return: a list of all running instances on the current node
777     - instance1.example.com
778     - instance2.example.com
779
780   """
781   results = []
782   for hname in hypervisor_list:
783     try:
784       names = hypervisor.GetHypervisor(hname).ListInstances()
785       results.extend(names)
786     except errors.HypervisorError, err:
787       _Fail("Error enumerating instances (hypervisor %s): %s",
788             hname, err, exc=True)
789
790   return results
791
792
793 def GetInstanceInfo(instance, hname):
794   """Gives back the information about an instance as a dictionary.
795
796   @type instance: string
797   @param instance: the instance name
798   @type hname: string
799   @param hname: the hypervisor type of the instance
800
801   @rtype: dict
802   @return: dictionary with the following keys:
803       - memory: memory size of instance (int)
804       - state: xen state of instance (string)
805       - time: cpu time of instance (float)
806
807   """
808   output = {}
809
810   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
811   if iinfo is not None:
812     output['memory'] = iinfo[2]
813     output['state'] = iinfo[4]
814     output['time'] = iinfo[5]
815
816   return output
817
818
819 def GetInstanceMigratable(instance):
820   """Gives whether an instance can be migrated.
821
822   @type instance: L{objects.Instance}
823   @param instance: object representing the instance to be checked.
824
825   @rtype: tuple
826   @return: tuple of (result, description) where:
827       - result: whether the instance can be migrated or not
828       - description: a description of the issue, if relevant
829
830   """
831   hyper = hypervisor.GetHypervisor(instance.hypervisor)
832   iname = instance.name
833   if iname not in hyper.ListInstances():
834     _Fail("Instance %s is not running", iname)
835
836   for idx in range(len(instance.disks)):
837     link_name = _GetBlockDevSymlinkPath(iname, idx)
838     if not os.path.islink(link_name):
839       logging.warning("Instance %s is missing symlink %s for disk %d",
840                       iname, link_name, idx)
841
842
843 def GetAllInstancesInfo(hypervisor_list):
844   """Gather data about all instances.
845
846   This is the equivalent of L{GetInstanceInfo}, except that it
847   computes data for all instances at once, thus being faster if one
848   needs data about more than one instance.
849
850   @type hypervisor_list: list
851   @param hypervisor_list: list of hypervisors to query for instance data
852
853   @rtype: dict
854   @return: dictionary of instance: data, with data having the following keys:
855       - memory: memory size of instance (int)
856       - state: xen state of instance (string)
857       - time: cpu time of instance (float)
858       - vcpus: the number of vcpus
859
860   """
861   output = {}
862
863   for hname in hypervisor_list:
864     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
865     if iinfo:
866       for name, _, memory, vcpus, state, times in iinfo:
867         value = {
868           'memory': memory,
869           'vcpus': vcpus,
870           'state': state,
871           'time': times,
872           }
873         if name in output:
874           # we only check static parameters, like memory and vcpus,
875           # and not state and time which can change between the
876           # invocations of the different hypervisors
877           for key in 'memory', 'vcpus':
878             if value[key] != output[name][key]:
879               _Fail("Instance %s is running twice"
880                     " with different parameters", name)
881         output[name] = value
882
883   return output
884
885
886 def _InstanceLogName(kind, os_name, instance):
887   """Compute the OS log filename for a given instance and operation.
888
889   The instance name and os name are passed in as strings since not all
890   operations have these as part of an instance object.
891
892   @type kind: string
893   @param kind: the operation type (e.g. add, import, etc.)
894   @type os_name: string
895   @param os_name: the os name
896   @type instance: string
897   @param instance: the name of the instance being imported/added/etc.
898
899   """
900   # TODO: Use tempfile.mkstemp to create unique filename
901   base = ("%s-%s-%s-%s.log" %
902           (kind, os_name, instance, utils.TimestampForFilename()))
903   return utils.PathJoin(constants.LOG_OS_DIR, base)
904
905
906 def InstanceOsAdd(instance, reinstall, debug):
907   """Add an OS to an instance.
908
909   @type instance: L{objects.Instance}
910   @param instance: Instance whose OS is to be installed
911   @type reinstall: boolean
912   @param reinstall: whether this is an instance reinstall
913   @type debug: integer
914   @param debug: debug level, passed to the OS scripts
915   @rtype: None
916
917   """
918   inst_os = OSFromDisk(instance.os)
919
920   create_env = OSEnvironment(instance, inst_os, debug)
921   if reinstall:
922     create_env['INSTANCE_REINSTALL'] = "1"
923
924   logfile = _InstanceLogName("add", instance.os, instance.name)
925
926   result = utils.RunCmd([inst_os.create_script], env=create_env,
927                         cwd=inst_os.path, output=logfile,)
928   if result.failed:
929     logging.error("os create command '%s' returned error: %s, logfile: %s,"
930                   " output: %s", result.cmd, result.fail_reason, logfile,
931                   result.output)
932     lines = [utils.SafeEncode(val)
933              for val in utils.TailFile(logfile, lines=20)]
934     _Fail("OS create script failed (%s), last lines in the"
935           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
936
937
938 def RunRenameInstance(instance, old_name, debug):
939   """Run the OS rename script for an instance.
940
941   @type instance: L{objects.Instance}
942   @param instance: Instance whose OS is to be installed
943   @type old_name: string
944   @param old_name: previous instance name
945   @type debug: integer
946   @param debug: debug level, passed to the OS scripts
947   @rtype: boolean
948   @return: the success of the operation
949
950   """
951   inst_os = OSFromDisk(instance.os)
952
953   rename_env = OSEnvironment(instance, inst_os, debug)
954   rename_env['OLD_INSTANCE_NAME'] = old_name
955
956   logfile = _InstanceLogName("rename", instance.os,
957                              "%s-%s" % (old_name, instance.name))
958
959   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
960                         cwd=inst_os.path, output=logfile)
961
962   if result.failed:
963     logging.error("os create command '%s' returned error: %s output: %s",
964                   result.cmd, result.fail_reason, result.output)
965     lines = [utils.SafeEncode(val)
966              for val in utils.TailFile(logfile, lines=20)]
967     _Fail("OS rename script failed (%s), last lines in the"
968           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
969
970
971 def _GetBlockDevSymlinkPath(instance_name, idx):
972   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
973                         (instance_name, constants.DISK_SEPARATOR, idx))
974
975
976 def _SymlinkBlockDev(instance_name, device_path, idx):
977   """Set up symlinks to a instance's block device.
978
979   This is an auxiliary function run when an instance is start (on the primary
980   node) or when an instance is migrated (on the target node).
981
982
983   @param instance_name: the name of the target instance
984   @param device_path: path of the physical block device, on the node
985   @param idx: the disk index
986   @return: absolute path to the disk's symlink
987
988   """
989   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
990   try:
991     os.symlink(device_path, link_name)
992   except OSError, err:
993     if err.errno == errno.EEXIST:
994       if (not os.path.islink(link_name) or
995           os.readlink(link_name) != device_path):
996         os.remove(link_name)
997         os.symlink(device_path, link_name)
998     else:
999       raise
1000
1001   return link_name
1002
1003
1004 def _RemoveBlockDevLinks(instance_name, disks):
1005   """Remove the block device symlinks belonging to the given instance.
1006
1007   """
1008   for idx, _ in enumerate(disks):
1009     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1010     if os.path.islink(link_name):
1011       try:
1012         os.remove(link_name)
1013       except OSError:
1014         logging.exception("Can't remove symlink '%s'", link_name)
1015
1016
1017 def _GatherAndLinkBlockDevs(instance):
1018   """Set up an instance's block device(s).
1019
1020   This is run on the primary node at instance startup. The block
1021   devices must be already assembled.
1022
1023   @type instance: L{objects.Instance}
1024   @param instance: the instance whose disks we shoul assemble
1025   @rtype: list
1026   @return: list of (disk_object, device_path)
1027
1028   """
1029   block_devices = []
1030   for idx, disk in enumerate(instance.disks):
1031     device = _RecursiveFindBD(disk)
1032     if device is None:
1033       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1034                                     str(disk))
1035     device.Open()
1036     try:
1037       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1038     except OSError, e:
1039       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1040                                     e.strerror)
1041
1042     block_devices.append((disk, link_name))
1043
1044   return block_devices
1045
1046
1047 def StartInstance(instance):
1048   """Start an instance.
1049
1050   @type instance: L{objects.Instance}
1051   @param instance: the instance object
1052   @rtype: None
1053
1054   """
1055   running_instances = GetInstanceList([instance.hypervisor])
1056
1057   if instance.name in running_instances:
1058     logging.info("Instance %s already running, not starting", instance.name)
1059     return
1060
1061   try:
1062     block_devices = _GatherAndLinkBlockDevs(instance)
1063     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1064     hyper.StartInstance(instance, block_devices)
1065   except errors.BlockDeviceError, err:
1066     _Fail("Block device error: %s", err, exc=True)
1067   except errors.HypervisorError, err:
1068     _RemoveBlockDevLinks(instance.name, instance.disks)
1069     _Fail("Hypervisor error: %s", err, exc=True)
1070
1071
1072 def InstanceShutdown(instance, timeout):
1073   """Shut an instance down.
1074
1075   @note: this functions uses polling with a hardcoded timeout.
1076
1077   @type instance: L{objects.Instance}
1078   @param instance: the instance object
1079   @type timeout: integer
1080   @param timeout: maximum timeout for soft shutdown
1081   @rtype: None
1082
1083   """
1084   hv_name = instance.hypervisor
1085   hyper = hypervisor.GetHypervisor(hv_name)
1086   iname = instance.name
1087
1088   if instance.name not in hyper.ListInstances():
1089     logging.info("Instance %s not running, doing nothing", iname)
1090     return
1091
1092   class _TryShutdown:
1093     def __init__(self):
1094       self.tried_once = False
1095
1096     def __call__(self):
1097       if iname not in hyper.ListInstances():
1098         return
1099
1100       try:
1101         hyper.StopInstance(instance, retry=self.tried_once)
1102       except errors.HypervisorError, err:
1103         if iname not in hyper.ListInstances():
1104           # if the instance is no longer existing, consider this a
1105           # success and go to cleanup
1106           return
1107
1108         _Fail("Failed to stop instance %s: %s", iname, err)
1109
1110       self.tried_once = True
1111
1112       raise utils.RetryAgain()
1113
1114   try:
1115     utils.Retry(_TryShutdown(), 5, timeout)
1116   except utils.RetryTimeout:
1117     # the shutdown did not succeed
1118     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1119
1120     try:
1121       hyper.StopInstance(instance, force=True)
1122     except errors.HypervisorError, err:
1123       if iname in hyper.ListInstances():
1124         # only raise an error if the instance still exists, otherwise
1125         # the error could simply be "instance ... unknown"!
1126         _Fail("Failed to force stop instance %s: %s", iname, err)
1127
1128     time.sleep(1)
1129
1130     if iname in hyper.ListInstances():
1131       _Fail("Could not shutdown instance %s even by destroy", iname)
1132
1133   try:
1134     hyper.CleanupInstance(instance.name)
1135   except errors.HypervisorError, err:
1136     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1137
1138   _RemoveBlockDevLinks(iname, instance.disks)
1139
1140
1141 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1142   """Reboot an instance.
1143
1144   @type instance: L{objects.Instance}
1145   @param instance: the instance object to reboot
1146   @type reboot_type: str
1147   @param reboot_type: the type of reboot, one the following
1148     constants:
1149       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1150         instance OS, do not recreate the VM
1151       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1152         restart the VM (at the hypervisor level)
1153       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1154         not accepted here, since that mode is handled differently, in
1155         cmdlib, and translates into full stop and start of the
1156         instance (instead of a call_instance_reboot RPC)
1157   @type shutdown_timeout: integer
1158   @param shutdown_timeout: maximum timeout for soft shutdown
1159   @rtype: None
1160
1161   """
1162   running_instances = GetInstanceList([instance.hypervisor])
1163
1164   if instance.name not in running_instances:
1165     _Fail("Cannot reboot instance %s that is not running", instance.name)
1166
1167   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1168   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1169     try:
1170       hyper.RebootInstance(instance)
1171     except errors.HypervisorError, err:
1172       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1173   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1174     try:
1175       InstanceShutdown(instance, shutdown_timeout)
1176       return StartInstance(instance)
1177     except errors.HypervisorError, err:
1178       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1179   else:
1180     _Fail("Invalid reboot_type received: %s", reboot_type)
1181
1182
1183 def MigrationInfo(instance):
1184   """Gather information about an instance to be migrated.
1185
1186   @type instance: L{objects.Instance}
1187   @param instance: the instance definition
1188
1189   """
1190   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1191   try:
1192     info = hyper.MigrationInfo(instance)
1193   except errors.HypervisorError, err:
1194     _Fail("Failed to fetch migration information: %s", err, exc=True)
1195   return info
1196
1197
1198 def AcceptInstance(instance, info, target):
1199   """Prepare the node to accept an instance.
1200
1201   @type instance: L{objects.Instance}
1202   @param instance: the instance definition
1203   @type info: string/data (opaque)
1204   @param info: migration information, from the source node
1205   @type target: string
1206   @param target: target host (usually ip), on this node
1207
1208   """
1209   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1210   try:
1211     hyper.AcceptInstance(instance, info, target)
1212   except errors.HypervisorError, err:
1213     _Fail("Failed to accept instance: %s", err, exc=True)
1214
1215
1216 def FinalizeMigration(instance, info, success):
1217   """Finalize any preparation to accept an instance.
1218
1219   @type instance: L{objects.Instance}
1220   @param instance: the instance definition
1221   @type info: string/data (opaque)
1222   @param info: migration information, from the source node
1223   @type success: boolean
1224   @param success: whether the migration was a success or a failure
1225
1226   """
1227   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1228   try:
1229     hyper.FinalizeMigration(instance, info, success)
1230   except errors.HypervisorError, err:
1231     _Fail("Failed to finalize migration: %s", err, exc=True)
1232
1233
1234 def MigrateInstance(instance, target, live):
1235   """Migrates an instance to another node.
1236
1237   @type instance: L{objects.Instance}
1238   @param instance: the instance definition
1239   @type target: string
1240   @param target: the target node name
1241   @type live: boolean
1242   @param live: whether the migration should be done live or not (the
1243       interpretation of this parameter is left to the hypervisor)
1244   @rtype: tuple
1245   @return: a tuple of (success, msg) where:
1246       - succes is a boolean denoting the success/failure of the operation
1247       - msg is a string with details in case of failure
1248
1249   """
1250   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1251
1252   try:
1253     hyper.MigrateInstance(instance, target, live)
1254   except errors.HypervisorError, err:
1255     _Fail("Failed to migrate instance: %s", err, exc=True)
1256
1257
1258 def BlockdevCreate(disk, size, owner, on_primary, info):
1259   """Creates a block device for an instance.
1260
1261   @type disk: L{objects.Disk}
1262   @param disk: the object describing the disk we should create
1263   @type size: int
1264   @param size: the size of the physical underlying device, in MiB
1265   @type owner: str
1266   @param owner: the name of the instance for which disk is created,
1267       used for device cache data
1268   @type on_primary: boolean
1269   @param on_primary:  indicates if it is the primary node or not
1270   @type info: string
1271   @param info: string that will be sent to the physical device
1272       creation, used for example to set (LVM) tags on LVs
1273
1274   @return: the new unique_id of the device (this can sometime be
1275       computed only after creation), or None. On secondary nodes,
1276       it's not required to return anything.
1277
1278   """
1279   # TODO: remove the obsolete 'size' argument
1280   # pylint: disable-msg=W0613
1281   clist = []
1282   if disk.children:
1283     for child in disk.children:
1284       try:
1285         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1286       except errors.BlockDeviceError, err:
1287         _Fail("Can't assemble device %s: %s", child, err)
1288       if on_primary or disk.AssembleOnSecondary():
1289         # we need the children open in case the device itself has to
1290         # be assembled
1291         try:
1292           # pylint: disable-msg=E1103
1293           crdev.Open()
1294         except errors.BlockDeviceError, err:
1295           _Fail("Can't make child '%s' read-write: %s", child, err)
1296       clist.append(crdev)
1297
1298   try:
1299     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1300   except errors.BlockDeviceError, err:
1301     _Fail("Can't create block device: %s", err)
1302
1303   if on_primary or disk.AssembleOnSecondary():
1304     try:
1305       device.Assemble()
1306     except errors.BlockDeviceError, err:
1307       _Fail("Can't assemble device after creation, unusual event: %s", err)
1308     device.SetSyncSpeed(constants.SYNC_SPEED)
1309     if on_primary or disk.OpenOnSecondary():
1310       try:
1311         device.Open(force=True)
1312       except errors.BlockDeviceError, err:
1313         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1314     DevCacheManager.UpdateCache(device.dev_path, owner,
1315                                 on_primary, disk.iv_name)
1316
1317   device.SetInfo(info)
1318
1319   return device.unique_id
1320
1321
1322 def _WipeDevice(path, offset, size):
1323   """This function actually wipes the device.
1324
1325   @param path: The path to the device to wipe
1326   @param offset: The offset in MiB in the file
1327   @param size: The size in MiB to write
1328
1329   """
1330   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1331          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1332          "count=%d" % size]
1333   result = utils.RunCmd(cmd)
1334
1335   if result.failed:
1336     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1337           result.fail_reason, result.output)
1338
1339
1340 def BlockdevWipe(disk, offset, size):
1341   """Wipes a block device.
1342
1343   @type disk: L{objects.Disk}
1344   @param disk: the disk object we want to wipe
1345   @type offset: int
1346   @param offset: The offset in MiB in the file
1347   @type size: int
1348   @param size: The size in MiB to write
1349
1350   """
1351   try:
1352     rdev = _RecursiveFindBD(disk)
1353   except errors.BlockDeviceError:
1354     rdev = None
1355
1356   if not rdev:
1357     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1358
1359   # Do cross verify some of the parameters
1360   if offset > rdev.size:
1361     _Fail("Offset is bigger than device size")
1362   if (offset + size) > rdev.size:
1363     _Fail("The provided offset and size to wipe is bigger than device size")
1364
1365   _WipeDevice(rdev.dev_path, offset, size)
1366
1367
1368 def BlockdevPauseResumeSync(disks, pause):
1369   """Pause or resume the sync of the block device.
1370
1371   @type disks: list of L{objects.Disk}
1372   @param disks: the disks object we want to pause/resume
1373   @type pause: bool
1374   @param pause: Wheater to pause or resume
1375
1376   """
1377   success = []
1378   for disk in disks:
1379     try:
1380       rdev = _RecursiveFindBD(disk)
1381     except errors.BlockDeviceError:
1382       rdev = None
1383
1384     if not rdev:
1385       success.append((False, ("Cannot change sync for device %s:"
1386                               " device not found" % disk.iv_name)))
1387       continue
1388
1389     result = rdev.PauseResumeSync(pause)
1390
1391     if result:
1392       success.append((result, None))
1393     else:
1394       if pause:
1395         msg = "Pause"
1396       else:
1397         msg = "Resume"
1398       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1399
1400   return success
1401
1402
1403 def BlockdevRemove(disk):
1404   """Remove a block device.
1405
1406   @note: This is intended to be called recursively.
1407
1408   @type disk: L{objects.Disk}
1409   @param disk: the disk object we should remove
1410   @rtype: boolean
1411   @return: the success of the operation
1412
1413   """
1414   msgs = []
1415   try:
1416     rdev = _RecursiveFindBD(disk)
1417   except errors.BlockDeviceError, err:
1418     # probably can't attach
1419     logging.info("Can't attach to device %s in remove", disk)
1420     rdev = None
1421   if rdev is not None:
1422     r_path = rdev.dev_path
1423     try:
1424       rdev.Remove()
1425     except errors.BlockDeviceError, err:
1426       msgs.append(str(err))
1427     if not msgs:
1428       DevCacheManager.RemoveCache(r_path)
1429
1430   if disk.children:
1431     for child in disk.children:
1432       try:
1433         BlockdevRemove(child)
1434       except RPCFail, err:
1435         msgs.append(str(err))
1436
1437   if msgs:
1438     _Fail("; ".join(msgs))
1439
1440
1441 def _RecursiveAssembleBD(disk, owner, as_primary):
1442   """Activate a block device for an instance.
1443
1444   This is run on the primary and secondary nodes for an instance.
1445
1446   @note: this function is called recursively.
1447
1448   @type disk: L{objects.Disk}
1449   @param disk: the disk we try to assemble
1450   @type owner: str
1451   @param owner: the name of the instance which owns the disk
1452   @type as_primary: boolean
1453   @param as_primary: if we should make the block device
1454       read/write
1455
1456   @return: the assembled device or None (in case no device
1457       was assembled)
1458   @raise errors.BlockDeviceError: in case there is an error
1459       during the activation of the children or the device
1460       itself
1461
1462   """
1463   children = []
1464   if disk.children:
1465     mcn = disk.ChildrenNeeded()
1466     if mcn == -1:
1467       mcn = 0 # max number of Nones allowed
1468     else:
1469       mcn = len(disk.children) - mcn # max number of Nones
1470     for chld_disk in disk.children:
1471       try:
1472         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1473       except errors.BlockDeviceError, err:
1474         if children.count(None) >= mcn:
1475           raise
1476         cdev = None
1477         logging.error("Error in child activation (but continuing): %s",
1478                       str(err))
1479       children.append(cdev)
1480
1481   if as_primary or disk.AssembleOnSecondary():
1482     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1483     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1484     result = r_dev
1485     if as_primary or disk.OpenOnSecondary():
1486       r_dev.Open()
1487     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1488                                 as_primary, disk.iv_name)
1489
1490   else:
1491     result = True
1492   return result
1493
1494
1495 def BlockdevAssemble(disk, owner, as_primary, idx):
1496   """Activate a block device for an instance.
1497
1498   This is a wrapper over _RecursiveAssembleBD.
1499
1500   @rtype: str or boolean
1501   @return: a C{/dev/...} path for primary nodes, and
1502       C{True} for secondary nodes
1503
1504   """
1505   try:
1506     result = _RecursiveAssembleBD(disk, owner, as_primary)
1507     if isinstance(result, bdev.BlockDev):
1508       # pylint: disable-msg=E1103
1509       result = result.dev_path
1510       if as_primary:
1511         _SymlinkBlockDev(owner, result, idx)
1512   except errors.BlockDeviceError, err:
1513     _Fail("Error while assembling disk: %s", err, exc=True)
1514   except OSError, err:
1515     _Fail("Error while symlinking disk: %s", err, exc=True)
1516
1517   return result
1518
1519
1520 def BlockdevShutdown(disk):
1521   """Shut down a block device.
1522
1523   First, if the device is assembled (Attach() is successful), then
1524   the device is shutdown. Then the children of the device are
1525   shutdown.
1526
1527   This function is called recursively. Note that we don't cache the
1528   children or such, as oppossed to assemble, shutdown of different
1529   devices doesn't require that the upper device was active.
1530
1531   @type disk: L{objects.Disk}
1532   @param disk: the description of the disk we should
1533       shutdown
1534   @rtype: None
1535
1536   """
1537   msgs = []
1538   r_dev = _RecursiveFindBD(disk)
1539   if r_dev is not None:
1540     r_path = r_dev.dev_path
1541     try:
1542       r_dev.Shutdown()
1543       DevCacheManager.RemoveCache(r_path)
1544     except errors.BlockDeviceError, err:
1545       msgs.append(str(err))
1546
1547   if disk.children:
1548     for child in disk.children:
1549       try:
1550         BlockdevShutdown(child)
1551       except RPCFail, err:
1552         msgs.append(str(err))
1553
1554   if msgs:
1555     _Fail("; ".join(msgs))
1556
1557
1558 def BlockdevAddchildren(parent_cdev, new_cdevs):
1559   """Extend a mirrored block device.
1560
1561   @type parent_cdev: L{objects.Disk}
1562   @param parent_cdev: the disk to which we should add children
1563   @type new_cdevs: list of L{objects.Disk}
1564   @param new_cdevs: the list of children which we should add
1565   @rtype: None
1566
1567   """
1568   parent_bdev = _RecursiveFindBD(parent_cdev)
1569   if parent_bdev is None:
1570     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1571   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1572   if new_bdevs.count(None) > 0:
1573     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1574   parent_bdev.AddChildren(new_bdevs)
1575
1576
1577 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1578   """Shrink a mirrored block device.
1579
1580   @type parent_cdev: L{objects.Disk}
1581   @param parent_cdev: the disk from which we should remove children
1582   @type new_cdevs: list of L{objects.Disk}
1583   @param new_cdevs: the list of children which we should remove
1584   @rtype: None
1585
1586   """
1587   parent_bdev = _RecursiveFindBD(parent_cdev)
1588   if parent_bdev is None:
1589     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1590   devs = []
1591   for disk in new_cdevs:
1592     rpath = disk.StaticDevPath()
1593     if rpath is None:
1594       bd = _RecursiveFindBD(disk)
1595       if bd is None:
1596         _Fail("Can't find device %s while removing children", disk)
1597       else:
1598         devs.append(bd.dev_path)
1599     else:
1600       if not utils.IsNormAbsPath(rpath):
1601         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1602       devs.append(rpath)
1603   parent_bdev.RemoveChildren(devs)
1604
1605
1606 def BlockdevGetmirrorstatus(disks):
1607   """Get the mirroring status of a list of devices.
1608
1609   @type disks: list of L{objects.Disk}
1610   @param disks: the list of disks which we should query
1611   @rtype: disk
1612   @return: List of L{objects.BlockDevStatus}, one for each disk
1613   @raise errors.BlockDeviceError: if any of the disks cannot be
1614       found
1615
1616   """
1617   stats = []
1618   for dsk in disks:
1619     rbd = _RecursiveFindBD(dsk)
1620     if rbd is None:
1621       _Fail("Can't find device %s", dsk)
1622
1623     stats.append(rbd.CombinedSyncStatus())
1624
1625   return stats
1626
1627
1628 def BlockdevGetmirrorstatusMulti(disks):
1629   """Get the mirroring status of a list of devices.
1630
1631   @type disks: list of L{objects.Disk}
1632   @param disks: the list of disks which we should query
1633   @rtype: disk
1634   @return: List of tuples, (bool, status), one for each disk; bool denotes
1635     success/failure, status is L{objects.BlockDevStatus} on success, string
1636     otherwise
1637
1638   """
1639   result = []
1640   for disk in disks:
1641     try:
1642       rbd = _RecursiveFindBD(disk)
1643       if rbd is None:
1644         result.append((False, "Can't find device %s" % disk))
1645         continue
1646
1647       status = rbd.CombinedSyncStatus()
1648     except errors.BlockDeviceError, err:
1649       logging.exception("Error while getting disk status")
1650       result.append((False, str(err)))
1651     else:
1652       result.append((True, status))
1653
1654   assert len(disks) == len(result)
1655
1656   return result
1657
1658
1659 def _RecursiveFindBD(disk):
1660   """Check if a device is activated.
1661
1662   If so, return information about the real device.
1663
1664   @type disk: L{objects.Disk}
1665   @param disk: the disk object we need to find
1666
1667   @return: None if the device can't be found,
1668       otherwise the device instance
1669
1670   """
1671   children = []
1672   if disk.children:
1673     for chdisk in disk.children:
1674       children.append(_RecursiveFindBD(chdisk))
1675
1676   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1677
1678
1679 def _OpenRealBD(disk):
1680   """Opens the underlying block device of a disk.
1681
1682   @type disk: L{objects.Disk}
1683   @param disk: the disk object we want to open
1684
1685   """
1686   real_disk = _RecursiveFindBD(disk)
1687   if real_disk is None:
1688     _Fail("Block device '%s' is not set up", disk)
1689
1690   real_disk.Open()
1691
1692   return real_disk
1693
1694
1695 def BlockdevFind(disk):
1696   """Check if a device is activated.
1697
1698   If it is, return information about the real device.
1699
1700   @type disk: L{objects.Disk}
1701   @param disk: the disk to find
1702   @rtype: None or objects.BlockDevStatus
1703   @return: None if the disk cannot be found, otherwise a the current
1704            information
1705
1706   """
1707   try:
1708     rbd = _RecursiveFindBD(disk)
1709   except errors.BlockDeviceError, err:
1710     _Fail("Failed to find device: %s", err, exc=True)
1711
1712   if rbd is None:
1713     return None
1714
1715   return rbd.GetSyncStatus()
1716
1717
1718 def BlockdevGetsize(disks):
1719   """Computes the size of the given disks.
1720
1721   If a disk is not found, returns None instead.
1722
1723   @type disks: list of L{objects.Disk}
1724   @param disks: the list of disk to compute the size for
1725   @rtype: list
1726   @return: list with elements None if the disk cannot be found,
1727       otherwise the size
1728
1729   """
1730   result = []
1731   for cf in disks:
1732     try:
1733       rbd = _RecursiveFindBD(cf)
1734     except errors.BlockDeviceError:
1735       result.append(None)
1736       continue
1737     if rbd is None:
1738       result.append(None)
1739     else:
1740       result.append(rbd.GetActualSize())
1741   return result
1742
1743
1744 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1745   """Export a block device to a remote node.
1746
1747   @type disk: L{objects.Disk}
1748   @param disk: the description of the disk to export
1749   @type dest_node: str
1750   @param dest_node: the destination node to export to
1751   @type dest_path: str
1752   @param dest_path: the destination path on the target node
1753   @type cluster_name: str
1754   @param cluster_name: the cluster name, needed for SSH hostalias
1755   @rtype: None
1756
1757   """
1758   real_disk = _OpenRealBD(disk)
1759
1760   # the block size on the read dd is 1MiB to match our units
1761   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1762                                "dd if=%s bs=1048576 count=%s",
1763                                real_disk.dev_path, str(disk.size))
1764
1765   # we set here a smaller block size as, due to ssh buffering, more
1766   # than 64-128k will mostly ignored; we use nocreat to fail if the
1767   # device is not already there or we pass a wrong path; we use
1768   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1769   # to not buffer too much memory; this means that at best, we flush
1770   # every 64k, which will not be very fast
1771   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1772                                 " oflag=dsync", dest_path)
1773
1774   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1775                                                    constants.GANETI_RUNAS,
1776                                                    destcmd)
1777
1778   # all commands have been checked, so we're safe to combine them
1779   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1780
1781   result = utils.RunCmd(["bash", "-c", command])
1782
1783   if result.failed:
1784     _Fail("Disk copy command '%s' returned error: %s"
1785           " output: %s", command, result.fail_reason, result.output)
1786
1787
1788 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1789   """Write a file to the filesystem.
1790
1791   This allows the master to overwrite(!) a file. It will only perform
1792   the operation if the file belongs to a list of configuration files.
1793
1794   @type file_name: str
1795   @param file_name: the target file name
1796   @type data: str
1797   @param data: the new contents of the file
1798   @type mode: int
1799   @param mode: the mode to give the file (can be None)
1800   @type uid: int
1801   @param uid: the owner of the file (can be -1 for default)
1802   @type gid: int
1803   @param gid: the group of the file (can be -1 for default)
1804   @type atime: float
1805   @param atime: the atime to set on the file (can be None)
1806   @type mtime: float
1807   @param mtime: the mtime to set on the file (can be None)
1808   @rtype: None
1809
1810   """
1811   if not os.path.isabs(file_name):
1812     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1813
1814   if file_name not in _ALLOWED_UPLOAD_FILES:
1815     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1816           file_name)
1817
1818   raw_data = _Decompress(data)
1819
1820   utils.SafeWriteFile(file_name, None,
1821                       data=raw_data, mode=mode, uid=uid, gid=gid,
1822                       atime=atime, mtime=mtime)
1823
1824
1825 def RunOob(oob_program, command, node, timeout):
1826   """Executes oob_program with given command on given node.
1827
1828   @param oob_program: The path to the executable oob_program
1829   @param command: The command to invoke on oob_program
1830   @param node: The node given as an argument to the program
1831   @param timeout: Timeout after which we kill the oob program
1832
1833   @return: stdout
1834   @raise RPCFail: If execution fails for some reason
1835
1836   """
1837   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1838
1839   if result.failed:
1840     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1841           result.fail_reason, result.output)
1842
1843   return result.stdout
1844
1845
1846 def WriteSsconfFiles(values):
1847   """Update all ssconf files.
1848
1849   Wrapper around the SimpleStore.WriteFiles.
1850
1851   """
1852   ssconf.SimpleStore().WriteFiles(values)
1853
1854
1855 def _ErrnoOrStr(err):
1856   """Format an EnvironmentError exception.
1857
1858   If the L{err} argument has an errno attribute, it will be looked up
1859   and converted into a textual C{E...} description. Otherwise the
1860   string representation of the error will be returned.
1861
1862   @type err: L{EnvironmentError}
1863   @param err: the exception to format
1864
1865   """
1866   if hasattr(err, 'errno'):
1867     detail = errno.errorcode[err.errno]
1868   else:
1869     detail = str(err)
1870   return detail
1871
1872
1873 def _OSOndiskAPIVersion(os_dir):
1874   """Compute and return the API version of a given OS.
1875
1876   This function will try to read the API version of the OS residing in
1877   the 'os_dir' directory.
1878
1879   @type os_dir: str
1880   @param os_dir: the directory in which we should look for the OS
1881   @rtype: tuple
1882   @return: tuple (status, data) with status denoting the validity and
1883       data holding either the vaid versions or an error message
1884
1885   """
1886   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1887
1888   try:
1889     st = os.stat(api_file)
1890   except EnvironmentError, err:
1891     return False, ("Required file '%s' not found under path %s: %s" %
1892                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1893
1894   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1895     return False, ("File '%s' in %s is not a regular file" %
1896                    (constants.OS_API_FILE, os_dir))
1897
1898   try:
1899     api_versions = utils.ReadFile(api_file).splitlines()
1900   except EnvironmentError, err:
1901     return False, ("Error while reading the API version file at %s: %s" %
1902                    (api_file, _ErrnoOrStr(err)))
1903
1904   try:
1905     api_versions = [int(version.strip()) for version in api_versions]
1906   except (TypeError, ValueError), err:
1907     return False, ("API version(s) can't be converted to integer: %s" %
1908                    str(err))
1909
1910   return True, api_versions
1911
1912
1913 def DiagnoseOS(top_dirs=None):
1914   """Compute the validity for all OSes.
1915
1916   @type top_dirs: list
1917   @param top_dirs: the list of directories in which to
1918       search (if not given defaults to
1919       L{constants.OS_SEARCH_PATH})
1920   @rtype: list of L{objects.OS}
1921   @return: a list of tuples (name, path, status, diagnose, variants,
1922       parameters, api_version) for all (potential) OSes under all
1923       search paths, where:
1924           - name is the (potential) OS name
1925           - path is the full path to the OS
1926           - status True/False is the validity of the OS
1927           - diagnose is the error message for an invalid OS, otherwise empty
1928           - variants is a list of supported OS variants, if any
1929           - parameters is a list of (name, help) parameters, if any
1930           - api_version is a list of support OS API versions
1931
1932   """
1933   if top_dirs is None:
1934     top_dirs = constants.OS_SEARCH_PATH
1935
1936   result = []
1937   for dir_name in top_dirs:
1938     if os.path.isdir(dir_name):
1939       try:
1940         f_names = utils.ListVisibleFiles(dir_name)
1941       except EnvironmentError, err:
1942         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1943         break
1944       for name in f_names:
1945         os_path = utils.PathJoin(dir_name, name)
1946         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1947         if status:
1948           diagnose = ""
1949           variants = os_inst.supported_variants
1950           parameters = os_inst.supported_parameters
1951           api_versions = os_inst.api_versions
1952         else:
1953           diagnose = os_inst
1954           variants = parameters = api_versions = []
1955         result.append((name, os_path, status, diagnose, variants,
1956                        parameters, api_versions))
1957
1958   return result
1959
1960
1961 def _TryOSFromDisk(name, base_dir=None):
1962   """Create an OS instance from disk.
1963
1964   This function will return an OS instance if the given name is a
1965   valid OS name.
1966
1967   @type base_dir: string
1968   @keyword base_dir: Base directory containing OS installations.
1969                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1970   @rtype: tuple
1971   @return: success and either the OS instance if we find a valid one,
1972       or error message
1973
1974   """
1975   if base_dir is None:
1976     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1977   else:
1978     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1979
1980   if os_dir is None:
1981     return False, "Directory for OS %s not found in search path" % name
1982
1983   status, api_versions = _OSOndiskAPIVersion(os_dir)
1984   if not status:
1985     # push the error up
1986     return status, api_versions
1987
1988   if not constants.OS_API_VERSIONS.intersection(api_versions):
1989     return False, ("API version mismatch for path '%s': found %s, want %s." %
1990                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1991
1992   # OS Files dictionary, we will populate it with the absolute path names
1993   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1994
1995   if max(api_versions) >= constants.OS_API_V15:
1996     os_files[constants.OS_VARIANTS_FILE] = ''
1997
1998   if max(api_versions) >= constants.OS_API_V20:
1999     os_files[constants.OS_PARAMETERS_FILE] = ''
2000   else:
2001     del os_files[constants.OS_SCRIPT_VERIFY]
2002
2003   for filename in os_files:
2004     os_files[filename] = utils.PathJoin(os_dir, filename)
2005
2006     try:
2007       st = os.stat(os_files[filename])
2008     except EnvironmentError, err:
2009       return False, ("File '%s' under path '%s' is missing (%s)" %
2010                      (filename, os_dir, _ErrnoOrStr(err)))
2011
2012     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2013       return False, ("File '%s' under path '%s' is not a regular file" %
2014                      (filename, os_dir))
2015
2016     if filename in constants.OS_SCRIPTS:
2017       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2018         return False, ("File '%s' under path '%s' is not executable" %
2019                        (filename, os_dir))
2020
2021   variants = []
2022   if constants.OS_VARIANTS_FILE in os_files:
2023     variants_file = os_files[constants.OS_VARIANTS_FILE]
2024     try:
2025       variants = utils.ReadFile(variants_file).splitlines()
2026     except EnvironmentError, err:
2027       return False, ("Error while reading the OS variants file at %s: %s" %
2028                      (variants_file, _ErrnoOrStr(err)))
2029     if not variants:
2030       return False, ("No supported os variant found")
2031
2032   parameters = []
2033   if constants.OS_PARAMETERS_FILE in os_files:
2034     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2035     try:
2036       parameters = utils.ReadFile(parameters_file).splitlines()
2037     except EnvironmentError, err:
2038       return False, ("Error while reading the OS parameters file at %s: %s" %
2039                      (parameters_file, _ErrnoOrStr(err)))
2040     parameters = [v.split(None, 1) for v in parameters]
2041
2042   os_obj = objects.OS(name=name, path=os_dir,
2043                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2044                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2045                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2046                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2047                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2048                                                  None),
2049                       supported_variants=variants,
2050                       supported_parameters=parameters,
2051                       api_versions=api_versions)
2052   return True, os_obj
2053
2054
2055 def OSFromDisk(name, base_dir=None):
2056   """Create an OS instance from disk.
2057
2058   This function will return an OS instance if the given name is a
2059   valid OS name. Otherwise, it will raise an appropriate
2060   L{RPCFail} exception, detailing why this is not a valid OS.
2061
2062   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2063   an exception but returns true/false status data.
2064
2065   @type base_dir: string
2066   @keyword base_dir: Base directory containing OS installations.
2067                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2068   @rtype: L{objects.OS}
2069   @return: the OS instance if we find a valid one
2070   @raise RPCFail: if we don't find a valid OS
2071
2072   """
2073   name_only = objects.OS.GetName(name)
2074   status, payload = _TryOSFromDisk(name_only, base_dir)
2075
2076   if not status:
2077     _Fail(payload)
2078
2079   return payload
2080
2081
2082 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2083   """Calculate the basic environment for an os script.
2084
2085   @type os_name: str
2086   @param os_name: full operating system name (including variant)
2087   @type inst_os: L{objects.OS}
2088   @param inst_os: operating system for which the environment is being built
2089   @type os_params: dict
2090   @param os_params: the OS parameters
2091   @type debug: integer
2092   @param debug: debug level (0 or 1, for OS Api 10)
2093   @rtype: dict
2094   @return: dict of environment variables
2095   @raise errors.BlockDeviceError: if the block device
2096       cannot be found
2097
2098   """
2099   result = {}
2100   api_version = \
2101     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2102   result['OS_API_VERSION'] = '%d' % api_version
2103   result['OS_NAME'] = inst_os.name
2104   result['DEBUG_LEVEL'] = '%d' % debug
2105
2106   # OS variants
2107   if api_version >= constants.OS_API_V15:
2108     variant = objects.OS.GetVariant(os_name)
2109     if not variant:
2110       variant = inst_os.supported_variants[0]
2111     result['OS_VARIANT'] = variant
2112
2113   # OS params
2114   for pname, pvalue in os_params.items():
2115     result['OSP_%s' % pname.upper()] = pvalue
2116
2117   return result
2118
2119
2120 def OSEnvironment(instance, inst_os, debug=0):
2121   """Calculate the environment for an os script.
2122
2123   @type instance: L{objects.Instance}
2124   @param instance: target instance for the os script run
2125   @type inst_os: L{objects.OS}
2126   @param inst_os: operating system for which the environment is being built
2127   @type debug: integer
2128   @param debug: debug level (0 or 1, for OS Api 10)
2129   @rtype: dict
2130   @return: dict of environment variables
2131   @raise errors.BlockDeviceError: if the block device
2132       cannot be found
2133
2134   """
2135   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2136
2137   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
2138     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2139
2140   result['HYPERVISOR'] = instance.hypervisor
2141   result['DISK_COUNT'] = '%d' % len(instance.disks)
2142   result['NIC_COUNT'] = '%d' % len(instance.nics)
2143
2144   # Disks
2145   for idx, disk in enumerate(instance.disks):
2146     real_disk = _OpenRealBD(disk)
2147     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2148     result['DISK_%d_ACCESS' % idx] = disk.mode
2149     if constants.HV_DISK_TYPE in instance.hvparams:
2150       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2151         instance.hvparams[constants.HV_DISK_TYPE]
2152     if disk.dev_type in constants.LDS_BLOCK:
2153       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2154     elif disk.dev_type == constants.LD_FILE:
2155       result['DISK_%d_BACKEND_TYPE' % idx] = \
2156         'file:%s' % disk.physical_id[0]
2157
2158   # NICs
2159   for idx, nic in enumerate(instance.nics):
2160     result['NIC_%d_MAC' % idx] = nic.mac
2161     if nic.ip:
2162       result['NIC_%d_IP' % idx] = nic.ip
2163     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2164     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2165       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2166     if nic.nicparams[constants.NIC_LINK]:
2167       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2168     if constants.HV_NIC_TYPE in instance.hvparams:
2169       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2170         instance.hvparams[constants.HV_NIC_TYPE]
2171
2172   # HV/BE params
2173   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2174     for key, value in source.items():
2175       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2176
2177   return result
2178
2179
2180 def BlockdevGrow(disk, amount):
2181   """Grow a stack of block devices.
2182
2183   This function is called recursively, with the childrens being the
2184   first ones to resize.
2185
2186   @type disk: L{objects.Disk}
2187   @param disk: the disk to be grown
2188   @rtype: (status, result)
2189   @return: a tuple with the status of the operation
2190       (True/False), and the errors message if status
2191       is False
2192
2193   """
2194   r_dev = _RecursiveFindBD(disk)
2195   if r_dev is None:
2196     _Fail("Cannot find block device %s", disk)
2197
2198   try:
2199     r_dev.Grow(amount)
2200   except errors.BlockDeviceError, err:
2201     _Fail("Failed to grow block device: %s", err, exc=True)
2202
2203
2204 def BlockdevSnapshot(disk):
2205   """Create a snapshot copy of a block device.
2206
2207   This function is called recursively, and the snapshot is actually created
2208   just for the leaf lvm backend device.
2209
2210   @type disk: L{objects.Disk}
2211   @param disk: the disk to be snapshotted
2212   @rtype: string
2213   @return: snapshot disk ID as (vg, lv)
2214
2215   """
2216   if disk.dev_type == constants.LD_DRBD8:
2217     if not disk.children:
2218       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2219             disk.unique_id)
2220     return BlockdevSnapshot(disk.children[0])
2221   elif disk.dev_type == constants.LD_LV:
2222     r_dev = _RecursiveFindBD(disk)
2223     if r_dev is not None:
2224       # FIXME: choose a saner value for the snapshot size
2225       # let's stay on the safe side and ask for the full size, for now
2226       return r_dev.Snapshot(disk.size)
2227     else:
2228       _Fail("Cannot find block device %s", disk)
2229   else:
2230     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2231           disk.unique_id, disk.dev_type)
2232
2233
2234 def FinalizeExport(instance, snap_disks):
2235   """Write out the export configuration information.
2236
2237   @type instance: L{objects.Instance}
2238   @param instance: the instance which we export, used for
2239       saving configuration
2240   @type snap_disks: list of L{objects.Disk}
2241   @param snap_disks: list of snapshot block devices, which
2242       will be used to get the actual name of the dump file
2243
2244   @rtype: None
2245
2246   """
2247   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2248   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2249
2250   config = objects.SerializableConfigParser()
2251
2252   config.add_section(constants.INISECT_EXP)
2253   config.set(constants.INISECT_EXP, 'version', '0')
2254   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2255   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2256   config.set(constants.INISECT_EXP, 'os', instance.os)
2257   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2258
2259   config.add_section(constants.INISECT_INS)
2260   config.set(constants.INISECT_INS, 'name', instance.name)
2261   config.set(constants.INISECT_INS, 'memory', '%d' %
2262              instance.beparams[constants.BE_MEMORY])
2263   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2264              instance.beparams[constants.BE_VCPUS])
2265   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2266   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2267
2268   nic_total = 0
2269   for nic_count, nic in enumerate(instance.nics):
2270     nic_total += 1
2271     config.set(constants.INISECT_INS, 'nic%d_mac' %
2272                nic_count, '%s' % nic.mac)
2273     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2274     for param in constants.NICS_PARAMETER_TYPES:
2275       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2276                  '%s' % nic.nicparams.get(param, None))
2277   # TODO: redundant: on load can read nics until it doesn't exist
2278   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2279
2280   disk_total = 0
2281   for disk_count, disk in enumerate(snap_disks):
2282     if disk:
2283       disk_total += 1
2284       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2285                  ('%s' % disk.iv_name))
2286       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2287                  ('%s' % disk.physical_id[1]))
2288       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2289                  ('%d' % disk.size))
2290
2291   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2292
2293   # New-style hypervisor/backend parameters
2294
2295   config.add_section(constants.INISECT_HYP)
2296   for name, value in instance.hvparams.items():
2297     if name not in constants.HVC_GLOBALS:
2298       config.set(constants.INISECT_HYP, name, str(value))
2299
2300   config.add_section(constants.INISECT_BEP)
2301   for name, value in instance.beparams.items():
2302     config.set(constants.INISECT_BEP, name, str(value))
2303
2304   config.add_section(constants.INISECT_OSP)
2305   for name, value in instance.osparams.items():
2306     config.set(constants.INISECT_OSP, name, str(value))
2307
2308   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2309                   data=config.Dumps())
2310   shutil.rmtree(finaldestdir, ignore_errors=True)
2311   shutil.move(destdir, finaldestdir)
2312
2313
2314 def ExportInfo(dest):
2315   """Get export configuration information.
2316
2317   @type dest: str
2318   @param dest: directory containing the export
2319
2320   @rtype: L{objects.SerializableConfigParser}
2321   @return: a serializable config file containing the
2322       export info
2323
2324   """
2325   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2326
2327   config = objects.SerializableConfigParser()
2328   config.read(cff)
2329
2330   if (not config.has_section(constants.INISECT_EXP) or
2331       not config.has_section(constants.INISECT_INS)):
2332     _Fail("Export info file doesn't have the required fields")
2333
2334   return config.Dumps()
2335
2336
2337 def ListExports():
2338   """Return a list of exports currently available on this machine.
2339
2340   @rtype: list
2341   @return: list of the exports
2342
2343   """
2344   if os.path.isdir(constants.EXPORT_DIR):
2345     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2346   else:
2347     _Fail("No exports directory")
2348
2349
2350 def RemoveExport(export):
2351   """Remove an existing export from the node.
2352
2353   @type export: str
2354   @param export: the name of the export to remove
2355   @rtype: None
2356
2357   """
2358   target = utils.PathJoin(constants.EXPORT_DIR, export)
2359
2360   try:
2361     shutil.rmtree(target)
2362   except EnvironmentError, err:
2363     _Fail("Error while removing the export: %s", err, exc=True)
2364
2365
2366 def BlockdevRename(devlist):
2367   """Rename a list of block devices.
2368
2369   @type devlist: list of tuples
2370   @param devlist: list of tuples of the form  (disk,
2371       new_logical_id, new_physical_id); disk is an
2372       L{objects.Disk} object describing the current disk,
2373       and new logical_id/physical_id is the name we
2374       rename it to
2375   @rtype: boolean
2376   @return: True if all renames succeeded, False otherwise
2377
2378   """
2379   msgs = []
2380   result = True
2381   for disk, unique_id in devlist:
2382     dev = _RecursiveFindBD(disk)
2383     if dev is None:
2384       msgs.append("Can't find device %s in rename" % str(disk))
2385       result = False
2386       continue
2387     try:
2388       old_rpath = dev.dev_path
2389       dev.Rename(unique_id)
2390       new_rpath = dev.dev_path
2391       if old_rpath != new_rpath:
2392         DevCacheManager.RemoveCache(old_rpath)
2393         # FIXME: we should add the new cache information here, like:
2394         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2395         # but we don't have the owner here - maybe parse from existing
2396         # cache? for now, we only lose lvm data when we rename, which
2397         # is less critical than DRBD or MD
2398     except errors.BlockDeviceError, err:
2399       msgs.append("Can't rename device '%s' to '%s': %s" %
2400                   (dev, unique_id, err))
2401       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2402       result = False
2403   if not result:
2404     _Fail("; ".join(msgs))
2405
2406
2407 def _TransformFileStorageDir(file_storage_dir):
2408   """Checks whether given file_storage_dir is valid.
2409
2410   Checks wheter the given file_storage_dir is within the cluster-wide
2411   default file_storage_dir stored in SimpleStore. Only paths under that
2412   directory are allowed.
2413
2414   @type file_storage_dir: str
2415   @param file_storage_dir: the path to check
2416
2417   @return: the normalized path if valid, None otherwise
2418
2419   """
2420   if not constants.ENABLE_FILE_STORAGE:
2421     _Fail("File storage disabled at configure time")
2422   cfg = _GetConfig()
2423   file_storage_dir = os.path.normpath(file_storage_dir)
2424   base_file_storage_dir = cfg.GetFileStorageDir()
2425   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2426       base_file_storage_dir):
2427     _Fail("File storage directory '%s' is not under base file"
2428           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2429   return file_storage_dir
2430
2431
2432 def CreateFileStorageDir(file_storage_dir):
2433   """Create file storage directory.
2434
2435   @type file_storage_dir: str
2436   @param file_storage_dir: directory to create
2437
2438   @rtype: tuple
2439   @return: tuple with first element a boolean indicating wheter dir
2440       creation was successful or not
2441
2442   """
2443   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2444   if os.path.exists(file_storage_dir):
2445     if not os.path.isdir(file_storage_dir):
2446       _Fail("Specified storage dir '%s' is not a directory",
2447             file_storage_dir)
2448   else:
2449     try:
2450       os.makedirs(file_storage_dir, 0750)
2451     except OSError, err:
2452       _Fail("Cannot create file storage directory '%s': %s",
2453             file_storage_dir, err, exc=True)
2454
2455
2456 def RemoveFileStorageDir(file_storage_dir):
2457   """Remove file storage directory.
2458
2459   Remove it only if it's empty. If not log an error and return.
2460
2461   @type file_storage_dir: str
2462   @param file_storage_dir: the directory we should cleanup
2463   @rtype: tuple (success,)
2464   @return: tuple of one element, C{success}, denoting
2465       whether the operation was successful
2466
2467   """
2468   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2469   if os.path.exists(file_storage_dir):
2470     if not os.path.isdir(file_storage_dir):
2471       _Fail("Specified Storage directory '%s' is not a directory",
2472             file_storage_dir)
2473     # deletes dir only if empty, otherwise we want to fail the rpc call
2474     try:
2475       os.rmdir(file_storage_dir)
2476     except OSError, err:
2477       _Fail("Cannot remove file storage directory '%s': %s",
2478             file_storage_dir, err)
2479
2480
2481 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2482   """Rename the file storage directory.
2483
2484   @type old_file_storage_dir: str
2485   @param old_file_storage_dir: the current path
2486   @type new_file_storage_dir: str
2487   @param new_file_storage_dir: the name we should rename to
2488   @rtype: tuple (success,)
2489   @return: tuple of one element, C{success}, denoting
2490       whether the operation was successful
2491
2492   """
2493   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2494   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2495   if not os.path.exists(new_file_storage_dir):
2496     if os.path.isdir(old_file_storage_dir):
2497       try:
2498         os.rename(old_file_storage_dir, new_file_storage_dir)
2499       except OSError, err:
2500         _Fail("Cannot rename '%s' to '%s': %s",
2501               old_file_storage_dir, new_file_storage_dir, err)
2502     else:
2503       _Fail("Specified storage dir '%s' is not a directory",
2504             old_file_storage_dir)
2505   else:
2506     if os.path.exists(old_file_storage_dir):
2507       _Fail("Cannot rename '%s' to '%s': both locations exist",
2508             old_file_storage_dir, new_file_storage_dir)
2509
2510
2511 def _EnsureJobQueueFile(file_name):
2512   """Checks whether the given filename is in the queue directory.
2513
2514   @type file_name: str
2515   @param file_name: the file name we should check
2516   @rtype: None
2517   @raises RPCFail: if the file is not valid
2518
2519   """
2520   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2521   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2522
2523   if not result:
2524     _Fail("Passed job queue file '%s' does not belong to"
2525           " the queue directory '%s'", file_name, queue_dir)
2526
2527
2528 def JobQueueUpdate(file_name, content):
2529   """Updates a file in the queue directory.
2530
2531   This is just a wrapper over L{utils.io.WriteFile}, with proper
2532   checking.
2533
2534   @type file_name: str
2535   @param file_name: the job file name
2536   @type content: str
2537   @param content: the new job contents
2538   @rtype: boolean
2539   @return: the success of the operation
2540
2541   """
2542   _EnsureJobQueueFile(file_name)
2543   getents = runtime.GetEnts()
2544
2545   # Write and replace the file atomically
2546   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2547                   gid=getents.masterd_gid)
2548
2549
2550 def JobQueueRename(old, new):
2551   """Renames a job queue file.
2552
2553   This is just a wrapper over os.rename with proper checking.
2554
2555   @type old: str
2556   @param old: the old (actual) file name
2557   @type new: str
2558   @param new: the desired file name
2559   @rtype: tuple
2560   @return: the success of the operation and payload
2561
2562   """
2563   _EnsureJobQueueFile(old)
2564   _EnsureJobQueueFile(new)
2565
2566   utils.RenameFile(old, new, mkdir=True)
2567
2568
2569 def BlockdevClose(instance_name, disks):
2570   """Closes the given block devices.
2571
2572   This means they will be switched to secondary mode (in case of
2573   DRBD).
2574
2575   @param instance_name: if the argument is not empty, the symlinks
2576       of this instance will be removed
2577   @type disks: list of L{objects.Disk}
2578   @param disks: the list of disks to be closed
2579   @rtype: tuple (success, message)
2580   @return: a tuple of success and message, where success
2581       indicates the succes of the operation, and message
2582       which will contain the error details in case we
2583       failed
2584
2585   """
2586   bdevs = []
2587   for cf in disks:
2588     rd = _RecursiveFindBD(cf)
2589     if rd is None:
2590       _Fail("Can't find device %s", cf)
2591     bdevs.append(rd)
2592
2593   msg = []
2594   for rd in bdevs:
2595     try:
2596       rd.Close()
2597     except errors.BlockDeviceError, err:
2598       msg.append(str(err))
2599   if msg:
2600     _Fail("Can't make devices secondary: %s", ",".join(msg))
2601   else:
2602     if instance_name:
2603       _RemoveBlockDevLinks(instance_name, disks)
2604
2605
2606 def ValidateHVParams(hvname, hvparams):
2607   """Validates the given hypervisor parameters.
2608
2609   @type hvname: string
2610   @param hvname: the hypervisor name
2611   @type hvparams: dict
2612   @param hvparams: the hypervisor parameters to be validated
2613   @rtype: None
2614
2615   """
2616   try:
2617     hv_type = hypervisor.GetHypervisor(hvname)
2618     hv_type.ValidateParameters(hvparams)
2619   except errors.HypervisorError, err:
2620     _Fail(str(err), log=False)
2621
2622
2623 def _CheckOSPList(os_obj, parameters):
2624   """Check whether a list of parameters is supported by the OS.
2625
2626   @type os_obj: L{objects.OS}
2627   @param os_obj: OS object to check
2628   @type parameters: list
2629   @param parameters: the list of parameters to check
2630
2631   """
2632   supported = [v[0] for v in os_obj.supported_parameters]
2633   delta = frozenset(parameters).difference(supported)
2634   if delta:
2635     _Fail("The following parameters are not supported"
2636           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2637
2638
2639 def ValidateOS(required, osname, checks, osparams):
2640   """Validate the given OS' parameters.
2641
2642   @type required: boolean
2643   @param required: whether absence of the OS should translate into
2644       failure or not
2645   @type osname: string
2646   @param osname: the OS to be validated
2647   @type checks: list
2648   @param checks: list of the checks to run (currently only 'parameters')
2649   @type osparams: dict
2650   @param osparams: dictionary with OS parameters
2651   @rtype: boolean
2652   @return: True if the validation passed, or False if the OS was not
2653       found and L{required} was false
2654
2655   """
2656   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2657     _Fail("Unknown checks required for OS %s: %s", osname,
2658           set(checks).difference(constants.OS_VALIDATE_CALLS))
2659
2660   name_only = objects.OS.GetName(osname)
2661   status, tbv = _TryOSFromDisk(name_only, None)
2662
2663   if not status:
2664     if required:
2665       _Fail(tbv)
2666     else:
2667       return False
2668
2669   if max(tbv.api_versions) < constants.OS_API_V20:
2670     return True
2671
2672   if constants.OS_VALIDATE_PARAMETERS in checks:
2673     _CheckOSPList(tbv, osparams.keys())
2674
2675   validate_env = OSCoreEnv(osname, tbv, osparams)
2676   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2677                         cwd=tbv.path)
2678   if result.failed:
2679     logging.error("os validate command '%s' returned error: %s output: %s",
2680                   result.cmd, result.fail_reason, result.output)
2681     _Fail("OS validation script failed (%s), output: %s",
2682           result.fail_reason, result.output, log=False)
2683
2684   return True
2685
2686
2687 def DemoteFromMC():
2688   """Demotes the current node from master candidate role.
2689
2690   """
2691   # try to ensure we're not the master by mistake
2692   master, myself = ssconf.GetMasterAndMyself()
2693   if master == myself:
2694     _Fail("ssconf status shows I'm the master node, will not demote")
2695
2696   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2697   if not result.failed:
2698     _Fail("The master daemon is running, will not demote")
2699
2700   try:
2701     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2702       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2703   except EnvironmentError, err:
2704     if err.errno != errno.ENOENT:
2705       _Fail("Error while backing up cluster file: %s", err, exc=True)
2706
2707   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2708
2709
2710 def _GetX509Filenames(cryptodir, name):
2711   """Returns the full paths for the private key and certificate.
2712
2713   """
2714   return (utils.PathJoin(cryptodir, name),
2715           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2716           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2717
2718
2719 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2720   """Creates a new X509 certificate for SSL/TLS.
2721
2722   @type validity: int
2723   @param validity: Validity in seconds
2724   @rtype: tuple; (string, string)
2725   @return: Certificate name and public part
2726
2727   """
2728   (key_pem, cert_pem) = \
2729     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2730                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2731
2732   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2733                               prefix="x509-%s-" % utils.TimestampForFilename())
2734   try:
2735     name = os.path.basename(cert_dir)
2736     assert len(name) > 5
2737
2738     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2739
2740     utils.WriteFile(key_file, mode=0400, data=key_pem)
2741     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2742
2743     # Never return private key as it shouldn't leave the node
2744     return (name, cert_pem)
2745   except Exception:
2746     shutil.rmtree(cert_dir, ignore_errors=True)
2747     raise
2748
2749
2750 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2751   """Removes a X509 certificate.
2752
2753   @type name: string
2754   @param name: Certificate name
2755
2756   """
2757   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2758
2759   utils.RemoveFile(key_file)
2760   utils.RemoveFile(cert_file)
2761
2762   try:
2763     os.rmdir(cert_dir)
2764   except EnvironmentError, err:
2765     _Fail("Cannot remove certificate directory '%s': %s",
2766           cert_dir, err)
2767
2768
2769 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2770   """Returns the command for the requested input/output.
2771
2772   @type instance: L{objects.Instance}
2773   @param instance: The instance object
2774   @param mode: Import/export mode
2775   @param ieio: Input/output type
2776   @param ieargs: Input/output arguments
2777
2778   """
2779   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2780
2781   env = None
2782   prefix = None
2783   suffix = None
2784   exp_size = None
2785
2786   if ieio == constants.IEIO_FILE:
2787     (filename, ) = ieargs
2788
2789     if not utils.IsNormAbsPath(filename):
2790       _Fail("Path '%s' is not normalized or absolute", filename)
2791
2792     directory = os.path.normpath(os.path.dirname(filename))
2793
2794     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2795         constants.EXPORT_DIR):
2796       _Fail("File '%s' is not under exports directory '%s'",
2797             filename, constants.EXPORT_DIR)
2798
2799     # Create directory
2800     utils.Makedirs(directory, mode=0750)
2801
2802     quoted_filename = utils.ShellQuote(filename)
2803
2804     if mode == constants.IEM_IMPORT:
2805       suffix = "> %s" % quoted_filename
2806     elif mode == constants.IEM_EXPORT:
2807       suffix = "< %s" % quoted_filename
2808
2809       # Retrieve file size
2810       try:
2811         st = os.stat(filename)
2812       except EnvironmentError, err:
2813         logging.error("Can't stat(2) %s: %s", filename, err)
2814       else:
2815         exp_size = utils.BytesToMebibyte(st.st_size)
2816
2817   elif ieio == constants.IEIO_RAW_DISK:
2818     (disk, ) = ieargs
2819
2820     real_disk = _OpenRealBD(disk)
2821
2822     if mode == constants.IEM_IMPORT:
2823       # we set here a smaller block size as, due to transport buffering, more
2824       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2825       # is not already there or we pass a wrong path; we use notrunc to no
2826       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2827       # much memory; this means that at best, we flush every 64k, which will
2828       # not be very fast
2829       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2830                                     " bs=%s oflag=dsync"),
2831                                     real_disk.dev_path,
2832                                     str(64 * 1024))
2833
2834     elif mode == constants.IEM_EXPORT:
2835       # the block size on the read dd is 1MiB to match our units
2836       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2837                                    real_disk.dev_path,
2838                                    str(1024 * 1024), # 1 MB
2839                                    str(disk.size))
2840       exp_size = disk.size
2841
2842   elif ieio == constants.IEIO_SCRIPT:
2843     (disk, disk_index, ) = ieargs
2844
2845     assert isinstance(disk_index, (int, long))
2846
2847     real_disk = _OpenRealBD(disk)
2848
2849     inst_os = OSFromDisk(instance.os)
2850     env = OSEnvironment(instance, inst_os)
2851
2852     if mode == constants.IEM_IMPORT:
2853       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2854       env["IMPORT_INDEX"] = str(disk_index)
2855       script = inst_os.import_script
2856
2857     elif mode == constants.IEM_EXPORT:
2858       env["EXPORT_DEVICE"] = real_disk.dev_path
2859       env["EXPORT_INDEX"] = str(disk_index)
2860       script = inst_os.export_script
2861
2862     # TODO: Pass special environment only to script
2863     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2864
2865     if mode == constants.IEM_IMPORT:
2866       suffix = "| %s" % script_cmd
2867
2868     elif mode == constants.IEM_EXPORT:
2869       prefix = "%s |" % script_cmd
2870
2871     # Let script predict size
2872     exp_size = constants.IE_CUSTOM_SIZE
2873
2874   else:
2875     _Fail("Invalid %s I/O mode %r", mode, ieio)
2876
2877   return (env, prefix, suffix, exp_size)
2878
2879
2880 def _CreateImportExportStatusDir(prefix):
2881   """Creates status directory for import/export.
2882
2883   """
2884   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2885                           prefix=("%s-%s-" %
2886                                   (prefix, utils.TimestampForFilename())))
2887
2888
2889 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2890   """Starts an import or export daemon.
2891
2892   @param mode: Import/output mode
2893   @type opts: L{objects.ImportExportOptions}
2894   @param opts: Daemon options
2895   @type host: string
2896   @param host: Remote host for export (None for import)
2897   @type port: int
2898   @param port: Remote port for export (None for import)
2899   @type instance: L{objects.Instance}
2900   @param instance: Instance object
2901   @param ieio: Input/output type
2902   @param ieioargs: Input/output arguments
2903
2904   """
2905   if mode == constants.IEM_IMPORT:
2906     prefix = "import"
2907
2908     if not (host is None and port is None):
2909       _Fail("Can not specify host or port on import")
2910
2911   elif mode == constants.IEM_EXPORT:
2912     prefix = "export"
2913
2914     if host is None or port is None:
2915       _Fail("Host and port must be specified for an export")
2916
2917   else:
2918     _Fail("Invalid mode %r", mode)
2919
2920   if (opts.key_name is None) ^ (opts.ca_pem is None):
2921     _Fail("Cluster certificate can only be used for both key and CA")
2922
2923   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2924     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2925
2926   if opts.key_name is None:
2927     # Use server.pem
2928     key_path = constants.NODED_CERT_FILE
2929     cert_path = constants.NODED_CERT_FILE
2930     assert opts.ca_pem is None
2931   else:
2932     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2933                                                  opts.key_name)
2934     assert opts.ca_pem is not None
2935
2936   for i in [key_path, cert_path]:
2937     if not os.path.exists(i):
2938       _Fail("File '%s' does not exist" % i)
2939
2940   status_dir = _CreateImportExportStatusDir(prefix)
2941   try:
2942     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2943     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2944     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2945
2946     if opts.ca_pem is None:
2947       # Use server.pem
2948       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2949     else:
2950       ca = opts.ca_pem
2951
2952     # Write CA file
2953     utils.WriteFile(ca_file, data=ca, mode=0400)
2954
2955     cmd = [
2956       constants.IMPORT_EXPORT_DAEMON,
2957       status_file, mode,
2958       "--key=%s" % key_path,
2959       "--cert=%s" % cert_path,
2960       "--ca=%s" % ca_file,
2961       ]
2962
2963     if host:
2964       cmd.append("--host=%s" % host)
2965
2966     if port:
2967       cmd.append("--port=%s" % port)
2968
2969     if opts.ipv6:
2970       cmd.append("--ipv6")
2971     else:
2972       cmd.append("--ipv4")
2973
2974     if opts.compress:
2975       cmd.append("--compress=%s" % opts.compress)
2976
2977     if opts.magic:
2978       cmd.append("--magic=%s" % opts.magic)
2979
2980     if exp_size is not None:
2981       cmd.append("--expected-size=%s" % exp_size)
2982
2983     if cmd_prefix:
2984       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2985
2986     if cmd_suffix:
2987       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2988
2989     if mode == constants.IEM_EXPORT:
2990       # Retry connection a few times when connecting to remote peer
2991       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
2992       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
2993     elif opts.connect_timeout is not None:
2994       assert mode == constants.IEM_IMPORT
2995       # Overall timeout for establishing connection while listening
2996       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
2997
2998     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2999
3000     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3001     # support for receiving a file descriptor for output
3002     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3003                       output=logfile)
3004
3005     # The import/export name is simply the status directory name
3006     return os.path.basename(status_dir)
3007
3008   except Exception:
3009     shutil.rmtree(status_dir, ignore_errors=True)
3010     raise
3011
3012
3013 def GetImportExportStatus(names):
3014   """Returns import/export daemon status.
3015
3016   @type names: sequence
3017   @param names: List of names
3018   @rtype: List of dicts
3019   @return: Returns a list of the state of each named import/export or None if a
3020            status couldn't be read
3021
3022   """
3023   result = []
3024
3025   for name in names:
3026     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3027                                  _IES_STATUS_FILE)
3028
3029     try:
3030       data = utils.ReadFile(status_file)
3031     except EnvironmentError, err:
3032       if err.errno != errno.ENOENT:
3033         raise
3034       data = None
3035
3036     if not data:
3037       result.append(None)
3038       continue
3039
3040     result.append(serializer.LoadJson(data))
3041
3042   return result
3043
3044
3045 def AbortImportExport(name):
3046   """Sends SIGTERM to a running import/export daemon.
3047
3048   """
3049   logging.info("Abort import/export %s", name)
3050
3051   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3052   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3053
3054   if pid:
3055     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3056                  name, pid)
3057     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3058
3059
3060 def CleanupImportExport(name):
3061   """Cleanup after an import or export.
3062
3063   If the import/export daemon is still running it's killed. Afterwards the
3064   whole status directory is removed.
3065
3066   """
3067   logging.info("Finalizing import/export %s", name)
3068
3069   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3070
3071   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3072
3073   if pid:
3074     logging.info("Import/export %s is still running with PID %s",
3075                  name, pid)
3076     utils.KillProcess(pid, waitpid=False)
3077
3078   shutil.rmtree(status_dir, ignore_errors=True)
3079
3080
3081 def _FindDisks(nodes_ip, disks):
3082   """Sets the physical ID on disks and returns the block devices.
3083
3084   """
3085   # set the correct physical ID
3086   my_name = netutils.Hostname.GetSysName()
3087   for cf in disks:
3088     cf.SetPhysicalID(my_name, nodes_ip)
3089
3090   bdevs = []
3091
3092   for cf in disks:
3093     rd = _RecursiveFindBD(cf)
3094     if rd is None:
3095       _Fail("Can't find device %s", cf)
3096     bdevs.append(rd)
3097   return bdevs
3098
3099
3100 def DrbdDisconnectNet(nodes_ip, disks):
3101   """Disconnects the network on a list of drbd devices.
3102
3103   """
3104   bdevs = _FindDisks(nodes_ip, disks)
3105
3106   # disconnect disks
3107   for rd in bdevs:
3108     try:
3109       rd.DisconnectNet()
3110     except errors.BlockDeviceError, err:
3111       _Fail("Can't change network configuration to standalone mode: %s",
3112             err, exc=True)
3113
3114
3115 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3116   """Attaches the network on a list of drbd devices.
3117
3118   """
3119   bdevs = _FindDisks(nodes_ip, disks)
3120
3121   if multimaster:
3122     for idx, rd in enumerate(bdevs):
3123       try:
3124         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3125       except EnvironmentError, err:
3126         _Fail("Can't create symlink: %s", err)
3127   # reconnect disks, switch to new master configuration and if
3128   # needed primary mode
3129   for rd in bdevs:
3130     try:
3131       rd.AttachNet(multimaster)
3132     except errors.BlockDeviceError, err:
3133       _Fail("Can't change network configuration: %s", err)
3134
3135   # wait until the disks are connected; we need to retry the re-attach
3136   # if the device becomes standalone, as this might happen if the one
3137   # node disconnects and reconnects in a different mode before the
3138   # other node reconnects; in this case, one or both of the nodes will
3139   # decide it has wrong configuration and switch to standalone
3140
3141   def _Attach():
3142     all_connected = True
3143
3144     for rd in bdevs:
3145       stats = rd.GetProcStatus()
3146
3147       all_connected = (all_connected and
3148                        (stats.is_connected or stats.is_in_resync))
3149
3150       if stats.is_standalone:
3151         # peer had different config info and this node became
3152         # standalone, even though this should not happen with the
3153         # new staged way of changing disk configs
3154         try:
3155           rd.AttachNet(multimaster)
3156         except errors.BlockDeviceError, err:
3157           _Fail("Can't change network configuration: %s", err)
3158
3159     if not all_connected:
3160       raise utils.RetryAgain()
3161
3162   try:
3163     # Start with a delay of 100 miliseconds and go up to 5 seconds
3164     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3165   except utils.RetryTimeout:
3166     _Fail("Timeout in disk reconnecting")
3167
3168   if multimaster:
3169     # change to primary mode
3170     for rd in bdevs:
3171       try:
3172         rd.Open()
3173       except errors.BlockDeviceError, err:
3174         _Fail("Can't change to primary mode: %s", err)
3175
3176
3177 def DrbdWaitSync(nodes_ip, disks):
3178   """Wait until DRBDs have synchronized.
3179
3180   """
3181   def _helper(rd):
3182     stats = rd.GetProcStatus()
3183     if not (stats.is_connected or stats.is_in_resync):
3184       raise utils.RetryAgain()
3185     return stats
3186
3187   bdevs = _FindDisks(nodes_ip, disks)
3188
3189   min_resync = 100
3190   alldone = True
3191   for rd in bdevs:
3192     try:
3193       # poll each second for 15 seconds
3194       stats = utils.Retry(_helper, 1, 15, args=[rd])
3195     except utils.RetryTimeout:
3196       stats = rd.GetProcStatus()
3197       # last check
3198       if not (stats.is_connected or stats.is_in_resync):
3199         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3200     alldone = alldone and (not stats.is_in_resync)
3201     if stats.sync_percent is not None:
3202       min_resync = min(min_resync, stats.sync_percent)
3203
3204   return (alldone, min_resync)
3205
3206
3207 def GetDrbdUsermodeHelper():
3208   """Returns DRBD usermode helper currently configured.
3209
3210   """
3211   try:
3212     return bdev.BaseDRBD.GetUsermodeHelper()
3213   except errors.BlockDeviceError, err:
3214     _Fail(str(err))
3215
3216
3217 def PowercycleNode(hypervisor_type):
3218   """Hard-powercycle the node.
3219
3220   Because we need to return first, and schedule the powercycle in the
3221   background, we won't be able to report failures nicely.
3222
3223   """
3224   hyper = hypervisor.GetHypervisor(hypervisor_type)
3225   try:
3226     pid = os.fork()
3227   except OSError:
3228     # if we can't fork, we'll pretend that we're in the child process
3229     pid = 0
3230   if pid > 0:
3231     return "Reboot scheduled in 5 seconds"
3232   # ensure the child is running on ram
3233   try:
3234     utils.Mlockall()
3235   except Exception: # pylint: disable-msg=W0703
3236     pass
3237   time.sleep(5)
3238   hyper.PowercycleNode()
3239
3240
3241 class HooksRunner(object):
3242   """Hook runner.
3243
3244   This class is instantiated on the node side (ganeti-noded) and not
3245   on the master side.
3246
3247   """
3248   def __init__(self, hooks_base_dir=None):
3249     """Constructor for hooks runner.
3250
3251     @type hooks_base_dir: str or None
3252     @param hooks_base_dir: if not None, this overrides the
3253         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3254
3255     """
3256     if hooks_base_dir is None:
3257       hooks_base_dir = constants.HOOKS_BASE_DIR
3258     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3259     # constant
3260     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3261
3262   def RunHooks(self, hpath, phase, env):
3263     """Run the scripts in the hooks directory.
3264
3265     @type hpath: str
3266     @param hpath: the path to the hooks directory which
3267         holds the scripts
3268     @type phase: str
3269     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3270         L{constants.HOOKS_PHASE_POST}
3271     @type env: dict
3272     @param env: dictionary with the environment for the hook
3273     @rtype: list
3274     @return: list of 3-element tuples:
3275       - script path
3276       - script result, either L{constants.HKR_SUCCESS} or
3277         L{constants.HKR_FAIL}
3278       - output of the script
3279
3280     @raise errors.ProgrammerError: for invalid input
3281         parameters
3282
3283     """
3284     if phase == constants.HOOKS_PHASE_PRE:
3285       suffix = "pre"
3286     elif phase == constants.HOOKS_PHASE_POST:
3287       suffix = "post"
3288     else:
3289       _Fail("Unknown hooks phase '%s'", phase)
3290
3291
3292     subdir = "%s-%s.d" % (hpath, suffix)
3293     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3294
3295     results = []
3296
3297     if not os.path.isdir(dir_name):
3298       # for non-existing/non-dirs, we simply exit instead of logging a
3299       # warning at every operation
3300       return results
3301
3302     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3303
3304     for (relname, relstatus, runresult)  in runparts_results:
3305       if relstatus == constants.RUNPARTS_SKIP:
3306         rrval = constants.HKR_SKIP
3307         output = ""
3308       elif relstatus == constants.RUNPARTS_ERR:
3309         rrval = constants.HKR_FAIL
3310         output = "Hook script execution error: %s" % runresult
3311       elif relstatus == constants.RUNPARTS_RUN:
3312         if runresult.failed:
3313           rrval = constants.HKR_FAIL
3314         else:
3315           rrval = constants.HKR_SUCCESS
3316         output = utils.SafeEncode(runresult.output.strip())
3317       results.append(("%s/%s" % (subdir, relname), rrval, output))
3318
3319     return results
3320
3321
3322 class IAllocatorRunner(object):
3323   """IAllocator runner.
3324
3325   This class is instantiated on the node side (ganeti-noded) and not on
3326   the master side.
3327
3328   """
3329   @staticmethod
3330   def Run(name, idata):
3331     """Run an iallocator script.
3332
3333     @type name: str
3334     @param name: the iallocator script name
3335     @type idata: str
3336     @param idata: the allocator input data
3337
3338     @rtype: tuple
3339     @return: two element tuple of:
3340        - status
3341        - either error message or stdout of allocator (for success)
3342
3343     """
3344     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3345                                   os.path.isfile)
3346     if alloc_script is None:
3347       _Fail("iallocator module '%s' not found in the search path", name)
3348
3349     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3350     try:
3351       os.write(fd, idata)
3352       os.close(fd)
3353       result = utils.RunCmd([alloc_script, fin_name])
3354       if result.failed:
3355         _Fail("iallocator module '%s' failed: %s, output '%s'",
3356               name, result.fail_reason, result.output)
3357     finally:
3358       os.unlink(fin_name)
3359
3360     return result.stdout
3361
3362
3363 class DevCacheManager(object):
3364   """Simple class for managing a cache of block device information.
3365
3366   """
3367   _DEV_PREFIX = "/dev/"
3368   _ROOT_DIR = constants.BDEV_CACHE_DIR
3369
3370   @classmethod
3371   def _ConvertPath(cls, dev_path):
3372     """Converts a /dev/name path to the cache file name.
3373
3374     This replaces slashes with underscores and strips the /dev
3375     prefix. It then returns the full path to the cache file.
3376
3377     @type dev_path: str
3378     @param dev_path: the C{/dev/} path name
3379     @rtype: str
3380     @return: the converted path name
3381
3382     """
3383     if dev_path.startswith(cls._DEV_PREFIX):
3384       dev_path = dev_path[len(cls._DEV_PREFIX):]
3385     dev_path = dev_path.replace("/", "_")
3386     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3387     return fpath
3388
3389   @classmethod
3390   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3391     """Updates the cache information for a given device.
3392
3393     @type dev_path: str
3394     @param dev_path: the pathname of the device
3395     @type owner: str
3396     @param owner: the owner (instance name) of the device
3397     @type on_primary: bool
3398     @param on_primary: whether this is the primary
3399         node nor not
3400     @type iv_name: str
3401     @param iv_name: the instance-visible name of the
3402         device, as in objects.Disk.iv_name
3403
3404     @rtype: None
3405
3406     """
3407     if dev_path is None:
3408       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3409       return
3410     fpath = cls._ConvertPath(dev_path)
3411     if on_primary:
3412       state = "primary"
3413     else:
3414       state = "secondary"
3415     if iv_name is None:
3416       iv_name = "not_visible"
3417     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3418     try:
3419       utils.WriteFile(fpath, data=fdata)
3420     except EnvironmentError, err:
3421       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3422
3423   @classmethod
3424   def RemoveCache(cls, dev_path):
3425     """Remove data for a dev_path.
3426
3427     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3428     path name and logging.
3429
3430     @type dev_path: str
3431     @param dev_path: the pathname of the device
3432
3433     @rtype: None
3434
3435     """
3436     if dev_path is None:
3437       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3438       return
3439     fpath = cls._ConvertPath(dev_path)
3440     try:
3441       utils.RemoveFile(fpath)
3442     except EnvironmentError, err:
3443       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)