backend: Add support for IPv6 in import/export
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.RAPI_USERS_FILE,
200     constants.CONFD_HMAC_KEY,
201     constants.CLUSTER_DOMAIN_SECRET_FILE,
202     ])
203
204   for hv_name in constants.HYPER_TYPES:
205     hv_class = hypervisor.GetHypervisorClass(hv_name)
206     allowed_files.update(hv_class.GetAncillaryFiles())
207
208   return frozenset(allowed_files)
209
210
211 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212
213
214 def JobQueuePurge():
215   """Removes job queue files and archived jobs.
216
217   @rtype: tuple
218   @return: True, None
219
220   """
221   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
222   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223
224
225 def GetMasterInfo():
226   """Returns master information.
227
228   This is an utility function to compute master information, either
229   for consumption here or from the node daemon.
230
231   @rtype: tuple
232   @return: master_netdev, master_ip, master_name, primary_ip_family
233   @raise RPCFail: in case of errors
234
235   """
236   try:
237     cfg = _GetConfig()
238     master_netdev = cfg.GetMasterNetdev()
239     master_ip = cfg.GetMasterIP()
240     master_node = cfg.GetMasterNode()
241     primary_ip_family = cfg.GetPrimaryIPFamily()
242   except errors.ConfigurationError, err:
243     _Fail("Cluster configuration incomplete: %s", err, exc=True)
244   return (master_netdev, master_ip, master_node, primary_ip_family)
245
246
247 def StartMaster(start_daemons, no_voting):
248   """Activate local node as master node.
249
250   The function will either try activate the IP address of the master
251   (unless someone else has it) or also start the master daemons, based
252   on the start_daemons parameter.
253
254   @type start_daemons: boolean
255   @param start_daemons: whether to start the master daemons
256       (ganeti-masterd and ganeti-rapi), or (if false) activate the
257       master ip
258   @type no_voting: boolean
259   @param no_voting: whether to start ganeti-masterd without a node vote
260       (if start_daemons is True), but still non-interactively
261   @rtype: None
262
263   """
264   # GetMasterInfo will raise an exception if not able to return data
265   master_netdev, master_ip, _, family = GetMasterInfo()
266
267   err_msgs = []
268   # either start the master and rapi daemons
269   if start_daemons:
270     if no_voting:
271       masterd_args = "--no-voting --yes-do-it"
272     else:
273       masterd_args = ""
274
275     env = {
276       "EXTRA_MASTERD_ARGS": masterd_args,
277       }
278
279     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
280     if result.failed:
281       msg = "Can't start Ganeti master: %s" % result.output
282       logging.error(msg)
283       err_msgs.append(msg)
284   # or activate the IP
285   else:
286     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
287       if netutils.IPAddress.Own(master_ip):
288         # we already have the ip:
289         logging.debug("Master IP already configured, doing nothing")
290       else:
291         msg = "Someone else has the master ip, not activating"
292         logging.error(msg)
293         err_msgs.append(msg)
294     else:
295       ipcls = netutils.IP4Address
296       if family == netutils.IP6Address.family:
297         ipcls = netutils.IP6Address
298
299       result = utils.RunCmd(["ip", "address", "add",
300                              "%s/%d" % (master_ip, ipcls.iplen),
301                              "dev", master_netdev, "label",
302                              "%s:0" % master_netdev])
303       if result.failed:
304         msg = "Can't activate master IP: %s" % result.output
305         logging.error(msg)
306         err_msgs.append(msg)
307
308       # we ignore the exit code of the following cmds
309       if ipcls == netutils.IP4Address:
310         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
311                       master_ip, master_ip])
312       elif ipcls == netutils.IP6Address:
313         try:
314           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
315         except errors.OpExecError:
316           # TODO: Better error reporting
317           logging.warning("Can't execute ndisc6, please install if missing")
318
319   if err_msgs:
320     _Fail("; ".join(err_msgs))
321
322
323 def StopMaster(stop_daemons):
324   """Deactivate this node as master.
325
326   The function will always try to deactivate the IP address of the
327   master. It will also stop the master daemons depending on the
328   stop_daemons parameter.
329
330   @type stop_daemons: boolean
331   @param stop_daemons: whether to also stop the master daemons
332       (ganeti-masterd and ganeti-rapi)
333   @rtype: None
334
335   """
336   # TODO: log and report back to the caller the error failures; we
337   # need to decide in which case we fail the RPC for this
338
339   # GetMasterInfo will raise an exception if not able to return data
340   master_netdev, master_ip, _, family = GetMasterInfo()
341
342   ipcls = netutils.IP4Address
343   if family == netutils.IP6Address.family:
344     ipcls = netutils.IP6Address
345
346   result = utils.RunCmd(["ip", "address", "del",
347                          "%s/%d" % (master_ip, ipcls.iplen),
348                          "dev", master_netdev])
349   if result.failed:
350     logging.error("Can't remove the master IP, error: %s", result.output)
351     # but otherwise ignore the failure
352
353   if stop_daemons:
354     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
355     if result.failed:
356       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
357                     " and error %s",
358                     result.cmd, result.exit_code, result.output)
359
360
361 def EtcHostsModify(mode, host, ip):
362   """Modify a host entry in /etc/hosts.
363
364   @param mode: The mode to operate. Either add or remove entry
365   @param host: The host to operate on
366   @param ip: The ip associated with the entry
367
368   """
369   if mode == constants.ETC_HOSTS_ADD:
370     if not ip:
371       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
372               " present")
373     utils.AddHostToEtcHosts(host, ip)
374   elif mode == constants.ETC_HOSTS_REMOVE:
375     if ip:
376       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
377               " parameter is present")
378     utils.RemoveHostFromEtcHosts(host)
379   else:
380     RPCFail("Mode not supported")
381
382
383 def LeaveCluster(modify_ssh_setup):
384   """Cleans up and remove the current node.
385
386   This function cleans up and prepares the current node to be removed
387   from the cluster.
388
389   If processing is successful, then it raises an
390   L{errors.QuitGanetiException} which is used as a special case to
391   shutdown the node daemon.
392
393   @param modify_ssh_setup: boolean
394
395   """
396   _CleanDirectory(constants.DATA_DIR)
397   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
398   JobQueuePurge()
399
400   if modify_ssh_setup:
401     try:
402       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
403
404       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
405
406       utils.RemoveFile(priv_key)
407       utils.RemoveFile(pub_key)
408     except errors.OpExecError:
409       logging.exception("Error while processing ssh files")
410
411   try:
412     utils.RemoveFile(constants.CONFD_HMAC_KEY)
413     utils.RemoveFile(constants.RAPI_CERT_FILE)
414     utils.RemoveFile(constants.NODED_CERT_FILE)
415   except: # pylint: disable-msg=W0702
416     logging.exception("Error while removing cluster secrets")
417
418   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
419   if result.failed:
420     logging.error("Command %s failed with exitcode %s and error %s",
421                   result.cmd, result.exit_code, result.output)
422
423   # Raise a custom exception (handled in ganeti-noded)
424   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
425
426
427 def GetNodeInfo(vgname, hypervisor_type):
428   """Gives back a hash with different information about the node.
429
430   @type vgname: C{string}
431   @param vgname: the name of the volume group to ask for disk space information
432   @type hypervisor_type: C{str}
433   @param hypervisor_type: the name of the hypervisor to ask for
434       memory information
435   @rtype: C{dict}
436   @return: dictionary with the following keys:
437       - vg_size is the size of the configured volume group in MiB
438       - vg_free is the free size of the volume group in MiB
439       - memory_dom0 is the memory allocated for domain0 in MiB
440       - memory_free is the currently available (free) ram in MiB
441       - memory_total is the total number of ram in MiB
442
443   """
444   outputarray = {}
445
446   if vgname is not None:
447     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
448     vg_free = vg_size = None
449     if vginfo:
450       vg_free = int(round(vginfo[0][0], 0))
451       vg_size = int(round(vginfo[0][1], 0))
452     outputarray['vg_size'] = vg_size
453     outputarray['vg_free'] = vg_free
454
455   if hypervisor_type is not None:
456     hyper = hypervisor.GetHypervisor(hypervisor_type)
457     hyp_info = hyper.GetNodeInfo()
458     if hyp_info is not None:
459       outputarray.update(hyp_info)
460
461   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
462
463   return outputarray
464
465
466 def VerifyNode(what, cluster_name):
467   """Verify the status of the local node.
468
469   Based on the input L{what} parameter, various checks are done on the
470   local node.
471
472   If the I{filelist} key is present, this list of
473   files is checksummed and the file/checksum pairs are returned.
474
475   If the I{nodelist} key is present, we check that we have
476   connectivity via ssh with the target nodes (and check the hostname
477   report).
478
479   If the I{node-net-test} key is present, we check that we have
480   connectivity to the given nodes via both primary IP and, if
481   applicable, secondary IPs.
482
483   @type what: C{dict}
484   @param what: a dictionary of things to check:
485       - filelist: list of files for which to compute checksums
486       - nodelist: list of nodes we should check ssh communication with
487       - node-net-test: list of nodes we should check node daemon port
488         connectivity with
489       - hypervisor: list with hypervisors to run the verify for
490   @rtype: dict
491   @return: a dictionary with the same keys as the input dict, and
492       values representing the result of the checks
493
494   """
495   result = {}
496   my_name = netutils.Hostname.GetSysName()
497   port = netutils.GetDaemonPort(constants.NODED)
498   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
499
500   if constants.NV_HYPERVISOR in what and vm_capable:
501     result[constants.NV_HYPERVISOR] = tmp = {}
502     for hv_name in what[constants.NV_HYPERVISOR]:
503       try:
504         val = hypervisor.GetHypervisor(hv_name).Verify()
505       except errors.HypervisorError, err:
506         val = "Error while checking hypervisor: %s" % str(err)
507       tmp[hv_name] = val
508
509   if constants.NV_FILELIST in what:
510     result[constants.NV_FILELIST] = utils.FingerprintFiles(
511       what[constants.NV_FILELIST])
512
513   if constants.NV_NODELIST in what:
514     result[constants.NV_NODELIST] = tmp = {}
515     random.shuffle(what[constants.NV_NODELIST])
516     for node in what[constants.NV_NODELIST]:
517       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
518       if not success:
519         tmp[node] = message
520
521   if constants.NV_NODENETTEST in what:
522     result[constants.NV_NODENETTEST] = tmp = {}
523     my_pip = my_sip = None
524     for name, pip, sip in what[constants.NV_NODENETTEST]:
525       if name == my_name:
526         my_pip = pip
527         my_sip = sip
528         break
529     if not my_pip:
530       tmp[my_name] = ("Can't find my own primary/secondary IP"
531                       " in the node list")
532     else:
533       for name, pip, sip in what[constants.NV_NODENETTEST]:
534         fail = []
535         if not netutils.TcpPing(pip, port, source=my_pip):
536           fail.append("primary")
537         if sip != pip:
538           if not netutils.TcpPing(sip, port, source=my_sip):
539             fail.append("secondary")
540         if fail:
541           tmp[name] = ("failure using the %s interface(s)" %
542                        " and ".join(fail))
543
544   if constants.NV_MASTERIP in what:
545     # FIXME: add checks on incoming data structures (here and in the
546     # rest of the function)
547     master_name, master_ip = what[constants.NV_MASTERIP]
548     if master_name == my_name:
549       source = constants.IP4_ADDRESS_LOCALHOST
550     else:
551       source = None
552     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
553                                                   source=source)
554
555   if constants.NV_LVLIST in what and vm_capable:
556     try:
557       val = GetVolumeList(what[constants.NV_LVLIST])
558     except RPCFail, err:
559       val = str(err)
560     result[constants.NV_LVLIST] = val
561
562   if constants.NV_INSTANCELIST in what and vm_capable:
563     # GetInstanceList can fail
564     try:
565       val = GetInstanceList(what[constants.NV_INSTANCELIST])
566     except RPCFail, err:
567       val = str(err)
568     result[constants.NV_INSTANCELIST] = val
569
570   if constants.NV_VGLIST in what and vm_capable:
571     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
572
573   if constants.NV_PVLIST in what and vm_capable:
574     result[constants.NV_PVLIST] = \
575       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
576                                    filter_allocatable=False)
577
578   if constants.NV_VERSION in what:
579     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
580                                     constants.RELEASE_VERSION)
581
582   if constants.NV_HVINFO in what and vm_capable:
583     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
584     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
585
586   if constants.NV_DRBDLIST in what and vm_capable:
587     try:
588       used_minors = bdev.DRBD8.GetUsedDevs().keys()
589     except errors.BlockDeviceError, err:
590       logging.warning("Can't get used minors list", exc_info=True)
591       used_minors = str(err)
592     result[constants.NV_DRBDLIST] = used_minors
593
594   if constants.NV_DRBDHELPER in what and vm_capable:
595     status = True
596     try:
597       payload = bdev.BaseDRBD.GetUsermodeHelper()
598     except errors.BlockDeviceError, err:
599       logging.error("Can't get DRBD usermode helper: %s", str(err))
600       status = False
601       payload = str(err)
602     result[constants.NV_DRBDHELPER] = (status, payload)
603
604   if constants.NV_NODESETUP in what:
605     result[constants.NV_NODESETUP] = tmpr = []
606     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
607       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
608                   " under /sys, missing required directories /sys/block"
609                   " and /sys/class/net")
610     if (not os.path.isdir("/proc/sys") or
611         not os.path.isfile("/proc/sysrq-trigger")):
612       tmpr.append("The procfs filesystem doesn't seem to be mounted"
613                   " under /proc, missing required directory /proc/sys and"
614                   " the file /proc/sysrq-trigger")
615
616   if constants.NV_TIME in what:
617     result[constants.NV_TIME] = utils.SplitTime(time.time())
618
619   if constants.NV_OSLIST in what and vm_capable:
620     result[constants.NV_OSLIST] = DiagnoseOS()
621
622   return result
623
624
625 def GetVolumeList(vg_name):
626   """Compute list of logical volumes and their size.
627
628   @type vg_name: str
629   @param vg_name: the volume group whose LVs we should list
630   @rtype: dict
631   @return:
632       dictionary of all partions (key) with value being a tuple of
633       their size (in MiB), inactive and online status::
634
635         {'test1': ('20.06', True, True)}
636
637       in case of errors, a string is returned with the error
638       details.
639
640   """
641   lvs = {}
642   sep = '|'
643   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
644                          "--separator=%s" % sep,
645                          "-olv_name,lv_size,lv_attr", vg_name])
646   if result.failed:
647     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
648
649   for line in result.stdout.splitlines():
650     line = line.strip()
651     match = _LVSLINE_REGEX.match(line)
652     if not match:
653       logging.error("Invalid line returned from lvs output: '%s'", line)
654       continue
655     name, size, attr = match.groups()
656     inactive = attr[4] == '-'
657     online = attr[5] == 'o'
658     virtual = attr[0] == 'v'
659     if virtual:
660       # we don't want to report such volumes as existing, since they
661       # don't really hold data
662       continue
663     lvs[name] = (size, inactive, online)
664
665   return lvs
666
667
668 def ListVolumeGroups():
669   """List the volume groups and their size.
670
671   @rtype: dict
672   @return: dictionary with keys volume name and values the
673       size of the volume
674
675   """
676   return utils.ListVolumeGroups()
677
678
679 def NodeVolumes():
680   """List all volumes on this node.
681
682   @rtype: list
683   @return:
684     A list of dictionaries, each having four keys:
685       - name: the logical volume name,
686       - size: the size of the logical volume
687       - dev: the physical device on which the LV lives
688       - vg: the volume group to which it belongs
689
690     In case of errors, we return an empty list and log the
691     error.
692
693     Note that since a logical volume can live on multiple physical
694     volumes, the resulting list might include a logical volume
695     multiple times.
696
697   """
698   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
699                          "--separator=|",
700                          "--options=lv_name,lv_size,devices,vg_name"])
701   if result.failed:
702     _Fail("Failed to list logical volumes, lvs output: %s",
703           result.output)
704
705   def parse_dev(dev):
706     return dev.split('(')[0]
707
708   def handle_dev(dev):
709     return [parse_dev(x) for x in dev.split(",")]
710
711   def map_line(line):
712     line = [v.strip() for v in line]
713     return [{'name': line[0], 'size': line[1],
714              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
715
716   all_devs = []
717   for line in result.stdout.splitlines():
718     if line.count('|') >= 3:
719       all_devs.extend(map_line(line.split('|')))
720     else:
721       logging.warning("Strange line in the output from lvs: '%s'", line)
722   return all_devs
723
724
725 def BridgesExist(bridges_list):
726   """Check if a list of bridges exist on the current node.
727
728   @rtype: boolean
729   @return: C{True} if all of them exist, C{False} otherwise
730
731   """
732   missing = []
733   for bridge in bridges_list:
734     if not utils.BridgeExists(bridge):
735       missing.append(bridge)
736
737   if missing:
738     _Fail("Missing bridges %s", utils.CommaJoin(missing))
739
740
741 def GetInstanceList(hypervisor_list):
742   """Provides a list of instances.
743
744   @type hypervisor_list: list
745   @param hypervisor_list: the list of hypervisors to query information
746
747   @rtype: list
748   @return: a list of all running instances on the current node
749     - instance1.example.com
750     - instance2.example.com
751
752   """
753   results = []
754   for hname in hypervisor_list:
755     try:
756       names = hypervisor.GetHypervisor(hname).ListInstances()
757       results.extend(names)
758     except errors.HypervisorError, err:
759       _Fail("Error enumerating instances (hypervisor %s): %s",
760             hname, err, exc=True)
761
762   return results
763
764
765 def GetInstanceInfo(instance, hname):
766   """Gives back the information about an instance as a dictionary.
767
768   @type instance: string
769   @param instance: the instance name
770   @type hname: string
771   @param hname: the hypervisor type of the instance
772
773   @rtype: dict
774   @return: dictionary with the following keys:
775       - memory: memory size of instance (int)
776       - state: xen state of instance (string)
777       - time: cpu time of instance (float)
778
779   """
780   output = {}
781
782   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
783   if iinfo is not None:
784     output['memory'] = iinfo[2]
785     output['state'] = iinfo[4]
786     output['time'] = iinfo[5]
787
788   return output
789
790
791 def GetInstanceMigratable(instance):
792   """Gives whether an instance can be migrated.
793
794   @type instance: L{objects.Instance}
795   @param instance: object representing the instance to be checked.
796
797   @rtype: tuple
798   @return: tuple of (result, description) where:
799       - result: whether the instance can be migrated or not
800       - description: a description of the issue, if relevant
801
802   """
803   hyper = hypervisor.GetHypervisor(instance.hypervisor)
804   iname = instance.name
805   if iname not in hyper.ListInstances():
806     _Fail("Instance %s is not running", iname)
807
808   for idx in range(len(instance.disks)):
809     link_name = _GetBlockDevSymlinkPath(iname, idx)
810     if not os.path.islink(link_name):
811       logging.warning("Instance %s is missing symlink %s for disk %d",
812                       iname, link_name, idx)
813
814
815 def GetAllInstancesInfo(hypervisor_list):
816   """Gather data about all instances.
817
818   This is the equivalent of L{GetInstanceInfo}, except that it
819   computes data for all instances at once, thus being faster if one
820   needs data about more than one instance.
821
822   @type hypervisor_list: list
823   @param hypervisor_list: list of hypervisors to query for instance data
824
825   @rtype: dict
826   @return: dictionary of instance: data, with data having the following keys:
827       - memory: memory size of instance (int)
828       - state: xen state of instance (string)
829       - time: cpu time of instance (float)
830       - vcpus: the number of vcpus
831
832   """
833   output = {}
834
835   for hname in hypervisor_list:
836     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
837     if iinfo:
838       for name, _, memory, vcpus, state, times in iinfo:
839         value = {
840           'memory': memory,
841           'vcpus': vcpus,
842           'state': state,
843           'time': times,
844           }
845         if name in output:
846           # we only check static parameters, like memory and vcpus,
847           # and not state and time which can change between the
848           # invocations of the different hypervisors
849           for key in 'memory', 'vcpus':
850             if value[key] != output[name][key]:
851               _Fail("Instance %s is running twice"
852                     " with different parameters", name)
853         output[name] = value
854
855   return output
856
857
858 def _InstanceLogName(kind, os_name, instance):
859   """Compute the OS log filename for a given instance and operation.
860
861   The instance name and os name are passed in as strings since not all
862   operations have these as part of an instance object.
863
864   @type kind: string
865   @param kind: the operation type (e.g. add, import, etc.)
866   @type os_name: string
867   @param os_name: the os name
868   @type instance: string
869   @param instance: the name of the instance being imported/added/etc.
870
871   """
872   # TODO: Use tempfile.mkstemp to create unique filename
873   base = ("%s-%s-%s-%s.log" %
874           (kind, os_name, instance, utils.TimestampForFilename()))
875   return utils.PathJoin(constants.LOG_OS_DIR, base)
876
877
878 def InstanceOsAdd(instance, reinstall, debug):
879   """Add an OS to an instance.
880
881   @type instance: L{objects.Instance}
882   @param instance: Instance whose OS is to be installed
883   @type reinstall: boolean
884   @param reinstall: whether this is an instance reinstall
885   @type debug: integer
886   @param debug: debug level, passed to the OS scripts
887   @rtype: None
888
889   """
890   inst_os = OSFromDisk(instance.os)
891
892   create_env = OSEnvironment(instance, inst_os, debug)
893   if reinstall:
894     create_env['INSTANCE_REINSTALL'] = "1"
895
896   logfile = _InstanceLogName("add", instance.os, instance.name)
897
898   result = utils.RunCmd([inst_os.create_script], env=create_env,
899                         cwd=inst_os.path, output=logfile,)
900   if result.failed:
901     logging.error("os create command '%s' returned error: %s, logfile: %s,"
902                   " output: %s", result.cmd, result.fail_reason, logfile,
903                   result.output)
904     lines = [utils.SafeEncode(val)
905              for val in utils.TailFile(logfile, lines=20)]
906     _Fail("OS create script failed (%s), last lines in the"
907           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
908
909
910 def RunRenameInstance(instance, old_name, debug):
911   """Run the OS rename script for an instance.
912
913   @type instance: L{objects.Instance}
914   @param instance: Instance whose OS is to be installed
915   @type old_name: string
916   @param old_name: previous instance name
917   @type debug: integer
918   @param debug: debug level, passed to the OS scripts
919   @rtype: boolean
920   @return: the success of the operation
921
922   """
923   inst_os = OSFromDisk(instance.os)
924
925   rename_env = OSEnvironment(instance, inst_os, debug)
926   rename_env['OLD_INSTANCE_NAME'] = old_name
927
928   logfile = _InstanceLogName("rename", instance.os,
929                              "%s-%s" % (old_name, instance.name))
930
931   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
932                         cwd=inst_os.path, output=logfile)
933
934   if result.failed:
935     logging.error("os create command '%s' returned error: %s output: %s",
936                   result.cmd, result.fail_reason, result.output)
937     lines = [utils.SafeEncode(val)
938              for val in utils.TailFile(logfile, lines=20)]
939     _Fail("OS rename script failed (%s), last lines in the"
940           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
941
942
943 def _GetBlockDevSymlinkPath(instance_name, idx):
944   return utils.PathJoin(constants.DISK_LINKS_DIR,
945                         "%s:%d" % (instance_name, idx))
946
947
948 def _SymlinkBlockDev(instance_name, device_path, idx):
949   """Set up symlinks to a instance's block device.
950
951   This is an auxiliary function run when an instance is start (on the primary
952   node) or when an instance is migrated (on the target node).
953
954
955   @param instance_name: the name of the target instance
956   @param device_path: path of the physical block device, on the node
957   @param idx: the disk index
958   @return: absolute path to the disk's symlink
959
960   """
961   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
962   try:
963     os.symlink(device_path, link_name)
964   except OSError, err:
965     if err.errno == errno.EEXIST:
966       if (not os.path.islink(link_name) or
967           os.readlink(link_name) != device_path):
968         os.remove(link_name)
969         os.symlink(device_path, link_name)
970     else:
971       raise
972
973   return link_name
974
975
976 def _RemoveBlockDevLinks(instance_name, disks):
977   """Remove the block device symlinks belonging to the given instance.
978
979   """
980   for idx, _ in enumerate(disks):
981     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
982     if os.path.islink(link_name):
983       try:
984         os.remove(link_name)
985       except OSError:
986         logging.exception("Can't remove symlink '%s'", link_name)
987
988
989 def _GatherAndLinkBlockDevs(instance):
990   """Set up an instance's block device(s).
991
992   This is run on the primary node at instance startup. The block
993   devices must be already assembled.
994
995   @type instance: L{objects.Instance}
996   @param instance: the instance whose disks we shoul assemble
997   @rtype: list
998   @return: list of (disk_object, device_path)
999
1000   """
1001   block_devices = []
1002   for idx, disk in enumerate(instance.disks):
1003     device = _RecursiveFindBD(disk)
1004     if device is None:
1005       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1006                                     str(disk))
1007     device.Open()
1008     try:
1009       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1010     except OSError, e:
1011       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1012                                     e.strerror)
1013
1014     block_devices.append((disk, link_name))
1015
1016   return block_devices
1017
1018
1019 def StartInstance(instance):
1020   """Start an instance.
1021
1022   @type instance: L{objects.Instance}
1023   @param instance: the instance object
1024   @rtype: None
1025
1026   """
1027   running_instances = GetInstanceList([instance.hypervisor])
1028
1029   if instance.name in running_instances:
1030     logging.info("Instance %s already running, not starting", instance.name)
1031     return
1032
1033   try:
1034     block_devices = _GatherAndLinkBlockDevs(instance)
1035     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1036     hyper.StartInstance(instance, block_devices)
1037   except errors.BlockDeviceError, err:
1038     _Fail("Block device error: %s", err, exc=True)
1039   except errors.HypervisorError, err:
1040     _RemoveBlockDevLinks(instance.name, instance.disks)
1041     _Fail("Hypervisor error: %s", err, exc=True)
1042
1043
1044 def InstanceShutdown(instance, timeout):
1045   """Shut an instance down.
1046
1047   @note: this functions uses polling with a hardcoded timeout.
1048
1049   @type instance: L{objects.Instance}
1050   @param instance: the instance object
1051   @type timeout: integer
1052   @param timeout: maximum timeout for soft shutdown
1053   @rtype: None
1054
1055   """
1056   hv_name = instance.hypervisor
1057   hyper = hypervisor.GetHypervisor(hv_name)
1058   iname = instance.name
1059
1060   if instance.name not in hyper.ListInstances():
1061     logging.info("Instance %s not running, doing nothing", iname)
1062     return
1063
1064   class _TryShutdown:
1065     def __init__(self):
1066       self.tried_once = False
1067
1068     def __call__(self):
1069       if iname not in hyper.ListInstances():
1070         return
1071
1072       try:
1073         hyper.StopInstance(instance, retry=self.tried_once)
1074       except errors.HypervisorError, err:
1075         if iname not in hyper.ListInstances():
1076           # if the instance is no longer existing, consider this a
1077           # success and go to cleanup
1078           return
1079
1080         _Fail("Failed to stop instance %s: %s", iname, err)
1081
1082       self.tried_once = True
1083
1084       raise utils.RetryAgain()
1085
1086   try:
1087     utils.Retry(_TryShutdown(), 5, timeout)
1088   except utils.RetryTimeout:
1089     # the shutdown did not succeed
1090     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1091
1092     try:
1093       hyper.StopInstance(instance, force=True)
1094     except errors.HypervisorError, err:
1095       if iname in hyper.ListInstances():
1096         # only raise an error if the instance still exists, otherwise
1097         # the error could simply be "instance ... unknown"!
1098         _Fail("Failed to force stop instance %s: %s", iname, err)
1099
1100     time.sleep(1)
1101
1102     if iname in hyper.ListInstances():
1103       _Fail("Could not shutdown instance %s even by destroy", iname)
1104
1105   try:
1106     hyper.CleanupInstance(instance.name)
1107   except errors.HypervisorError, err:
1108     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1109
1110   _RemoveBlockDevLinks(iname, instance.disks)
1111
1112
1113 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1114   """Reboot an instance.
1115
1116   @type instance: L{objects.Instance}
1117   @param instance: the instance object to reboot
1118   @type reboot_type: str
1119   @param reboot_type: the type of reboot, one the following
1120     constants:
1121       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1122         instance OS, do not recreate the VM
1123       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1124         restart the VM (at the hypervisor level)
1125       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1126         not accepted here, since that mode is handled differently, in
1127         cmdlib, and translates into full stop and start of the
1128         instance (instead of a call_instance_reboot RPC)
1129   @type shutdown_timeout: integer
1130   @param shutdown_timeout: maximum timeout for soft shutdown
1131   @rtype: None
1132
1133   """
1134   running_instances = GetInstanceList([instance.hypervisor])
1135
1136   if instance.name not in running_instances:
1137     _Fail("Cannot reboot instance %s that is not running", instance.name)
1138
1139   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1140   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1141     try:
1142       hyper.RebootInstance(instance)
1143     except errors.HypervisorError, err:
1144       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1145   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1146     try:
1147       InstanceShutdown(instance, shutdown_timeout)
1148       return StartInstance(instance)
1149     except errors.HypervisorError, err:
1150       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1151   else:
1152     _Fail("Invalid reboot_type received: %s", reboot_type)
1153
1154
1155 def MigrationInfo(instance):
1156   """Gather information about an instance to be migrated.
1157
1158   @type instance: L{objects.Instance}
1159   @param instance: the instance definition
1160
1161   """
1162   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1163   try:
1164     info = hyper.MigrationInfo(instance)
1165   except errors.HypervisorError, err:
1166     _Fail("Failed to fetch migration information: %s", err, exc=True)
1167   return info
1168
1169
1170 def AcceptInstance(instance, info, target):
1171   """Prepare the node to accept an instance.
1172
1173   @type instance: L{objects.Instance}
1174   @param instance: the instance definition
1175   @type info: string/data (opaque)
1176   @param info: migration information, from the source node
1177   @type target: string
1178   @param target: target host (usually ip), on this node
1179
1180   """
1181   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1182   try:
1183     hyper.AcceptInstance(instance, info, target)
1184   except errors.HypervisorError, err:
1185     _Fail("Failed to accept instance: %s", err, exc=True)
1186
1187
1188 def FinalizeMigration(instance, info, success):
1189   """Finalize any preparation to accept an instance.
1190
1191   @type instance: L{objects.Instance}
1192   @param instance: the instance definition
1193   @type info: string/data (opaque)
1194   @param info: migration information, from the source node
1195   @type success: boolean
1196   @param success: whether the migration was a success or a failure
1197
1198   """
1199   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1200   try:
1201     hyper.FinalizeMigration(instance, info, success)
1202   except errors.HypervisorError, err:
1203     _Fail("Failed to finalize migration: %s", err, exc=True)
1204
1205
1206 def MigrateInstance(instance, target, live):
1207   """Migrates an instance to another node.
1208
1209   @type instance: L{objects.Instance}
1210   @param instance: the instance definition
1211   @type target: string
1212   @param target: the target node name
1213   @type live: boolean
1214   @param live: whether the migration should be done live or not (the
1215       interpretation of this parameter is left to the hypervisor)
1216   @rtype: tuple
1217   @return: a tuple of (success, msg) where:
1218       - succes is a boolean denoting the success/failure of the operation
1219       - msg is a string with details in case of failure
1220
1221   """
1222   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1223
1224   try:
1225     hyper.MigrateInstance(instance, target, live)
1226   except errors.HypervisorError, err:
1227     _Fail("Failed to migrate instance: %s", err, exc=True)
1228
1229
1230 def BlockdevCreate(disk, size, owner, on_primary, info):
1231   """Creates a block device for an instance.
1232
1233   @type disk: L{objects.Disk}
1234   @param disk: the object describing the disk we should create
1235   @type size: int
1236   @param size: the size of the physical underlying device, in MiB
1237   @type owner: str
1238   @param owner: the name of the instance for which disk is created,
1239       used for device cache data
1240   @type on_primary: boolean
1241   @param on_primary:  indicates if it is the primary node or not
1242   @type info: string
1243   @param info: string that will be sent to the physical device
1244       creation, used for example to set (LVM) tags on LVs
1245
1246   @return: the new unique_id of the device (this can sometime be
1247       computed only after creation), or None. On secondary nodes,
1248       it's not required to return anything.
1249
1250   """
1251   # TODO: remove the obsolete 'size' argument
1252   # pylint: disable-msg=W0613
1253   clist = []
1254   if disk.children:
1255     for child in disk.children:
1256       try:
1257         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1258       except errors.BlockDeviceError, err:
1259         _Fail("Can't assemble device %s: %s", child, err)
1260       if on_primary or disk.AssembleOnSecondary():
1261         # we need the children open in case the device itself has to
1262         # be assembled
1263         try:
1264           # pylint: disable-msg=E1103
1265           crdev.Open()
1266         except errors.BlockDeviceError, err:
1267           _Fail("Can't make child '%s' read-write: %s", child, err)
1268       clist.append(crdev)
1269
1270   try:
1271     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1272   except errors.BlockDeviceError, err:
1273     _Fail("Can't create block device: %s", err)
1274
1275   if on_primary or disk.AssembleOnSecondary():
1276     try:
1277       device.Assemble()
1278     except errors.BlockDeviceError, err:
1279       _Fail("Can't assemble device after creation, unusual event: %s", err)
1280     device.SetSyncSpeed(constants.SYNC_SPEED)
1281     if on_primary or disk.OpenOnSecondary():
1282       try:
1283         device.Open(force=True)
1284       except errors.BlockDeviceError, err:
1285         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1286     DevCacheManager.UpdateCache(device.dev_path, owner,
1287                                 on_primary, disk.iv_name)
1288
1289   device.SetInfo(info)
1290
1291   return device.unique_id
1292
1293
1294 def _WipeDevice(path, offset, size):
1295   """This function actually wipes the device.
1296
1297   @param path: The path to the device to wipe
1298   @param offset: The offset in MiB in the file
1299   @param size: The size in MiB to write
1300
1301   """
1302   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1303          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1304          "count=%d" % size]
1305   result = utils.RunCmd(cmd)
1306
1307   if result.failed:
1308     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1309           result.fail_reason, result.output)
1310
1311
1312 def BlockdevWipe(disk, offset, size):
1313   """Wipes a block device.
1314
1315   @type disk: L{objects.Disk}
1316   @param disk: the disk object we want to wipe
1317   @type offset: int
1318   @param offset: The offset in MiB in the file
1319   @type size: int
1320   @param size: The size in MiB to write
1321
1322   """
1323   try:
1324     rdev = _RecursiveFindBD(disk)
1325   except errors.BlockDeviceError:
1326     rdev = None
1327
1328   if not rdev:
1329     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1330
1331   # Do cross verify some of the parameters
1332   if offset > rdev.size:
1333     _Fail("Offset is bigger than device size")
1334   if (offset + size) > rdev.size:
1335     _Fail("The provided offset and size to wipe is bigger than device size")
1336
1337   _WipeDevice(rdev.dev_path, offset, size)
1338
1339
1340 def BlockdevRemove(disk):
1341   """Remove a block device.
1342
1343   @note: This is intended to be called recursively.
1344
1345   @type disk: L{objects.Disk}
1346   @param disk: the disk object we should remove
1347   @rtype: boolean
1348   @return: the success of the operation
1349
1350   """
1351   msgs = []
1352   try:
1353     rdev = _RecursiveFindBD(disk)
1354   except errors.BlockDeviceError, err:
1355     # probably can't attach
1356     logging.info("Can't attach to device %s in remove", disk)
1357     rdev = None
1358   if rdev is not None:
1359     r_path = rdev.dev_path
1360     try:
1361       rdev.Remove()
1362     except errors.BlockDeviceError, err:
1363       msgs.append(str(err))
1364     if not msgs:
1365       DevCacheManager.RemoveCache(r_path)
1366
1367   if disk.children:
1368     for child in disk.children:
1369       try:
1370         BlockdevRemove(child)
1371       except RPCFail, err:
1372         msgs.append(str(err))
1373
1374   if msgs:
1375     _Fail("; ".join(msgs))
1376
1377
1378 def _RecursiveAssembleBD(disk, owner, as_primary):
1379   """Activate a block device for an instance.
1380
1381   This is run on the primary and secondary nodes for an instance.
1382
1383   @note: this function is called recursively.
1384
1385   @type disk: L{objects.Disk}
1386   @param disk: the disk we try to assemble
1387   @type owner: str
1388   @param owner: the name of the instance which owns the disk
1389   @type as_primary: boolean
1390   @param as_primary: if we should make the block device
1391       read/write
1392
1393   @return: the assembled device or None (in case no device
1394       was assembled)
1395   @raise errors.BlockDeviceError: in case there is an error
1396       during the activation of the children or the device
1397       itself
1398
1399   """
1400   children = []
1401   if disk.children:
1402     mcn = disk.ChildrenNeeded()
1403     if mcn == -1:
1404       mcn = 0 # max number of Nones allowed
1405     else:
1406       mcn = len(disk.children) - mcn # max number of Nones
1407     for chld_disk in disk.children:
1408       try:
1409         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1410       except errors.BlockDeviceError, err:
1411         if children.count(None) >= mcn:
1412           raise
1413         cdev = None
1414         logging.error("Error in child activation (but continuing): %s",
1415                       str(err))
1416       children.append(cdev)
1417
1418   if as_primary or disk.AssembleOnSecondary():
1419     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1420     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1421     result = r_dev
1422     if as_primary or disk.OpenOnSecondary():
1423       r_dev.Open()
1424     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1425                                 as_primary, disk.iv_name)
1426
1427   else:
1428     result = True
1429   return result
1430
1431
1432 def BlockdevAssemble(disk, owner, as_primary):
1433   """Activate a block device for an instance.
1434
1435   This is a wrapper over _RecursiveAssembleBD.
1436
1437   @rtype: str or boolean
1438   @return: a C{/dev/...} path for primary nodes, and
1439       C{True} for secondary nodes
1440
1441   """
1442   try:
1443     result = _RecursiveAssembleBD(disk, owner, as_primary)
1444     if isinstance(result, bdev.BlockDev):
1445       # pylint: disable-msg=E1103
1446       result = result.dev_path
1447   except errors.BlockDeviceError, err:
1448     _Fail("Error while assembling disk: %s", err, exc=True)
1449
1450   return result
1451
1452
1453 def BlockdevShutdown(disk):
1454   """Shut down a block device.
1455
1456   First, if the device is assembled (Attach() is successful), then
1457   the device is shutdown. Then the children of the device are
1458   shutdown.
1459
1460   This function is called recursively. Note that we don't cache the
1461   children or such, as oppossed to assemble, shutdown of different
1462   devices doesn't require that the upper device was active.
1463
1464   @type disk: L{objects.Disk}
1465   @param disk: the description of the disk we should
1466       shutdown
1467   @rtype: None
1468
1469   """
1470   msgs = []
1471   r_dev = _RecursiveFindBD(disk)
1472   if r_dev is not None:
1473     r_path = r_dev.dev_path
1474     try:
1475       r_dev.Shutdown()
1476       DevCacheManager.RemoveCache(r_path)
1477     except errors.BlockDeviceError, err:
1478       msgs.append(str(err))
1479
1480   if disk.children:
1481     for child in disk.children:
1482       try:
1483         BlockdevShutdown(child)
1484       except RPCFail, err:
1485         msgs.append(str(err))
1486
1487   if msgs:
1488     _Fail("; ".join(msgs))
1489
1490
1491 def BlockdevAddchildren(parent_cdev, new_cdevs):
1492   """Extend a mirrored block device.
1493
1494   @type parent_cdev: L{objects.Disk}
1495   @param parent_cdev: the disk to which we should add children
1496   @type new_cdevs: list of L{objects.Disk}
1497   @param new_cdevs: the list of children which we should add
1498   @rtype: None
1499
1500   """
1501   parent_bdev = _RecursiveFindBD(parent_cdev)
1502   if parent_bdev is None:
1503     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1504   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1505   if new_bdevs.count(None) > 0:
1506     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1507   parent_bdev.AddChildren(new_bdevs)
1508
1509
1510 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1511   """Shrink a mirrored block device.
1512
1513   @type parent_cdev: L{objects.Disk}
1514   @param parent_cdev: the disk from which we should remove children
1515   @type new_cdevs: list of L{objects.Disk}
1516   @param new_cdevs: the list of children which we should remove
1517   @rtype: None
1518
1519   """
1520   parent_bdev = _RecursiveFindBD(parent_cdev)
1521   if parent_bdev is None:
1522     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1523   devs = []
1524   for disk in new_cdevs:
1525     rpath = disk.StaticDevPath()
1526     if rpath is None:
1527       bd = _RecursiveFindBD(disk)
1528       if bd is None:
1529         _Fail("Can't find device %s while removing children", disk)
1530       else:
1531         devs.append(bd.dev_path)
1532     else:
1533       if not utils.IsNormAbsPath(rpath):
1534         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1535       devs.append(rpath)
1536   parent_bdev.RemoveChildren(devs)
1537
1538
1539 def BlockdevGetmirrorstatus(disks):
1540   """Get the mirroring status of a list of devices.
1541
1542   @type disks: list of L{objects.Disk}
1543   @param disks: the list of disks which we should query
1544   @rtype: disk
1545   @return: List of L{objects.BlockDevStatus}, one for each disk
1546   @raise errors.BlockDeviceError: if any of the disks cannot be
1547       found
1548
1549   """
1550   stats = []
1551   for dsk in disks:
1552     rbd = _RecursiveFindBD(dsk)
1553     if rbd is None:
1554       _Fail("Can't find device %s", dsk)
1555
1556     stats.append(rbd.CombinedSyncStatus())
1557
1558   return stats
1559
1560
1561 def BlockdevGetmirrorstatusMulti(disks):
1562   """Get the mirroring status of a list of devices.
1563
1564   @type disks: list of L{objects.Disk}
1565   @param disks: the list of disks which we should query
1566   @rtype: disk
1567   @return: List of tuples, (bool, status), one for each disk; bool denotes
1568     success/failure, status is L{objects.BlockDevStatus} on success, string
1569     otherwise
1570
1571   """
1572   result = []
1573   for disk in disks:
1574     try:
1575       rbd = _RecursiveFindBD(disk)
1576       if rbd is None:
1577         result.append((False, "Can't find device %s" % disk))
1578         continue
1579
1580       status = rbd.CombinedSyncStatus()
1581     except errors.BlockDeviceError, err:
1582       logging.exception("Error while getting disk status")
1583       result.append((False, str(err)))
1584     else:
1585       result.append((True, status))
1586
1587   assert len(disks) == len(result)
1588
1589   return result
1590
1591
1592 def _RecursiveFindBD(disk):
1593   """Check if a device is activated.
1594
1595   If so, return information about the real device.
1596
1597   @type disk: L{objects.Disk}
1598   @param disk: the disk object we need to find
1599
1600   @return: None if the device can't be found,
1601       otherwise the device instance
1602
1603   """
1604   children = []
1605   if disk.children:
1606     for chdisk in disk.children:
1607       children.append(_RecursiveFindBD(chdisk))
1608
1609   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1610
1611
1612 def _OpenRealBD(disk):
1613   """Opens the underlying block device of a disk.
1614
1615   @type disk: L{objects.Disk}
1616   @param disk: the disk object we want to open
1617
1618   """
1619   real_disk = _RecursiveFindBD(disk)
1620   if real_disk is None:
1621     _Fail("Block device '%s' is not set up", disk)
1622
1623   real_disk.Open()
1624
1625   return real_disk
1626
1627
1628 def BlockdevFind(disk):
1629   """Check if a device is activated.
1630
1631   If it is, return information about the real device.
1632
1633   @type disk: L{objects.Disk}
1634   @param disk: the disk to find
1635   @rtype: None or objects.BlockDevStatus
1636   @return: None if the disk cannot be found, otherwise a the current
1637            information
1638
1639   """
1640   try:
1641     rbd = _RecursiveFindBD(disk)
1642   except errors.BlockDeviceError, err:
1643     _Fail("Failed to find device: %s", err, exc=True)
1644
1645   if rbd is None:
1646     return None
1647
1648   return rbd.GetSyncStatus()
1649
1650
1651 def BlockdevGetsize(disks):
1652   """Computes the size of the given disks.
1653
1654   If a disk is not found, returns None instead.
1655
1656   @type disks: list of L{objects.Disk}
1657   @param disks: the list of disk to compute the size for
1658   @rtype: list
1659   @return: list with elements None if the disk cannot be found,
1660       otherwise the size
1661
1662   """
1663   result = []
1664   for cf in disks:
1665     try:
1666       rbd = _RecursiveFindBD(cf)
1667     except errors.BlockDeviceError:
1668       result.append(None)
1669       continue
1670     if rbd is None:
1671       result.append(None)
1672     else:
1673       result.append(rbd.GetActualSize())
1674   return result
1675
1676
1677 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1678   """Export a block device to a remote node.
1679
1680   @type disk: L{objects.Disk}
1681   @param disk: the description of the disk to export
1682   @type dest_node: str
1683   @param dest_node: the destination node to export to
1684   @type dest_path: str
1685   @param dest_path: the destination path on the target node
1686   @type cluster_name: str
1687   @param cluster_name: the cluster name, needed for SSH hostalias
1688   @rtype: None
1689
1690   """
1691   real_disk = _OpenRealBD(disk)
1692
1693   # the block size on the read dd is 1MiB to match our units
1694   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1695                                "dd if=%s bs=1048576 count=%s",
1696                                real_disk.dev_path, str(disk.size))
1697
1698   # we set here a smaller block size as, due to ssh buffering, more
1699   # than 64-128k will mostly ignored; we use nocreat to fail if the
1700   # device is not already there or we pass a wrong path; we use
1701   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1702   # to not buffer too much memory; this means that at best, we flush
1703   # every 64k, which will not be very fast
1704   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1705                                 " oflag=dsync", dest_path)
1706
1707   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1708                                                    constants.GANETI_RUNAS,
1709                                                    destcmd)
1710
1711   # all commands have been checked, so we're safe to combine them
1712   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1713
1714   result = utils.RunCmd(["bash", "-c", command])
1715
1716   if result.failed:
1717     _Fail("Disk copy command '%s' returned error: %s"
1718           " output: %s", command, result.fail_reason, result.output)
1719
1720
1721 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1722   """Write a file to the filesystem.
1723
1724   This allows the master to overwrite(!) a file. It will only perform
1725   the operation if the file belongs to a list of configuration files.
1726
1727   @type file_name: str
1728   @param file_name: the target file name
1729   @type data: str
1730   @param data: the new contents of the file
1731   @type mode: int
1732   @param mode: the mode to give the file (can be None)
1733   @type uid: int
1734   @param uid: the owner of the file (can be -1 for default)
1735   @type gid: int
1736   @param gid: the group of the file (can be -1 for default)
1737   @type atime: float
1738   @param atime: the atime to set on the file (can be None)
1739   @type mtime: float
1740   @param mtime: the mtime to set on the file (can be None)
1741   @rtype: None
1742
1743   """
1744   if not os.path.isabs(file_name):
1745     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1746
1747   if file_name not in _ALLOWED_UPLOAD_FILES:
1748     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1749           file_name)
1750
1751   raw_data = _Decompress(data)
1752
1753   utils.SafeWriteFile(file_name, None,
1754                       data=raw_data, mode=mode, uid=uid, gid=gid,
1755                       atime=atime, mtime=mtime)
1756
1757
1758 def WriteSsconfFiles(values):
1759   """Update all ssconf files.
1760
1761   Wrapper around the SimpleStore.WriteFiles.
1762
1763   """
1764   ssconf.SimpleStore().WriteFiles(values)
1765
1766
1767 def _ErrnoOrStr(err):
1768   """Format an EnvironmentError exception.
1769
1770   If the L{err} argument has an errno attribute, it will be looked up
1771   and converted into a textual C{E...} description. Otherwise the
1772   string representation of the error will be returned.
1773
1774   @type err: L{EnvironmentError}
1775   @param err: the exception to format
1776
1777   """
1778   if hasattr(err, 'errno'):
1779     detail = errno.errorcode[err.errno]
1780   else:
1781     detail = str(err)
1782   return detail
1783
1784
1785 def _OSOndiskAPIVersion(os_dir):
1786   """Compute and return the API version of a given OS.
1787
1788   This function will try to read the API version of the OS residing in
1789   the 'os_dir' directory.
1790
1791   @type os_dir: str
1792   @param os_dir: the directory in which we should look for the OS
1793   @rtype: tuple
1794   @return: tuple (status, data) with status denoting the validity and
1795       data holding either the vaid versions or an error message
1796
1797   """
1798   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1799
1800   try:
1801     st = os.stat(api_file)
1802   except EnvironmentError, err:
1803     return False, ("Required file '%s' not found under path %s: %s" %
1804                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1805
1806   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1807     return False, ("File '%s' in %s is not a regular file" %
1808                    (constants.OS_API_FILE, os_dir))
1809
1810   try:
1811     api_versions = utils.ReadFile(api_file).splitlines()
1812   except EnvironmentError, err:
1813     return False, ("Error while reading the API version file at %s: %s" %
1814                    (api_file, _ErrnoOrStr(err)))
1815
1816   try:
1817     api_versions = [int(version.strip()) for version in api_versions]
1818   except (TypeError, ValueError), err:
1819     return False, ("API version(s) can't be converted to integer: %s" %
1820                    str(err))
1821
1822   return True, api_versions
1823
1824
1825 def DiagnoseOS(top_dirs=None):
1826   """Compute the validity for all OSes.
1827
1828   @type top_dirs: list
1829   @param top_dirs: the list of directories in which to
1830       search (if not given defaults to
1831       L{constants.OS_SEARCH_PATH})
1832   @rtype: list of L{objects.OS}
1833   @return: a list of tuples (name, path, status, diagnose, variants,
1834       parameters, api_version) for all (potential) OSes under all
1835       search paths, where:
1836           - name is the (potential) OS name
1837           - path is the full path to the OS
1838           - status True/False is the validity of the OS
1839           - diagnose is the error message for an invalid OS, otherwise empty
1840           - variants is a list of supported OS variants, if any
1841           - parameters is a list of (name, help) parameters, if any
1842           - api_version is a list of support OS API versions
1843
1844   """
1845   if top_dirs is None:
1846     top_dirs = constants.OS_SEARCH_PATH
1847
1848   result = []
1849   for dir_name in top_dirs:
1850     if os.path.isdir(dir_name):
1851       try:
1852         f_names = utils.ListVisibleFiles(dir_name)
1853       except EnvironmentError, err:
1854         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1855         break
1856       for name in f_names:
1857         os_path = utils.PathJoin(dir_name, name)
1858         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1859         if status:
1860           diagnose = ""
1861           variants = os_inst.supported_variants
1862           parameters = os_inst.supported_parameters
1863           api_versions = os_inst.api_versions
1864         else:
1865           diagnose = os_inst
1866           variants = parameters = api_versions = []
1867         result.append((name, os_path, status, diagnose, variants,
1868                        parameters, api_versions))
1869
1870   return result
1871
1872
1873 def _TryOSFromDisk(name, base_dir=None):
1874   """Create an OS instance from disk.
1875
1876   This function will return an OS instance if the given name is a
1877   valid OS name.
1878
1879   @type base_dir: string
1880   @keyword base_dir: Base directory containing OS installations.
1881                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1882   @rtype: tuple
1883   @return: success and either the OS instance if we find a valid one,
1884       or error message
1885
1886   """
1887   if base_dir is None:
1888     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1889   else:
1890     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1891
1892   if os_dir is None:
1893     return False, "Directory for OS %s not found in search path" % name
1894
1895   status, api_versions = _OSOndiskAPIVersion(os_dir)
1896   if not status:
1897     # push the error up
1898     return status, api_versions
1899
1900   if not constants.OS_API_VERSIONS.intersection(api_versions):
1901     return False, ("API version mismatch for path '%s': found %s, want %s." %
1902                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1903
1904   # OS Files dictionary, we will populate it with the absolute path names
1905   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1906
1907   if max(api_versions) >= constants.OS_API_V15:
1908     os_files[constants.OS_VARIANTS_FILE] = ''
1909
1910   if max(api_versions) >= constants.OS_API_V20:
1911     os_files[constants.OS_PARAMETERS_FILE] = ''
1912   else:
1913     del os_files[constants.OS_SCRIPT_VERIFY]
1914
1915   for filename in os_files:
1916     os_files[filename] = utils.PathJoin(os_dir, filename)
1917
1918     try:
1919       st = os.stat(os_files[filename])
1920     except EnvironmentError, err:
1921       return False, ("File '%s' under path '%s' is missing (%s)" %
1922                      (filename, os_dir, _ErrnoOrStr(err)))
1923
1924     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1925       return False, ("File '%s' under path '%s' is not a regular file" %
1926                      (filename, os_dir))
1927
1928     if filename in constants.OS_SCRIPTS:
1929       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1930         return False, ("File '%s' under path '%s' is not executable" %
1931                        (filename, os_dir))
1932
1933   variants = []
1934   if constants.OS_VARIANTS_FILE in os_files:
1935     variants_file = os_files[constants.OS_VARIANTS_FILE]
1936     try:
1937       variants = utils.ReadFile(variants_file).splitlines()
1938     except EnvironmentError, err:
1939       return False, ("Error while reading the OS variants file at %s: %s" %
1940                      (variants_file, _ErrnoOrStr(err)))
1941     if not variants:
1942       return False, ("No supported os variant found")
1943
1944   parameters = []
1945   if constants.OS_PARAMETERS_FILE in os_files:
1946     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1947     try:
1948       parameters = utils.ReadFile(parameters_file).splitlines()
1949     except EnvironmentError, err:
1950       return False, ("Error while reading the OS parameters file at %s: %s" %
1951                      (parameters_file, _ErrnoOrStr(err)))
1952     parameters = [v.split(None, 1) for v in parameters]
1953
1954   os_obj = objects.OS(name=name, path=os_dir,
1955                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1956                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1957                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1958                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1959                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1960                                                  None),
1961                       supported_variants=variants,
1962                       supported_parameters=parameters,
1963                       api_versions=api_versions)
1964   return True, os_obj
1965
1966
1967 def OSFromDisk(name, base_dir=None):
1968   """Create an OS instance from disk.
1969
1970   This function will return an OS instance if the given name is a
1971   valid OS name. Otherwise, it will raise an appropriate
1972   L{RPCFail} exception, detailing why this is not a valid OS.
1973
1974   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1975   an exception but returns true/false status data.
1976
1977   @type base_dir: string
1978   @keyword base_dir: Base directory containing OS installations.
1979                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1980   @rtype: L{objects.OS}
1981   @return: the OS instance if we find a valid one
1982   @raise RPCFail: if we don't find a valid OS
1983
1984   """
1985   name_only = objects.OS.GetName(name)
1986   status, payload = _TryOSFromDisk(name_only, base_dir)
1987
1988   if not status:
1989     _Fail(payload)
1990
1991   return payload
1992
1993
1994 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
1995   """Calculate the basic environment for an os script.
1996
1997   @type os_name: str
1998   @param os_name: full operating system name (including variant)
1999   @type inst_os: L{objects.OS}
2000   @param inst_os: operating system for which the environment is being built
2001   @type os_params: dict
2002   @param os_params: the OS parameters
2003   @type debug: integer
2004   @param debug: debug level (0 or 1, for OS Api 10)
2005   @rtype: dict
2006   @return: dict of environment variables
2007   @raise errors.BlockDeviceError: if the block device
2008       cannot be found
2009
2010   """
2011   result = {}
2012   api_version = \
2013     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2014   result['OS_API_VERSION'] = '%d' % api_version
2015   result['OS_NAME'] = inst_os.name
2016   result['DEBUG_LEVEL'] = '%d' % debug
2017
2018   # OS variants
2019   if api_version >= constants.OS_API_V15:
2020     variant = objects.OS.GetVariant(os_name)
2021     if not variant:
2022       variant = inst_os.supported_variants[0]
2023     result['OS_VARIANT'] = variant
2024
2025   # OS params
2026   for pname, pvalue in os_params.items():
2027     result['OSP_%s' % pname.upper()] = pvalue
2028
2029   return result
2030
2031
2032 def OSEnvironment(instance, inst_os, debug=0):
2033   """Calculate the environment for an os script.
2034
2035   @type instance: L{objects.Instance}
2036   @param instance: target instance for the os script run
2037   @type inst_os: L{objects.OS}
2038   @param inst_os: operating system for which the environment is being built
2039   @type debug: integer
2040   @param debug: debug level (0 or 1, for OS Api 10)
2041   @rtype: dict
2042   @return: dict of environment variables
2043   @raise errors.BlockDeviceError: if the block device
2044       cannot be found
2045
2046   """
2047   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2048
2049   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
2050     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2051
2052   result['HYPERVISOR'] = instance.hypervisor
2053   result['DISK_COUNT'] = '%d' % len(instance.disks)
2054   result['NIC_COUNT'] = '%d' % len(instance.nics)
2055
2056   # Disks
2057   for idx, disk in enumerate(instance.disks):
2058     real_disk = _OpenRealBD(disk)
2059     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2060     result['DISK_%d_ACCESS' % idx] = disk.mode
2061     if constants.HV_DISK_TYPE in instance.hvparams:
2062       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2063         instance.hvparams[constants.HV_DISK_TYPE]
2064     if disk.dev_type in constants.LDS_BLOCK:
2065       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2066     elif disk.dev_type == constants.LD_FILE:
2067       result['DISK_%d_BACKEND_TYPE' % idx] = \
2068         'file:%s' % disk.physical_id[0]
2069
2070   # NICs
2071   for idx, nic in enumerate(instance.nics):
2072     result['NIC_%d_MAC' % idx] = nic.mac
2073     if nic.ip:
2074       result['NIC_%d_IP' % idx] = nic.ip
2075     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2076     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2077       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2078     if nic.nicparams[constants.NIC_LINK]:
2079       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2080     if constants.HV_NIC_TYPE in instance.hvparams:
2081       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2082         instance.hvparams[constants.HV_NIC_TYPE]
2083
2084   # HV/BE params
2085   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2086     for key, value in source.items():
2087       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2088
2089   return result
2090
2091
2092 def BlockdevGrow(disk, amount):
2093   """Grow a stack of block devices.
2094
2095   This function is called recursively, with the childrens being the
2096   first ones to resize.
2097
2098   @type disk: L{objects.Disk}
2099   @param disk: the disk to be grown
2100   @rtype: (status, result)
2101   @return: a tuple with the status of the operation
2102       (True/False), and the errors message if status
2103       is False
2104
2105   """
2106   r_dev = _RecursiveFindBD(disk)
2107   if r_dev is None:
2108     _Fail("Cannot find block device %s", disk)
2109
2110   try:
2111     r_dev.Grow(amount)
2112   except errors.BlockDeviceError, err:
2113     _Fail("Failed to grow block device: %s", err, exc=True)
2114
2115
2116 def BlockdevSnapshot(disk):
2117   """Create a snapshot copy of a block device.
2118
2119   This function is called recursively, and the snapshot is actually created
2120   just for the leaf lvm backend device.
2121
2122   @type disk: L{objects.Disk}
2123   @param disk: the disk to be snapshotted
2124   @rtype: string
2125   @return: snapshot disk path
2126
2127   """
2128   if disk.dev_type == constants.LD_DRBD8:
2129     if not disk.children:
2130       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2131             disk.unique_id)
2132     return BlockdevSnapshot(disk.children[0])
2133   elif disk.dev_type == constants.LD_LV:
2134     r_dev = _RecursiveFindBD(disk)
2135     if r_dev is not None:
2136       # FIXME: choose a saner value for the snapshot size
2137       # let's stay on the safe side and ask for the full size, for now
2138       return r_dev.Snapshot(disk.size)
2139     else:
2140       _Fail("Cannot find block device %s", disk)
2141   else:
2142     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2143           disk.unique_id, disk.dev_type)
2144
2145
2146 def FinalizeExport(instance, snap_disks):
2147   """Write out the export configuration information.
2148
2149   @type instance: L{objects.Instance}
2150   @param instance: the instance which we export, used for
2151       saving configuration
2152   @type snap_disks: list of L{objects.Disk}
2153   @param snap_disks: list of snapshot block devices, which
2154       will be used to get the actual name of the dump file
2155
2156   @rtype: None
2157
2158   """
2159   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2160   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2161
2162   config = objects.SerializableConfigParser()
2163
2164   config.add_section(constants.INISECT_EXP)
2165   config.set(constants.INISECT_EXP, 'version', '0')
2166   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2167   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2168   config.set(constants.INISECT_EXP, 'os', instance.os)
2169   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2170
2171   config.add_section(constants.INISECT_INS)
2172   config.set(constants.INISECT_INS, 'name', instance.name)
2173   config.set(constants.INISECT_INS, 'memory', '%d' %
2174              instance.beparams[constants.BE_MEMORY])
2175   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2176              instance.beparams[constants.BE_VCPUS])
2177   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2178   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2179
2180   nic_total = 0
2181   for nic_count, nic in enumerate(instance.nics):
2182     nic_total += 1
2183     config.set(constants.INISECT_INS, 'nic%d_mac' %
2184                nic_count, '%s' % nic.mac)
2185     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2186     for param in constants.NICS_PARAMETER_TYPES:
2187       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2188                  '%s' % nic.nicparams.get(param, None))
2189   # TODO: redundant: on load can read nics until it doesn't exist
2190   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2191
2192   disk_total = 0
2193   for disk_count, disk in enumerate(snap_disks):
2194     if disk:
2195       disk_total += 1
2196       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2197                  ('%s' % disk.iv_name))
2198       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2199                  ('%s' % disk.physical_id[1]))
2200       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2201                  ('%d' % disk.size))
2202
2203   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2204
2205   # New-style hypervisor/backend parameters
2206
2207   config.add_section(constants.INISECT_HYP)
2208   for name, value in instance.hvparams.items():
2209     if name not in constants.HVC_GLOBALS:
2210       config.set(constants.INISECT_HYP, name, str(value))
2211
2212   config.add_section(constants.INISECT_BEP)
2213   for name, value in instance.beparams.items():
2214     config.set(constants.INISECT_BEP, name, str(value))
2215
2216   config.add_section(constants.INISECT_OSP)
2217   for name, value in instance.osparams.items():
2218     config.set(constants.INISECT_OSP, name, str(value))
2219
2220   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2221                   data=config.Dumps())
2222   shutil.rmtree(finaldestdir, ignore_errors=True)
2223   shutil.move(destdir, finaldestdir)
2224
2225
2226 def ExportInfo(dest):
2227   """Get export configuration information.
2228
2229   @type dest: str
2230   @param dest: directory containing the export
2231
2232   @rtype: L{objects.SerializableConfigParser}
2233   @return: a serializable config file containing the
2234       export info
2235
2236   """
2237   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2238
2239   config = objects.SerializableConfigParser()
2240   config.read(cff)
2241
2242   if (not config.has_section(constants.INISECT_EXP) or
2243       not config.has_section(constants.INISECT_INS)):
2244     _Fail("Export info file doesn't have the required fields")
2245
2246   return config.Dumps()
2247
2248
2249 def ListExports():
2250   """Return a list of exports currently available on this machine.
2251
2252   @rtype: list
2253   @return: list of the exports
2254
2255   """
2256   if os.path.isdir(constants.EXPORT_DIR):
2257     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2258   else:
2259     _Fail("No exports directory")
2260
2261
2262 def RemoveExport(export):
2263   """Remove an existing export from the node.
2264
2265   @type export: str
2266   @param export: the name of the export to remove
2267   @rtype: None
2268
2269   """
2270   target = utils.PathJoin(constants.EXPORT_DIR, export)
2271
2272   try:
2273     shutil.rmtree(target)
2274   except EnvironmentError, err:
2275     _Fail("Error while removing the export: %s", err, exc=True)
2276
2277
2278 def BlockdevRename(devlist):
2279   """Rename a list of block devices.
2280
2281   @type devlist: list of tuples
2282   @param devlist: list of tuples of the form  (disk,
2283       new_logical_id, new_physical_id); disk is an
2284       L{objects.Disk} object describing the current disk,
2285       and new logical_id/physical_id is the name we
2286       rename it to
2287   @rtype: boolean
2288   @return: True if all renames succeeded, False otherwise
2289
2290   """
2291   msgs = []
2292   result = True
2293   for disk, unique_id in devlist:
2294     dev = _RecursiveFindBD(disk)
2295     if dev is None:
2296       msgs.append("Can't find device %s in rename" % str(disk))
2297       result = False
2298       continue
2299     try:
2300       old_rpath = dev.dev_path
2301       dev.Rename(unique_id)
2302       new_rpath = dev.dev_path
2303       if old_rpath != new_rpath:
2304         DevCacheManager.RemoveCache(old_rpath)
2305         # FIXME: we should add the new cache information here, like:
2306         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2307         # but we don't have the owner here - maybe parse from existing
2308         # cache? for now, we only lose lvm data when we rename, which
2309         # is less critical than DRBD or MD
2310     except errors.BlockDeviceError, err:
2311       msgs.append("Can't rename device '%s' to '%s': %s" %
2312                   (dev, unique_id, err))
2313       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2314       result = False
2315   if not result:
2316     _Fail("; ".join(msgs))
2317
2318
2319 def _TransformFileStorageDir(file_storage_dir):
2320   """Checks whether given file_storage_dir is valid.
2321
2322   Checks wheter the given file_storage_dir is within the cluster-wide
2323   default file_storage_dir stored in SimpleStore. Only paths under that
2324   directory are allowed.
2325
2326   @type file_storage_dir: str
2327   @param file_storage_dir: the path to check
2328
2329   @return: the normalized path if valid, None otherwise
2330
2331   """
2332   if not constants.ENABLE_FILE_STORAGE:
2333     _Fail("File storage disabled at configure time")
2334   cfg = _GetConfig()
2335   file_storage_dir = os.path.normpath(file_storage_dir)
2336   base_file_storage_dir = cfg.GetFileStorageDir()
2337   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2338       base_file_storage_dir):
2339     _Fail("File storage directory '%s' is not under base file"
2340           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2341   return file_storage_dir
2342
2343
2344 def CreateFileStorageDir(file_storage_dir):
2345   """Create file storage directory.
2346
2347   @type file_storage_dir: str
2348   @param file_storage_dir: directory to create
2349
2350   @rtype: tuple
2351   @return: tuple with first element a boolean indicating wheter dir
2352       creation was successful or not
2353
2354   """
2355   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2356   if os.path.exists(file_storage_dir):
2357     if not os.path.isdir(file_storage_dir):
2358       _Fail("Specified storage dir '%s' is not a directory",
2359             file_storage_dir)
2360   else:
2361     try:
2362       os.makedirs(file_storage_dir, 0750)
2363     except OSError, err:
2364       _Fail("Cannot create file storage directory '%s': %s",
2365             file_storage_dir, err, exc=True)
2366
2367
2368 def RemoveFileStorageDir(file_storage_dir):
2369   """Remove file storage directory.
2370
2371   Remove it only if it's empty. If not log an error and return.
2372
2373   @type file_storage_dir: str
2374   @param file_storage_dir: the directory we should cleanup
2375   @rtype: tuple (success,)
2376   @return: tuple of one element, C{success}, denoting
2377       whether the operation was successful
2378
2379   """
2380   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2381   if os.path.exists(file_storage_dir):
2382     if not os.path.isdir(file_storage_dir):
2383       _Fail("Specified Storage directory '%s' is not a directory",
2384             file_storage_dir)
2385     # deletes dir only if empty, otherwise we want to fail the rpc call
2386     try:
2387       os.rmdir(file_storage_dir)
2388     except OSError, err:
2389       _Fail("Cannot remove file storage directory '%s': %s",
2390             file_storage_dir, err)
2391
2392
2393 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2394   """Rename the file storage directory.
2395
2396   @type old_file_storage_dir: str
2397   @param old_file_storage_dir: the current path
2398   @type new_file_storage_dir: str
2399   @param new_file_storage_dir: the name we should rename to
2400   @rtype: tuple (success,)
2401   @return: tuple of one element, C{success}, denoting
2402       whether the operation was successful
2403
2404   """
2405   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2406   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2407   if not os.path.exists(new_file_storage_dir):
2408     if os.path.isdir(old_file_storage_dir):
2409       try:
2410         os.rename(old_file_storage_dir, new_file_storage_dir)
2411       except OSError, err:
2412         _Fail("Cannot rename '%s' to '%s': %s",
2413               old_file_storage_dir, new_file_storage_dir, err)
2414     else:
2415       _Fail("Specified storage dir '%s' is not a directory",
2416             old_file_storage_dir)
2417   else:
2418     if os.path.exists(old_file_storage_dir):
2419       _Fail("Cannot rename '%s' to '%s': both locations exist",
2420             old_file_storage_dir, new_file_storage_dir)
2421
2422
2423 def _EnsureJobQueueFile(file_name):
2424   """Checks whether the given filename is in the queue directory.
2425
2426   @type file_name: str
2427   @param file_name: the file name we should check
2428   @rtype: None
2429   @raises RPCFail: if the file is not valid
2430
2431   """
2432   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2433   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2434
2435   if not result:
2436     _Fail("Passed job queue file '%s' does not belong to"
2437           " the queue directory '%s'", file_name, queue_dir)
2438
2439
2440 def JobQueueUpdate(file_name, content):
2441   """Updates a file in the queue directory.
2442
2443   This is just a wrapper over L{utils.WriteFile}, with proper
2444   checking.
2445
2446   @type file_name: str
2447   @param file_name: the job file name
2448   @type content: str
2449   @param content: the new job contents
2450   @rtype: boolean
2451   @return: the success of the operation
2452
2453   """
2454   _EnsureJobQueueFile(file_name)
2455   getents = runtime.GetEnts()
2456
2457   # Write and replace the file atomically
2458   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2459                   gid=getents.masterd_gid)
2460
2461
2462 def JobQueueRename(old, new):
2463   """Renames a job queue file.
2464
2465   This is just a wrapper over os.rename with proper checking.
2466
2467   @type old: str
2468   @param old: the old (actual) file name
2469   @type new: str
2470   @param new: the desired file name
2471   @rtype: tuple
2472   @return: the success of the operation and payload
2473
2474   """
2475   _EnsureJobQueueFile(old)
2476   _EnsureJobQueueFile(new)
2477
2478   utils.RenameFile(old, new, mkdir=True)
2479
2480
2481 def BlockdevClose(instance_name, disks):
2482   """Closes the given block devices.
2483
2484   This means they will be switched to secondary mode (in case of
2485   DRBD).
2486
2487   @param instance_name: if the argument is not empty, the symlinks
2488       of this instance will be removed
2489   @type disks: list of L{objects.Disk}
2490   @param disks: the list of disks to be closed
2491   @rtype: tuple (success, message)
2492   @return: a tuple of success and message, where success
2493       indicates the succes of the operation, and message
2494       which will contain the error details in case we
2495       failed
2496
2497   """
2498   bdevs = []
2499   for cf in disks:
2500     rd = _RecursiveFindBD(cf)
2501     if rd is None:
2502       _Fail("Can't find device %s", cf)
2503     bdevs.append(rd)
2504
2505   msg = []
2506   for rd in bdevs:
2507     try:
2508       rd.Close()
2509     except errors.BlockDeviceError, err:
2510       msg.append(str(err))
2511   if msg:
2512     _Fail("Can't make devices secondary: %s", ",".join(msg))
2513   else:
2514     if instance_name:
2515       _RemoveBlockDevLinks(instance_name, disks)
2516
2517
2518 def ValidateHVParams(hvname, hvparams):
2519   """Validates the given hypervisor parameters.
2520
2521   @type hvname: string
2522   @param hvname: the hypervisor name
2523   @type hvparams: dict
2524   @param hvparams: the hypervisor parameters to be validated
2525   @rtype: None
2526
2527   """
2528   try:
2529     hv_type = hypervisor.GetHypervisor(hvname)
2530     hv_type.ValidateParameters(hvparams)
2531   except errors.HypervisorError, err:
2532     _Fail(str(err), log=False)
2533
2534
2535 def _CheckOSPList(os_obj, parameters):
2536   """Check whether a list of parameters is supported by the OS.
2537
2538   @type os_obj: L{objects.OS}
2539   @param os_obj: OS object to check
2540   @type parameters: list
2541   @param parameters: the list of parameters to check
2542
2543   """
2544   supported = [v[0] for v in os_obj.supported_parameters]
2545   delta = frozenset(parameters).difference(supported)
2546   if delta:
2547     _Fail("The following parameters are not supported"
2548           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2549
2550
2551 def ValidateOS(required, osname, checks, osparams):
2552   """Validate the given OS' parameters.
2553
2554   @type required: boolean
2555   @param required: whether absence of the OS should translate into
2556       failure or not
2557   @type osname: string
2558   @param osname: the OS to be validated
2559   @type checks: list
2560   @param checks: list of the checks to run (currently only 'parameters')
2561   @type osparams: dict
2562   @param osparams: dictionary with OS parameters
2563   @rtype: boolean
2564   @return: True if the validation passed, or False if the OS was not
2565       found and L{required} was false
2566
2567   """
2568   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2569     _Fail("Unknown checks required for OS %s: %s", osname,
2570           set(checks).difference(constants.OS_VALIDATE_CALLS))
2571
2572   name_only = objects.OS.GetName(osname)
2573   status, tbv = _TryOSFromDisk(name_only, None)
2574
2575   if not status:
2576     if required:
2577       _Fail(tbv)
2578     else:
2579       return False
2580
2581   if max(tbv.api_versions) < constants.OS_API_V20:
2582     return True
2583
2584   if constants.OS_VALIDATE_PARAMETERS in checks:
2585     _CheckOSPList(tbv, osparams.keys())
2586
2587   validate_env = OSCoreEnv(osname, tbv, osparams)
2588   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2589                         cwd=tbv.path)
2590   if result.failed:
2591     logging.error("os validate command '%s' returned error: %s output: %s",
2592                   result.cmd, result.fail_reason, result.output)
2593     _Fail("OS validation script failed (%s), output: %s",
2594           result.fail_reason, result.output, log=False)
2595
2596   return True
2597
2598
2599 def DemoteFromMC():
2600   """Demotes the current node from master candidate role.
2601
2602   """
2603   # try to ensure we're not the master by mistake
2604   master, myself = ssconf.GetMasterAndMyself()
2605   if master == myself:
2606     _Fail("ssconf status shows I'm the master node, will not demote")
2607
2608   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2609   if not result.failed:
2610     _Fail("The master daemon is running, will not demote")
2611
2612   try:
2613     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2614       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2615   except EnvironmentError, err:
2616     if err.errno != errno.ENOENT:
2617       _Fail("Error while backing up cluster file: %s", err, exc=True)
2618
2619   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2620
2621
2622 def _GetX509Filenames(cryptodir, name):
2623   """Returns the full paths for the private key and certificate.
2624
2625   """
2626   return (utils.PathJoin(cryptodir, name),
2627           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2628           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2629
2630
2631 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2632   """Creates a new X509 certificate for SSL/TLS.
2633
2634   @type validity: int
2635   @param validity: Validity in seconds
2636   @rtype: tuple; (string, string)
2637   @return: Certificate name and public part
2638
2639   """
2640   (key_pem, cert_pem) = \
2641     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2642                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2643
2644   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2645                               prefix="x509-%s-" % utils.TimestampForFilename())
2646   try:
2647     name = os.path.basename(cert_dir)
2648     assert len(name) > 5
2649
2650     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2651
2652     utils.WriteFile(key_file, mode=0400, data=key_pem)
2653     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2654
2655     # Never return private key as it shouldn't leave the node
2656     return (name, cert_pem)
2657   except Exception:
2658     shutil.rmtree(cert_dir, ignore_errors=True)
2659     raise
2660
2661
2662 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2663   """Removes a X509 certificate.
2664
2665   @type name: string
2666   @param name: Certificate name
2667
2668   """
2669   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2670
2671   utils.RemoveFile(key_file)
2672   utils.RemoveFile(cert_file)
2673
2674   try:
2675     os.rmdir(cert_dir)
2676   except EnvironmentError, err:
2677     _Fail("Cannot remove certificate directory '%s': %s",
2678           cert_dir, err)
2679
2680
2681 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2682   """Returns the command for the requested input/output.
2683
2684   @type instance: L{objects.Instance}
2685   @param instance: The instance object
2686   @param mode: Import/export mode
2687   @param ieio: Input/output type
2688   @param ieargs: Input/output arguments
2689
2690   """
2691   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2692
2693   env = None
2694   prefix = None
2695   suffix = None
2696   exp_size = None
2697
2698   if ieio == constants.IEIO_FILE:
2699     (filename, ) = ieargs
2700
2701     if not utils.IsNormAbsPath(filename):
2702       _Fail("Path '%s' is not normalized or absolute", filename)
2703
2704     directory = os.path.normpath(os.path.dirname(filename))
2705
2706     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2707         constants.EXPORT_DIR):
2708       _Fail("File '%s' is not under exports directory '%s'",
2709             filename, constants.EXPORT_DIR)
2710
2711     # Create directory
2712     utils.Makedirs(directory, mode=0750)
2713
2714     quoted_filename = utils.ShellQuote(filename)
2715
2716     if mode == constants.IEM_IMPORT:
2717       suffix = "> %s" % quoted_filename
2718     elif mode == constants.IEM_EXPORT:
2719       suffix = "< %s" % quoted_filename
2720
2721       # Retrieve file size
2722       try:
2723         st = os.stat(filename)
2724       except EnvironmentError, err:
2725         logging.error("Can't stat(2) %s: %s", filename, err)
2726       else:
2727         exp_size = utils.BytesToMebibyte(st.st_size)
2728
2729   elif ieio == constants.IEIO_RAW_DISK:
2730     (disk, ) = ieargs
2731
2732     real_disk = _OpenRealBD(disk)
2733
2734     if mode == constants.IEM_IMPORT:
2735       # we set here a smaller block size as, due to transport buffering, more
2736       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2737       # is not already there or we pass a wrong path; we use notrunc to no
2738       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2739       # much memory; this means that at best, we flush every 64k, which will
2740       # not be very fast
2741       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2742                                     " bs=%s oflag=dsync"),
2743                                     real_disk.dev_path,
2744                                     str(64 * 1024))
2745
2746     elif mode == constants.IEM_EXPORT:
2747       # the block size on the read dd is 1MiB to match our units
2748       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2749                                    real_disk.dev_path,
2750                                    str(1024 * 1024), # 1 MB
2751                                    str(disk.size))
2752       exp_size = disk.size
2753
2754   elif ieio == constants.IEIO_SCRIPT:
2755     (disk, disk_index, ) = ieargs
2756
2757     assert isinstance(disk_index, (int, long))
2758
2759     real_disk = _OpenRealBD(disk)
2760
2761     inst_os = OSFromDisk(instance.os)
2762     env = OSEnvironment(instance, inst_os)
2763
2764     if mode == constants.IEM_IMPORT:
2765       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2766       env["IMPORT_INDEX"] = str(disk_index)
2767       script = inst_os.import_script
2768
2769     elif mode == constants.IEM_EXPORT:
2770       env["EXPORT_DEVICE"] = real_disk.dev_path
2771       env["EXPORT_INDEX"] = str(disk_index)
2772       script = inst_os.export_script
2773
2774     # TODO: Pass special environment only to script
2775     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2776
2777     if mode == constants.IEM_IMPORT:
2778       suffix = "| %s" % script_cmd
2779
2780     elif mode == constants.IEM_EXPORT:
2781       prefix = "%s |" % script_cmd
2782
2783     # Let script predict size
2784     exp_size = constants.IE_CUSTOM_SIZE
2785
2786   else:
2787     _Fail("Invalid %s I/O mode %r", mode, ieio)
2788
2789   return (env, prefix, suffix, exp_size)
2790
2791
2792 def _CreateImportExportStatusDir(prefix):
2793   """Creates status directory for import/export.
2794
2795   """
2796   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2797                           prefix=("%s-%s-" %
2798                                   (prefix, utils.TimestampForFilename())))
2799
2800
2801 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2802   """Starts an import or export daemon.
2803
2804   @param mode: Import/output mode
2805   @type opts: L{objects.ImportExportOptions}
2806   @param opts: Daemon options
2807   @type host: string
2808   @param host: Remote host for export (None for import)
2809   @type port: int
2810   @param port: Remote port for export (None for import)
2811   @type instance: L{objects.Instance}
2812   @param instance: Instance object
2813   @param ieio: Input/output type
2814   @param ieioargs: Input/output arguments
2815
2816   """
2817   if mode == constants.IEM_IMPORT:
2818     prefix = "import"
2819
2820     if not (host is None and port is None):
2821       _Fail("Can not specify host or port on import")
2822
2823   elif mode == constants.IEM_EXPORT:
2824     prefix = "export"
2825
2826     if host is None or port is None:
2827       _Fail("Host and port must be specified for an export")
2828
2829   else:
2830     _Fail("Invalid mode %r", mode)
2831
2832   if (opts.key_name is None) ^ (opts.ca_pem is None):
2833     _Fail("Cluster certificate can only be used for both key and CA")
2834
2835   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2836     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2837
2838   if opts.key_name is None:
2839     # Use server.pem
2840     key_path = constants.NODED_CERT_FILE
2841     cert_path = constants.NODED_CERT_FILE
2842     assert opts.ca_pem is None
2843   else:
2844     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2845                                                  opts.key_name)
2846     assert opts.ca_pem is not None
2847
2848   for i in [key_path, cert_path]:
2849     if not os.path.exists(i):
2850       _Fail("File '%s' does not exist" % i)
2851
2852   status_dir = _CreateImportExportStatusDir(prefix)
2853   try:
2854     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2855     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2856     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2857
2858     if opts.ca_pem is None:
2859       # Use server.pem
2860       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2861     else:
2862       ca = opts.ca_pem
2863
2864     # Write CA file
2865     utils.WriteFile(ca_file, data=ca, mode=0400)
2866
2867     cmd = [
2868       constants.IMPORT_EXPORT_DAEMON,
2869       status_file, mode,
2870       "--key=%s" % key_path,
2871       "--cert=%s" % cert_path,
2872       "--ca=%s" % ca_file,
2873       ]
2874
2875     if host:
2876       cmd.append("--host=%s" % host)
2877
2878     if port:
2879       cmd.append("--port=%s" % port)
2880
2881     if opts.ipv6:
2882       cmd.append("--ipv6")
2883     else:
2884       cmd.append("--ipv4")
2885
2886     if opts.compress:
2887       cmd.append("--compress=%s" % opts.compress)
2888
2889     if opts.magic:
2890       cmd.append("--magic=%s" % opts.magic)
2891
2892     if exp_size is not None:
2893       cmd.append("--expected-size=%s" % exp_size)
2894
2895     if cmd_prefix:
2896       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2897
2898     if cmd_suffix:
2899       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2900
2901     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2902
2903     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2904     # support for receiving a file descriptor for output
2905     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2906                       output=logfile)
2907
2908     # The import/export name is simply the status directory name
2909     return os.path.basename(status_dir)
2910
2911   except Exception:
2912     shutil.rmtree(status_dir, ignore_errors=True)
2913     raise
2914
2915
2916 def GetImportExportStatus(names):
2917   """Returns import/export daemon status.
2918
2919   @type names: sequence
2920   @param names: List of names
2921   @rtype: List of dicts
2922   @return: Returns a list of the state of each named import/export or None if a
2923            status couldn't be read
2924
2925   """
2926   result = []
2927
2928   for name in names:
2929     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2930                                  _IES_STATUS_FILE)
2931
2932     try:
2933       data = utils.ReadFile(status_file)
2934     except EnvironmentError, err:
2935       if err.errno != errno.ENOENT:
2936         raise
2937       data = None
2938
2939     if not data:
2940       result.append(None)
2941       continue
2942
2943     result.append(serializer.LoadJson(data))
2944
2945   return result
2946
2947
2948 def AbortImportExport(name):
2949   """Sends SIGTERM to a running import/export daemon.
2950
2951   """
2952   logging.info("Abort import/export %s", name)
2953
2954   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2955   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2956
2957   if pid:
2958     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2959                  name, pid)
2960     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2961
2962
2963 def CleanupImportExport(name):
2964   """Cleanup after an import or export.
2965
2966   If the import/export daemon is still running it's killed. Afterwards the
2967   whole status directory is removed.
2968
2969   """
2970   logging.info("Finalizing import/export %s", name)
2971
2972   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2973
2974   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2975
2976   if pid:
2977     logging.info("Import/export %s is still running with PID %s",
2978                  name, pid)
2979     utils.KillProcess(pid, waitpid=False)
2980
2981   shutil.rmtree(status_dir, ignore_errors=True)
2982
2983
2984 def _FindDisks(nodes_ip, disks):
2985   """Sets the physical ID on disks and returns the block devices.
2986
2987   """
2988   # set the correct physical ID
2989   my_name = netutils.Hostname.GetSysName()
2990   for cf in disks:
2991     cf.SetPhysicalID(my_name, nodes_ip)
2992
2993   bdevs = []
2994
2995   for cf in disks:
2996     rd = _RecursiveFindBD(cf)
2997     if rd is None:
2998       _Fail("Can't find device %s", cf)
2999     bdevs.append(rd)
3000   return bdevs
3001
3002
3003 def DrbdDisconnectNet(nodes_ip, disks):
3004   """Disconnects the network on a list of drbd devices.
3005
3006   """
3007   bdevs = _FindDisks(nodes_ip, disks)
3008
3009   # disconnect disks
3010   for rd in bdevs:
3011     try:
3012       rd.DisconnectNet()
3013     except errors.BlockDeviceError, err:
3014       _Fail("Can't change network configuration to standalone mode: %s",
3015             err, exc=True)
3016
3017
3018 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3019   """Attaches the network on a list of drbd devices.
3020
3021   """
3022   bdevs = _FindDisks(nodes_ip, disks)
3023
3024   if multimaster:
3025     for idx, rd in enumerate(bdevs):
3026       try:
3027         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3028       except EnvironmentError, err:
3029         _Fail("Can't create symlink: %s", err)
3030   # reconnect disks, switch to new master configuration and if
3031   # needed primary mode
3032   for rd in bdevs:
3033     try:
3034       rd.AttachNet(multimaster)
3035     except errors.BlockDeviceError, err:
3036       _Fail("Can't change network configuration: %s", err)
3037
3038   # wait until the disks are connected; we need to retry the re-attach
3039   # if the device becomes standalone, as this might happen if the one
3040   # node disconnects and reconnects in a different mode before the
3041   # other node reconnects; in this case, one or both of the nodes will
3042   # decide it has wrong configuration and switch to standalone
3043
3044   def _Attach():
3045     all_connected = True
3046
3047     for rd in bdevs:
3048       stats = rd.GetProcStatus()
3049
3050       all_connected = (all_connected and
3051                        (stats.is_connected or stats.is_in_resync))
3052
3053       if stats.is_standalone:
3054         # peer had different config info and this node became
3055         # standalone, even though this should not happen with the
3056         # new staged way of changing disk configs
3057         try:
3058           rd.AttachNet(multimaster)
3059         except errors.BlockDeviceError, err:
3060           _Fail("Can't change network configuration: %s", err)
3061
3062     if not all_connected:
3063       raise utils.RetryAgain()
3064
3065   try:
3066     # Start with a delay of 100 miliseconds and go up to 5 seconds
3067     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3068   except utils.RetryTimeout:
3069     _Fail("Timeout in disk reconnecting")
3070
3071   if multimaster:
3072     # change to primary mode
3073     for rd in bdevs:
3074       try:
3075         rd.Open()
3076       except errors.BlockDeviceError, err:
3077         _Fail("Can't change to primary mode: %s", err)
3078
3079
3080 def DrbdWaitSync(nodes_ip, disks):
3081   """Wait until DRBDs have synchronized.
3082
3083   """
3084   def _helper(rd):
3085     stats = rd.GetProcStatus()
3086     if not (stats.is_connected or stats.is_in_resync):
3087       raise utils.RetryAgain()
3088     return stats
3089
3090   bdevs = _FindDisks(nodes_ip, disks)
3091
3092   min_resync = 100
3093   alldone = True
3094   for rd in bdevs:
3095     try:
3096       # poll each second for 15 seconds
3097       stats = utils.Retry(_helper, 1, 15, args=[rd])
3098     except utils.RetryTimeout:
3099       stats = rd.GetProcStatus()
3100       # last check
3101       if not (stats.is_connected or stats.is_in_resync):
3102         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3103     alldone = alldone and (not stats.is_in_resync)
3104     if stats.sync_percent is not None:
3105       min_resync = min(min_resync, stats.sync_percent)
3106
3107   return (alldone, min_resync)
3108
3109
3110 def GetDrbdUsermodeHelper():
3111   """Returns DRBD usermode helper currently configured.
3112
3113   """
3114   try:
3115     return bdev.BaseDRBD.GetUsermodeHelper()
3116   except errors.BlockDeviceError, err:
3117     _Fail(str(err))
3118
3119
3120 def PowercycleNode(hypervisor_type):
3121   """Hard-powercycle the node.
3122
3123   Because we need to return first, and schedule the powercycle in the
3124   background, we won't be able to report failures nicely.
3125
3126   """
3127   hyper = hypervisor.GetHypervisor(hypervisor_type)
3128   try:
3129     pid = os.fork()
3130   except OSError:
3131     # if we can't fork, we'll pretend that we're in the child process
3132     pid = 0
3133   if pid > 0:
3134     return "Reboot scheduled in 5 seconds"
3135   # ensure the child is running on ram
3136   try:
3137     utils.Mlockall()
3138   except Exception: # pylint: disable-msg=W0703
3139     pass
3140   time.sleep(5)
3141   hyper.PowercycleNode()
3142
3143
3144 class HooksRunner(object):
3145   """Hook runner.
3146
3147   This class is instantiated on the node side (ganeti-noded) and not
3148   on the master side.
3149
3150   """
3151   def __init__(self, hooks_base_dir=None):
3152     """Constructor for hooks runner.
3153
3154     @type hooks_base_dir: str or None
3155     @param hooks_base_dir: if not None, this overrides the
3156         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3157
3158     """
3159     if hooks_base_dir is None:
3160       hooks_base_dir = constants.HOOKS_BASE_DIR
3161     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3162     # constant
3163     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3164
3165   def RunHooks(self, hpath, phase, env):
3166     """Run the scripts in the hooks directory.
3167
3168     @type hpath: str
3169     @param hpath: the path to the hooks directory which
3170         holds the scripts
3171     @type phase: str
3172     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3173         L{constants.HOOKS_PHASE_POST}
3174     @type env: dict
3175     @param env: dictionary with the environment for the hook
3176     @rtype: list
3177     @return: list of 3-element tuples:
3178       - script path
3179       - script result, either L{constants.HKR_SUCCESS} or
3180         L{constants.HKR_FAIL}
3181       - output of the script
3182
3183     @raise errors.ProgrammerError: for invalid input
3184         parameters
3185
3186     """
3187     if phase == constants.HOOKS_PHASE_PRE:
3188       suffix = "pre"
3189     elif phase == constants.HOOKS_PHASE_POST:
3190       suffix = "post"
3191     else:
3192       _Fail("Unknown hooks phase '%s'", phase)
3193
3194
3195     subdir = "%s-%s.d" % (hpath, suffix)
3196     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3197
3198     results = []
3199
3200     if not os.path.isdir(dir_name):
3201       # for non-existing/non-dirs, we simply exit instead of logging a
3202       # warning at every operation
3203       return results
3204
3205     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3206
3207     for (relname, relstatus, runresult)  in runparts_results:
3208       if relstatus == constants.RUNPARTS_SKIP:
3209         rrval = constants.HKR_SKIP
3210         output = ""
3211       elif relstatus == constants.RUNPARTS_ERR:
3212         rrval = constants.HKR_FAIL
3213         output = "Hook script execution error: %s" % runresult
3214       elif relstatus == constants.RUNPARTS_RUN:
3215         if runresult.failed:
3216           rrval = constants.HKR_FAIL
3217         else:
3218           rrval = constants.HKR_SUCCESS
3219         output = utils.SafeEncode(runresult.output.strip())
3220       results.append(("%s/%s" % (subdir, relname), rrval, output))
3221
3222     return results
3223
3224
3225 class IAllocatorRunner(object):
3226   """IAllocator runner.
3227
3228   This class is instantiated on the node side (ganeti-noded) and not on
3229   the master side.
3230
3231   """
3232   @staticmethod
3233   def Run(name, idata):
3234     """Run an iallocator script.
3235
3236     @type name: str
3237     @param name: the iallocator script name
3238     @type idata: str
3239     @param idata: the allocator input data
3240
3241     @rtype: tuple
3242     @return: two element tuple of:
3243        - status
3244        - either error message or stdout of allocator (for success)
3245
3246     """
3247     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3248                                   os.path.isfile)
3249     if alloc_script is None:
3250       _Fail("iallocator module '%s' not found in the search path", name)
3251
3252     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3253     try:
3254       os.write(fd, idata)
3255       os.close(fd)
3256       result = utils.RunCmd([alloc_script, fin_name])
3257       if result.failed:
3258         _Fail("iallocator module '%s' failed: %s, output '%s'",
3259               name, result.fail_reason, result.output)
3260     finally:
3261       os.unlink(fin_name)
3262
3263     return result.stdout
3264
3265
3266 class DevCacheManager(object):
3267   """Simple class for managing a cache of block device information.
3268
3269   """
3270   _DEV_PREFIX = "/dev/"
3271   _ROOT_DIR = constants.BDEV_CACHE_DIR
3272
3273   @classmethod
3274   def _ConvertPath(cls, dev_path):
3275     """Converts a /dev/name path to the cache file name.
3276
3277     This replaces slashes with underscores and strips the /dev
3278     prefix. It then returns the full path to the cache file.
3279
3280     @type dev_path: str
3281     @param dev_path: the C{/dev/} path name
3282     @rtype: str
3283     @return: the converted path name
3284
3285     """
3286     if dev_path.startswith(cls._DEV_PREFIX):
3287       dev_path = dev_path[len(cls._DEV_PREFIX):]
3288     dev_path = dev_path.replace("/", "_")
3289     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3290     return fpath
3291
3292   @classmethod
3293   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3294     """Updates the cache information for a given device.
3295
3296     @type dev_path: str
3297     @param dev_path: the pathname of the device
3298     @type owner: str
3299     @param owner: the owner (instance name) of the device
3300     @type on_primary: bool
3301     @param on_primary: whether this is the primary
3302         node nor not
3303     @type iv_name: str
3304     @param iv_name: the instance-visible name of the
3305         device, as in objects.Disk.iv_name
3306
3307     @rtype: None
3308
3309     """
3310     if dev_path is None:
3311       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3312       return
3313     fpath = cls._ConvertPath(dev_path)
3314     if on_primary:
3315       state = "primary"
3316     else:
3317       state = "secondary"
3318     if iv_name is None:
3319       iv_name = "not_visible"
3320     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3321     try:
3322       utils.WriteFile(fpath, data=fdata)
3323     except EnvironmentError, err:
3324       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3325
3326   @classmethod
3327   def RemoveCache(cls, dev_path):
3328     """Remove data for a dev_path.
3329
3330     This is just a wrapper over L{utils.RemoveFile} with a converted
3331     path name and logging.
3332
3333     @type dev_path: str
3334     @param dev_path: the pathname of the device
3335
3336     @rtype: None
3337
3338     """
3339     if dev_path is None:
3340       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3341       return
3342     fpath = cls._ConvertPath(dev_path)
3343     try:
3344       utils.RemoveFile(fpath)
3345     except EnvironmentError, err:
3346       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)