locking: Don't set default priority as keyword default
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79
80 class RPCFail(Exception):
81   """Class denoting RPC failure.
82
83   Its argument is the error message.
84
85   """
86
87
88 def _Fail(msg, *args, **kwargs):
89   """Log an error and the raise an RPCFail exception.
90
91   This exception is then handled specially in the ganeti daemon and
92   turned into a 'failed' return type. As such, this function is a
93   useful shortcut for logging the error and returning it to the master
94   daemon.
95
96   @type msg: string
97   @param msg: the text of the exception
98   @raise RPCFail
99
100   """
101   if args:
102     msg = msg % args
103   if "log" not in kwargs or kwargs["log"]: # if we should log this error
104     if "exc" in kwargs and kwargs["exc"]:
105       logging.exception(msg)
106     else:
107       logging.error(msg)
108   raise RPCFail(msg)
109
110
111 def _GetConfig():
112   """Simple wrapper to return a SimpleStore.
113
114   @rtype: L{ssconf.SimpleStore}
115   @return: a SimpleStore instance
116
117   """
118   return ssconf.SimpleStore()
119
120
121 def _GetSshRunner(cluster_name):
122   """Simple wrapper to return an SshRunner.
123
124   @type cluster_name: str
125   @param cluster_name: the cluster name, which is needed
126       by the SshRunner constructor
127   @rtype: L{ssh.SshRunner}
128   @return: an SshRunner instance
129
130   """
131   return ssh.SshRunner(cluster_name)
132
133
134 def _Decompress(data):
135   """Unpacks data compressed by the RPC client.
136
137   @type data: list or tuple
138   @param data: Data sent by RPC client
139   @rtype: str
140   @return: Decompressed data
141
142   """
143   assert isinstance(data, (list, tuple))
144   assert len(data) == 2
145   (encoding, content) = data
146   if encoding == constants.RPC_ENCODING_NONE:
147     return content
148   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
149     return zlib.decompress(base64.b64decode(content))
150   else:
151     raise AssertionError("Unknown data encoding")
152
153
154 def _CleanDirectory(path, exclude=None):
155   """Removes all regular files in a directory.
156
157   @type path: str
158   @param path: the directory to clean
159   @type exclude: list
160   @param exclude: list of files to be excluded, defaults
161       to the empty list
162
163   """
164   if path not in _ALLOWED_CLEAN_DIRS:
165     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
166           path)
167
168   if not os.path.isdir(path):
169     return
170   if exclude is None:
171     exclude = []
172   else:
173     # Normalize excluded paths
174     exclude = [os.path.normpath(i) for i in exclude]
175
176   for rel_name in utils.ListVisibleFiles(path):
177     full_name = utils.PathJoin(path, rel_name)
178     if full_name in exclude:
179       continue
180     if os.path.isfile(full_name) and not os.path.islink(full_name):
181       utils.RemoveFile(full_name)
182
183
184 def _BuildUploadFileList():
185   """Build the list of allowed upload files.
186
187   This is abstracted so that it's built only once at module import time.
188
189   """
190   allowed_files = set([
191     constants.CLUSTER_CONF_FILE,
192     constants.ETC_HOSTS,
193     constants.SSH_KNOWN_HOSTS_FILE,
194     constants.VNC_PASSWORD_FILE,
195     constants.RAPI_CERT_FILE,
196     constants.RAPI_USERS_FILE,
197     constants.CONFD_HMAC_KEY,
198     constants.CLUSTER_DOMAIN_SECRET_FILE,
199     ])
200
201   for hv_name in constants.HYPER_TYPES:
202     hv_class = hypervisor.GetHypervisorClass(hv_name)
203     allowed_files.update(hv_class.GetAncillaryFiles())
204
205   return frozenset(allowed_files)
206
207
208 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
209
210
211 def JobQueuePurge():
212   """Removes job queue files and archived jobs.
213
214   @rtype: tuple
215   @return: True, None
216
217   """
218   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
219   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
220
221
222 def GetMasterInfo():
223   """Returns master information.
224
225   This is an utility function to compute master information, either
226   for consumption here or from the node daemon.
227
228   @rtype: tuple
229   @return: master_netdev, master_ip, master_name, primary_ip_family
230   @raise RPCFail: in case of errors
231
232   """
233   try:
234     cfg = _GetConfig()
235     master_netdev = cfg.GetMasterNetdev()
236     master_ip = cfg.GetMasterIP()
237     master_node = cfg.GetMasterNode()
238     primary_ip_family = cfg.GetPrimaryIPFamily()
239   except errors.ConfigurationError, err:
240     _Fail("Cluster configuration incomplete: %s", err, exc=True)
241   return (master_netdev, master_ip, master_node, primary_ip_family)
242
243
244 def StartMaster(start_daemons, no_voting):
245   """Activate local node as master node.
246
247   The function will either try activate the IP address of the master
248   (unless someone else has it) or also start the master daemons, based
249   on the start_daemons parameter.
250
251   @type start_daemons: boolean
252   @param start_daemons: whether to start the master daemons
253       (ganeti-masterd and ganeti-rapi), or (if false) activate the
254       master ip
255   @type no_voting: boolean
256   @param no_voting: whether to start ganeti-masterd without a node vote
257       (if start_daemons is True), but still non-interactively
258   @rtype: None
259
260   """
261   # GetMasterInfo will raise an exception if not able to return data
262   master_netdev, master_ip, _, family = GetMasterInfo()
263
264   err_msgs = []
265   # either start the master and rapi daemons
266   if start_daemons:
267     if no_voting:
268       masterd_args = "--no-voting --yes-do-it"
269     else:
270       masterd_args = ""
271
272     env = {
273       "EXTRA_MASTERD_ARGS": masterd_args,
274       }
275
276     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
277     if result.failed:
278       msg = "Can't start Ganeti master: %s" % result.output
279       logging.error(msg)
280       err_msgs.append(msg)
281   # or activate the IP
282   else:
283     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
284       if netutils.IPAddress.Own(master_ip):
285         # we already have the ip:
286         logging.debug("Master IP already configured, doing nothing")
287       else:
288         msg = "Someone else has the master ip, not activating"
289         logging.error(msg)
290         err_msgs.append(msg)
291     else:
292       ipcls = netutils.IP4Address
293       if family == netutils.IP6Address.family:
294         ipcls = netutils.IP6Address
295
296       result = utils.RunCmd(["ip", "address", "add",
297                              "%s/%d" % (master_ip, ipcls.iplen),
298                              "dev", master_netdev, "label",
299                              "%s:0" % master_netdev])
300       if result.failed:
301         msg = "Can't activate master IP: %s" % result.output
302         logging.error(msg)
303         err_msgs.append(msg)
304
305       # we ignore the exit code of the following cmds
306       if ipcls == netutils.IP4Address:
307         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
308                       master_ip, master_ip])
309       elif ipcls == netutils.IP6Address:
310         try:
311           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
312         except errors.OpExecError:
313           # TODO: Better error reporting
314           logging.warning("Can't execute ndisc6, please install if missing")
315
316   if err_msgs:
317     _Fail("; ".join(err_msgs))
318
319
320 def StopMaster(stop_daemons):
321   """Deactivate this node as master.
322
323   The function will always try to deactivate the IP address of the
324   master. It will also stop the master daemons depending on the
325   stop_daemons parameter.
326
327   @type stop_daemons: boolean
328   @param stop_daemons: whether to also stop the master daemons
329       (ganeti-masterd and ganeti-rapi)
330   @rtype: None
331
332   """
333   # TODO: log and report back to the caller the error failures; we
334   # need to decide in which case we fail the RPC for this
335
336   # GetMasterInfo will raise an exception if not able to return data
337   master_netdev, master_ip, _, family = GetMasterInfo()
338
339   ipcls = netutils.IP4Address
340   if family == netutils.IP6Address.family:
341     ipcls = netutils.IP6Address
342
343   result = utils.RunCmd(["ip", "address", "del",
344                          "%s/%d" % (master_ip, ipcls.iplen),
345                          "dev", master_netdev])
346   if result.failed:
347     logging.error("Can't remove the master IP, error: %s", result.output)
348     # but otherwise ignore the failure
349
350   if stop_daemons:
351     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
352     if result.failed:
353       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
354                     " and error %s",
355                     result.cmd, result.exit_code, result.output)
356
357
358 def EtcHostsModify(mode, host, ip):
359   """Modify a host entry in /etc/hosts.
360
361   @param mode: The mode to operate. Either add or remove entry
362   @param host: The host to operate on
363   @param ip: The ip associated with the entry
364
365   """
366   if mode == constants.ETC_HOSTS_ADD:
367     if not ip:
368       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
369               " present")
370     utils.AddHostToEtcHosts(host, ip)
371   elif mode == constants.ETC_HOSTS_REMOVE:
372     if ip:
373       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
374               " parameter is present")
375     utils.RemoveHostFromEtcHosts(host)
376   else:
377     RPCFail("Mode not supported")
378
379
380 def LeaveCluster(modify_ssh_setup):
381   """Cleans up and remove the current node.
382
383   This function cleans up and prepares the current node to be removed
384   from the cluster.
385
386   If processing is successful, then it raises an
387   L{errors.QuitGanetiException} which is used as a special case to
388   shutdown the node daemon.
389
390   @param modify_ssh_setup: boolean
391
392   """
393   _CleanDirectory(constants.DATA_DIR)
394   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
395   JobQueuePurge()
396
397   if modify_ssh_setup:
398     try:
399       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
400
401       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
402
403       utils.RemoveFile(priv_key)
404       utils.RemoveFile(pub_key)
405     except errors.OpExecError:
406       logging.exception("Error while processing ssh files")
407
408   try:
409     utils.RemoveFile(constants.CONFD_HMAC_KEY)
410     utils.RemoveFile(constants.RAPI_CERT_FILE)
411     utils.RemoveFile(constants.NODED_CERT_FILE)
412   except: # pylint: disable-msg=W0702
413     logging.exception("Error while removing cluster secrets")
414
415   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
416   if result.failed:
417     logging.error("Command %s failed with exitcode %s and error %s",
418                   result.cmd, result.exit_code, result.output)
419
420   # Raise a custom exception (handled in ganeti-noded)
421   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
422
423
424 def GetNodeInfo(vgname, hypervisor_type):
425   """Gives back a hash with different information about the node.
426
427   @type vgname: C{string}
428   @param vgname: the name of the volume group to ask for disk space information
429   @type hypervisor_type: C{str}
430   @param hypervisor_type: the name of the hypervisor to ask for
431       memory information
432   @rtype: C{dict}
433   @return: dictionary with the following keys:
434       - vg_size is the size of the configured volume group in MiB
435       - vg_free is the free size of the volume group in MiB
436       - memory_dom0 is the memory allocated for domain0 in MiB
437       - memory_free is the currently available (free) ram in MiB
438       - memory_total is the total number of ram in MiB
439
440   """
441   outputarray = {}
442
443   vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
444   vg_free = vg_size = None
445   if vginfo:
446     vg_free = int(round(vginfo[0][0], 0))
447     vg_size = int(round(vginfo[0][1], 0))
448
449   outputarray['vg_size'] = vg_size
450   outputarray['vg_free'] = vg_free
451
452   hyper = hypervisor.GetHypervisor(hypervisor_type)
453   hyp_info = hyper.GetNodeInfo()
454   if hyp_info is not None:
455     outputarray.update(hyp_info)
456
457   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
458
459   return outputarray
460
461
462 def VerifyNode(what, cluster_name):
463   """Verify the status of the local node.
464
465   Based on the input L{what} parameter, various checks are done on the
466   local node.
467
468   If the I{filelist} key is present, this list of
469   files is checksummed and the file/checksum pairs are returned.
470
471   If the I{nodelist} key is present, we check that we have
472   connectivity via ssh with the target nodes (and check the hostname
473   report).
474
475   If the I{node-net-test} key is present, we check that we have
476   connectivity to the given nodes via both primary IP and, if
477   applicable, secondary IPs.
478
479   @type what: C{dict}
480   @param what: a dictionary of things to check:
481       - filelist: list of files for which to compute checksums
482       - nodelist: list of nodes we should check ssh communication with
483       - node-net-test: list of nodes we should check node daemon port
484         connectivity with
485       - hypervisor: list with hypervisors to run the verify for
486   @rtype: dict
487   @return: a dictionary with the same keys as the input dict, and
488       values representing the result of the checks
489
490   """
491   result = {}
492   my_name = netutils.Hostname.GetSysName()
493   port = netutils.GetDaemonPort(constants.NODED)
494
495   if constants.NV_HYPERVISOR in what:
496     result[constants.NV_HYPERVISOR] = tmp = {}
497     for hv_name in what[constants.NV_HYPERVISOR]:
498       try:
499         val = hypervisor.GetHypervisor(hv_name).Verify()
500       except errors.HypervisorError, err:
501         val = "Error while checking hypervisor: %s" % str(err)
502       tmp[hv_name] = val
503
504   if constants.NV_FILELIST in what:
505     result[constants.NV_FILELIST] = utils.FingerprintFiles(
506       what[constants.NV_FILELIST])
507
508   if constants.NV_NODELIST in what:
509     result[constants.NV_NODELIST] = tmp = {}
510     random.shuffle(what[constants.NV_NODELIST])
511     for node in what[constants.NV_NODELIST]:
512       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
513       if not success:
514         tmp[node] = message
515
516   if constants.NV_NODENETTEST in what:
517     result[constants.NV_NODENETTEST] = tmp = {}
518     my_pip = my_sip = None
519     for name, pip, sip in what[constants.NV_NODENETTEST]:
520       if name == my_name:
521         my_pip = pip
522         my_sip = sip
523         break
524     if not my_pip:
525       tmp[my_name] = ("Can't find my own primary/secondary IP"
526                       " in the node list")
527     else:
528       for name, pip, sip in what[constants.NV_NODENETTEST]:
529         fail = []
530         if not netutils.TcpPing(pip, port, source=my_pip):
531           fail.append("primary")
532         if sip != pip:
533           if not netutils.TcpPing(sip, port, source=my_sip):
534             fail.append("secondary")
535         if fail:
536           tmp[name] = ("failure using the %s interface(s)" %
537                        " and ".join(fail))
538
539   if constants.NV_MASTERIP in what:
540     # FIXME: add checks on incoming data structures (here and in the
541     # rest of the function)
542     master_name, master_ip = what[constants.NV_MASTERIP]
543     if master_name == my_name:
544       source = constants.IP4_ADDRESS_LOCALHOST
545     else:
546       source = None
547     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
548                                                   source=source)
549
550   if constants.NV_LVLIST in what:
551     try:
552       val = GetVolumeList(what[constants.NV_LVLIST])
553     except RPCFail, err:
554       val = str(err)
555     result[constants.NV_LVLIST] = val
556
557   if constants.NV_INSTANCELIST in what:
558     # GetInstanceList can fail
559     try:
560       val = GetInstanceList(what[constants.NV_INSTANCELIST])
561     except RPCFail, err:
562       val = str(err)
563     result[constants.NV_INSTANCELIST] = val
564
565   if constants.NV_VGLIST in what:
566     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
567
568   if constants.NV_PVLIST in what:
569     result[constants.NV_PVLIST] = \
570       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
571                                    filter_allocatable=False)
572
573   if constants.NV_VERSION in what:
574     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
575                                     constants.RELEASE_VERSION)
576
577   if constants.NV_HVINFO in what:
578     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
579     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
580
581   if constants.NV_DRBDLIST in what:
582     try:
583       used_minors = bdev.DRBD8.GetUsedDevs().keys()
584     except errors.BlockDeviceError, err:
585       logging.warning("Can't get used minors list", exc_info=True)
586       used_minors = str(err)
587     result[constants.NV_DRBDLIST] = used_minors
588
589   if constants.NV_DRBDHELPER in what:
590     status = True
591     try:
592       payload = bdev.BaseDRBD.GetUsermodeHelper()
593     except errors.BlockDeviceError, err:
594       logging.error("Can't get DRBD usermode helper: %s", str(err))
595       status = False
596       payload = str(err)
597     result[constants.NV_DRBDHELPER] = (status, payload)
598
599   if constants.NV_NODESETUP in what:
600     result[constants.NV_NODESETUP] = tmpr = []
601     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
602       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
603                   " under /sys, missing required directories /sys/block"
604                   " and /sys/class/net")
605     if (not os.path.isdir("/proc/sys") or
606         not os.path.isfile("/proc/sysrq-trigger")):
607       tmpr.append("The procfs filesystem doesn't seem to be mounted"
608                   " under /proc, missing required directory /proc/sys and"
609                   " the file /proc/sysrq-trigger")
610
611   if constants.NV_TIME in what:
612     result[constants.NV_TIME] = utils.SplitTime(time.time())
613
614   if constants.NV_OSLIST in what:
615     result[constants.NV_OSLIST] = DiagnoseOS()
616
617   return result
618
619
620 def GetVolumeList(vg_name):
621   """Compute list of logical volumes and their size.
622
623   @type vg_name: str
624   @param vg_name: the volume group whose LVs we should list
625   @rtype: dict
626   @return:
627       dictionary of all partions (key) with value being a tuple of
628       their size (in MiB), inactive and online status::
629
630         {'test1': ('20.06', True, True)}
631
632       in case of errors, a string is returned with the error
633       details.
634
635   """
636   lvs = {}
637   sep = '|'
638   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
639                          "--separator=%s" % sep,
640                          "-olv_name,lv_size,lv_attr", vg_name])
641   if result.failed:
642     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
643
644   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
645   for line in result.stdout.splitlines():
646     line = line.strip()
647     match = valid_line_re.match(line)
648     if not match:
649       logging.error("Invalid line returned from lvs output: '%s'", line)
650       continue
651     name, size, attr = match.groups()
652     inactive = attr[4] == '-'
653     online = attr[5] == 'o'
654     virtual = attr[0] == 'v'
655     if virtual:
656       # we don't want to report such volumes as existing, since they
657       # don't really hold data
658       continue
659     lvs[name] = (size, inactive, online)
660
661   return lvs
662
663
664 def ListVolumeGroups():
665   """List the volume groups and their size.
666
667   @rtype: dict
668   @return: dictionary with keys volume name and values the
669       size of the volume
670
671   """
672   return utils.ListVolumeGroups()
673
674
675 def NodeVolumes():
676   """List all volumes on this node.
677
678   @rtype: list
679   @return:
680     A list of dictionaries, each having four keys:
681       - name: the logical volume name,
682       - size: the size of the logical volume
683       - dev: the physical device on which the LV lives
684       - vg: the volume group to which it belongs
685
686     In case of errors, we return an empty list and log the
687     error.
688
689     Note that since a logical volume can live on multiple physical
690     volumes, the resulting list might include a logical volume
691     multiple times.
692
693   """
694   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
695                          "--separator=|",
696                          "--options=lv_name,lv_size,devices,vg_name"])
697   if result.failed:
698     _Fail("Failed to list logical volumes, lvs output: %s",
699           result.output)
700
701   def parse_dev(dev):
702     return dev.split('(')[0]
703
704   def handle_dev(dev):
705     return [parse_dev(x) for x in dev.split(",")]
706
707   def map_line(line):
708     line = [v.strip() for v in line]
709     return [{'name': line[0], 'size': line[1],
710              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
711
712   all_devs = []
713   for line in result.stdout.splitlines():
714     if line.count('|') >= 3:
715       all_devs.extend(map_line(line.split('|')))
716     else:
717       logging.warning("Strange line in the output from lvs: '%s'", line)
718   return all_devs
719
720
721 def BridgesExist(bridges_list):
722   """Check if a list of bridges exist on the current node.
723
724   @rtype: boolean
725   @return: C{True} if all of them exist, C{False} otherwise
726
727   """
728   missing = []
729   for bridge in bridges_list:
730     if not utils.BridgeExists(bridge):
731       missing.append(bridge)
732
733   if missing:
734     _Fail("Missing bridges %s", utils.CommaJoin(missing))
735
736
737 def GetInstanceList(hypervisor_list):
738   """Provides a list of instances.
739
740   @type hypervisor_list: list
741   @param hypervisor_list: the list of hypervisors to query information
742
743   @rtype: list
744   @return: a list of all running instances on the current node
745     - instance1.example.com
746     - instance2.example.com
747
748   """
749   results = []
750   for hname in hypervisor_list:
751     try:
752       names = hypervisor.GetHypervisor(hname).ListInstances()
753       results.extend(names)
754     except errors.HypervisorError, err:
755       _Fail("Error enumerating instances (hypervisor %s): %s",
756             hname, err, exc=True)
757
758   return results
759
760
761 def GetInstanceInfo(instance, hname):
762   """Gives back the information about an instance as a dictionary.
763
764   @type instance: string
765   @param instance: the instance name
766   @type hname: string
767   @param hname: the hypervisor type of the instance
768
769   @rtype: dict
770   @return: dictionary with the following keys:
771       - memory: memory size of instance (int)
772       - state: xen state of instance (string)
773       - time: cpu time of instance (float)
774
775   """
776   output = {}
777
778   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
779   if iinfo is not None:
780     output['memory'] = iinfo[2]
781     output['state'] = iinfo[4]
782     output['time'] = iinfo[5]
783
784   return output
785
786
787 def GetInstanceMigratable(instance):
788   """Gives whether an instance can be migrated.
789
790   @type instance: L{objects.Instance}
791   @param instance: object representing the instance to be checked.
792
793   @rtype: tuple
794   @return: tuple of (result, description) where:
795       - result: whether the instance can be migrated or not
796       - description: a description of the issue, if relevant
797
798   """
799   hyper = hypervisor.GetHypervisor(instance.hypervisor)
800   iname = instance.name
801   if iname not in hyper.ListInstances():
802     _Fail("Instance %s is not running", iname)
803
804   for idx in range(len(instance.disks)):
805     link_name = _GetBlockDevSymlinkPath(iname, idx)
806     if not os.path.islink(link_name):
807       logging.warning("Instance %s is missing symlink %s for disk %d",
808                       iname, link_name, idx)
809
810
811 def GetAllInstancesInfo(hypervisor_list):
812   """Gather data about all instances.
813
814   This is the equivalent of L{GetInstanceInfo}, except that it
815   computes data for all instances at once, thus being faster if one
816   needs data about more than one instance.
817
818   @type hypervisor_list: list
819   @param hypervisor_list: list of hypervisors to query for instance data
820
821   @rtype: dict
822   @return: dictionary of instance: data, with data having the following keys:
823       - memory: memory size of instance (int)
824       - state: xen state of instance (string)
825       - time: cpu time of instance (float)
826       - vcpus: the number of vcpus
827
828   """
829   output = {}
830
831   for hname in hypervisor_list:
832     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
833     if iinfo:
834       for name, _, memory, vcpus, state, times in iinfo:
835         value = {
836           'memory': memory,
837           'vcpus': vcpus,
838           'state': state,
839           'time': times,
840           }
841         if name in output:
842           # we only check static parameters, like memory and vcpus,
843           # and not state and time which can change between the
844           # invocations of the different hypervisors
845           for key in 'memory', 'vcpus':
846             if value[key] != output[name][key]:
847               _Fail("Instance %s is running twice"
848                     " with different parameters", name)
849         output[name] = value
850
851   return output
852
853
854 def _InstanceLogName(kind, os_name, instance):
855   """Compute the OS log filename for a given instance and operation.
856
857   The instance name and os name are passed in as strings since not all
858   operations have these as part of an instance object.
859
860   @type kind: string
861   @param kind: the operation type (e.g. add, import, etc.)
862   @type os_name: string
863   @param os_name: the os name
864   @type instance: string
865   @param instance: the name of the instance being imported/added/etc.
866
867   """
868   # TODO: Use tempfile.mkstemp to create unique filename
869   base = ("%s-%s-%s-%s.log" %
870           (kind, os_name, instance, utils.TimestampForFilename()))
871   return utils.PathJoin(constants.LOG_OS_DIR, base)
872
873
874 def InstanceOsAdd(instance, reinstall, debug):
875   """Add an OS to an instance.
876
877   @type instance: L{objects.Instance}
878   @param instance: Instance whose OS is to be installed
879   @type reinstall: boolean
880   @param reinstall: whether this is an instance reinstall
881   @type debug: integer
882   @param debug: debug level, passed to the OS scripts
883   @rtype: None
884
885   """
886   inst_os = OSFromDisk(instance.os)
887
888   create_env = OSEnvironment(instance, inst_os, debug)
889   if reinstall:
890     create_env['INSTANCE_REINSTALL'] = "1"
891
892   logfile = _InstanceLogName("add", instance.os, instance.name)
893
894   result = utils.RunCmd([inst_os.create_script], env=create_env,
895                         cwd=inst_os.path, output=logfile,)
896   if result.failed:
897     logging.error("os create command '%s' returned error: %s, logfile: %s,"
898                   " output: %s", result.cmd, result.fail_reason, logfile,
899                   result.output)
900     lines = [utils.SafeEncode(val)
901              for val in utils.TailFile(logfile, lines=20)]
902     _Fail("OS create script failed (%s), last lines in the"
903           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
904
905
906 def RunRenameInstance(instance, old_name, debug):
907   """Run the OS rename script for an instance.
908
909   @type instance: L{objects.Instance}
910   @param instance: Instance whose OS is to be installed
911   @type old_name: string
912   @param old_name: previous instance name
913   @type debug: integer
914   @param debug: debug level, passed to the OS scripts
915   @rtype: boolean
916   @return: the success of the operation
917
918   """
919   inst_os = OSFromDisk(instance.os)
920
921   rename_env = OSEnvironment(instance, inst_os, debug)
922   rename_env['OLD_INSTANCE_NAME'] = old_name
923
924   logfile = _InstanceLogName("rename", instance.os,
925                              "%s-%s" % (old_name, instance.name))
926
927   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
928                         cwd=inst_os.path, output=logfile)
929
930   if result.failed:
931     logging.error("os create command '%s' returned error: %s output: %s",
932                   result.cmd, result.fail_reason, result.output)
933     lines = [utils.SafeEncode(val)
934              for val in utils.TailFile(logfile, lines=20)]
935     _Fail("OS rename script failed (%s), last lines in the"
936           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
937
938
939 def _GetBlockDevSymlinkPath(instance_name, idx):
940   return utils.PathJoin(constants.DISK_LINKS_DIR,
941                         "%s:%d" % (instance_name, idx))
942
943
944 def _SymlinkBlockDev(instance_name, device_path, idx):
945   """Set up symlinks to a instance's block device.
946
947   This is an auxiliary function run when an instance is start (on the primary
948   node) or when an instance is migrated (on the target node).
949
950
951   @param instance_name: the name of the target instance
952   @param device_path: path of the physical block device, on the node
953   @param idx: the disk index
954   @return: absolute path to the disk's symlink
955
956   """
957   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
958   try:
959     os.symlink(device_path, link_name)
960   except OSError, err:
961     if err.errno == errno.EEXIST:
962       if (not os.path.islink(link_name) or
963           os.readlink(link_name) != device_path):
964         os.remove(link_name)
965         os.symlink(device_path, link_name)
966     else:
967       raise
968
969   return link_name
970
971
972 def _RemoveBlockDevLinks(instance_name, disks):
973   """Remove the block device symlinks belonging to the given instance.
974
975   """
976   for idx, _ in enumerate(disks):
977     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
978     if os.path.islink(link_name):
979       try:
980         os.remove(link_name)
981       except OSError:
982         logging.exception("Can't remove symlink '%s'", link_name)
983
984
985 def _GatherAndLinkBlockDevs(instance):
986   """Set up an instance's block device(s).
987
988   This is run on the primary node at instance startup. The block
989   devices must be already assembled.
990
991   @type instance: L{objects.Instance}
992   @param instance: the instance whose disks we shoul assemble
993   @rtype: list
994   @return: list of (disk_object, device_path)
995
996   """
997   block_devices = []
998   for idx, disk in enumerate(instance.disks):
999     device = _RecursiveFindBD(disk)
1000     if device is None:
1001       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1002                                     str(disk))
1003     device.Open()
1004     try:
1005       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1006     except OSError, e:
1007       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1008                                     e.strerror)
1009
1010     block_devices.append((disk, link_name))
1011
1012   return block_devices
1013
1014
1015 def StartInstance(instance):
1016   """Start an instance.
1017
1018   @type instance: L{objects.Instance}
1019   @param instance: the instance object
1020   @rtype: None
1021
1022   """
1023   running_instances = GetInstanceList([instance.hypervisor])
1024
1025   if instance.name in running_instances:
1026     logging.info("Instance %s already running, not starting", instance.name)
1027     return
1028
1029   try:
1030     block_devices = _GatherAndLinkBlockDevs(instance)
1031     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1032     hyper.StartInstance(instance, block_devices)
1033   except errors.BlockDeviceError, err:
1034     _Fail("Block device error: %s", err, exc=True)
1035   except errors.HypervisorError, err:
1036     _RemoveBlockDevLinks(instance.name, instance.disks)
1037     _Fail("Hypervisor error: %s", err, exc=True)
1038
1039
1040 def InstanceShutdown(instance, timeout):
1041   """Shut an instance down.
1042
1043   @note: this functions uses polling with a hardcoded timeout.
1044
1045   @type instance: L{objects.Instance}
1046   @param instance: the instance object
1047   @type timeout: integer
1048   @param timeout: maximum timeout for soft shutdown
1049   @rtype: None
1050
1051   """
1052   hv_name = instance.hypervisor
1053   hyper = hypervisor.GetHypervisor(hv_name)
1054   iname = instance.name
1055
1056   if instance.name not in hyper.ListInstances():
1057     logging.info("Instance %s not running, doing nothing", iname)
1058     return
1059
1060   class _TryShutdown:
1061     def __init__(self):
1062       self.tried_once = False
1063
1064     def __call__(self):
1065       if iname not in hyper.ListInstances():
1066         return
1067
1068       try:
1069         hyper.StopInstance(instance, retry=self.tried_once)
1070       except errors.HypervisorError, err:
1071         if iname not in hyper.ListInstances():
1072           # if the instance is no longer existing, consider this a
1073           # success and go to cleanup
1074           return
1075
1076         _Fail("Failed to stop instance %s: %s", iname, err)
1077
1078       self.tried_once = True
1079
1080       raise utils.RetryAgain()
1081
1082   try:
1083     utils.Retry(_TryShutdown(), 5, timeout)
1084   except utils.RetryTimeout:
1085     # the shutdown did not succeed
1086     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1087
1088     try:
1089       hyper.StopInstance(instance, force=True)
1090     except errors.HypervisorError, err:
1091       if iname in hyper.ListInstances():
1092         # only raise an error if the instance still exists, otherwise
1093         # the error could simply be "instance ... unknown"!
1094         _Fail("Failed to force stop instance %s: %s", iname, err)
1095
1096     time.sleep(1)
1097
1098     if iname in hyper.ListInstances():
1099       _Fail("Could not shutdown instance %s even by destroy", iname)
1100
1101   try:
1102     hyper.CleanupInstance(instance.name)
1103   except errors.HypervisorError, err:
1104     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1105
1106   _RemoveBlockDevLinks(iname, instance.disks)
1107
1108
1109 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1110   """Reboot an instance.
1111
1112   @type instance: L{objects.Instance}
1113   @param instance: the instance object to reboot
1114   @type reboot_type: str
1115   @param reboot_type: the type of reboot, one the following
1116     constants:
1117       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1118         instance OS, do not recreate the VM
1119       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1120         restart the VM (at the hypervisor level)
1121       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1122         not accepted here, since that mode is handled differently, in
1123         cmdlib, and translates into full stop and start of the
1124         instance (instead of a call_instance_reboot RPC)
1125   @type shutdown_timeout: integer
1126   @param shutdown_timeout: maximum timeout for soft shutdown
1127   @rtype: None
1128
1129   """
1130   running_instances = GetInstanceList([instance.hypervisor])
1131
1132   if instance.name not in running_instances:
1133     _Fail("Cannot reboot instance %s that is not running", instance.name)
1134
1135   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1136   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1137     try:
1138       hyper.RebootInstance(instance)
1139     except errors.HypervisorError, err:
1140       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1141   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1142     try:
1143       InstanceShutdown(instance, shutdown_timeout)
1144       return StartInstance(instance)
1145     except errors.HypervisorError, err:
1146       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1147   else:
1148     _Fail("Invalid reboot_type received: %s", reboot_type)
1149
1150
1151 def MigrationInfo(instance):
1152   """Gather information about an instance to be migrated.
1153
1154   @type instance: L{objects.Instance}
1155   @param instance: the instance definition
1156
1157   """
1158   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1159   try:
1160     info = hyper.MigrationInfo(instance)
1161   except errors.HypervisorError, err:
1162     _Fail("Failed to fetch migration information: %s", err, exc=True)
1163   return info
1164
1165
1166 def AcceptInstance(instance, info, target):
1167   """Prepare the node to accept an instance.
1168
1169   @type instance: L{objects.Instance}
1170   @param instance: the instance definition
1171   @type info: string/data (opaque)
1172   @param info: migration information, from the source node
1173   @type target: string
1174   @param target: target host (usually ip), on this node
1175
1176   """
1177   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1178   try:
1179     hyper.AcceptInstance(instance, info, target)
1180   except errors.HypervisorError, err:
1181     _Fail("Failed to accept instance: %s", err, exc=True)
1182
1183
1184 def FinalizeMigration(instance, info, success):
1185   """Finalize any preparation to accept an instance.
1186
1187   @type instance: L{objects.Instance}
1188   @param instance: the instance definition
1189   @type info: string/data (opaque)
1190   @param info: migration information, from the source node
1191   @type success: boolean
1192   @param success: whether the migration was a success or a failure
1193
1194   """
1195   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1196   try:
1197     hyper.FinalizeMigration(instance, info, success)
1198   except errors.HypervisorError, err:
1199     _Fail("Failed to finalize migration: %s", err, exc=True)
1200
1201
1202 def MigrateInstance(instance, target, live):
1203   """Migrates an instance to another node.
1204
1205   @type instance: L{objects.Instance}
1206   @param instance: the instance definition
1207   @type target: string
1208   @param target: the target node name
1209   @type live: boolean
1210   @param live: whether the migration should be done live or not (the
1211       interpretation of this parameter is left to the hypervisor)
1212   @rtype: tuple
1213   @return: a tuple of (success, msg) where:
1214       - succes is a boolean denoting the success/failure of the operation
1215       - msg is a string with details in case of failure
1216
1217   """
1218   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1219
1220   try:
1221     hyper.MigrateInstance(instance, target, live)
1222   except errors.HypervisorError, err:
1223     _Fail("Failed to migrate instance: %s", err, exc=True)
1224
1225
1226 def BlockdevCreate(disk, size, owner, on_primary, info):
1227   """Creates a block device for an instance.
1228
1229   @type disk: L{objects.Disk}
1230   @param disk: the object describing the disk we should create
1231   @type size: int
1232   @param size: the size of the physical underlying device, in MiB
1233   @type owner: str
1234   @param owner: the name of the instance for which disk is created,
1235       used for device cache data
1236   @type on_primary: boolean
1237   @param on_primary:  indicates if it is the primary node or not
1238   @type info: string
1239   @param info: string that will be sent to the physical device
1240       creation, used for example to set (LVM) tags on LVs
1241
1242   @return: the new unique_id of the device (this can sometime be
1243       computed only after creation), or None. On secondary nodes,
1244       it's not required to return anything.
1245
1246   """
1247   # TODO: remove the obsolete 'size' argument
1248   # pylint: disable-msg=W0613
1249   clist = []
1250   if disk.children:
1251     for child in disk.children:
1252       try:
1253         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1254       except errors.BlockDeviceError, err:
1255         _Fail("Can't assemble device %s: %s", child, err)
1256       if on_primary or disk.AssembleOnSecondary():
1257         # we need the children open in case the device itself has to
1258         # be assembled
1259         try:
1260           # pylint: disable-msg=E1103
1261           crdev.Open()
1262         except errors.BlockDeviceError, err:
1263           _Fail("Can't make child '%s' read-write: %s", child, err)
1264       clist.append(crdev)
1265
1266   try:
1267     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1268   except errors.BlockDeviceError, err:
1269     _Fail("Can't create block device: %s", err)
1270
1271   if on_primary or disk.AssembleOnSecondary():
1272     try:
1273       device.Assemble()
1274     except errors.BlockDeviceError, err:
1275       _Fail("Can't assemble device after creation, unusual event: %s", err)
1276     device.SetSyncSpeed(constants.SYNC_SPEED)
1277     if on_primary or disk.OpenOnSecondary():
1278       try:
1279         device.Open(force=True)
1280       except errors.BlockDeviceError, err:
1281         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1282     DevCacheManager.UpdateCache(device.dev_path, owner,
1283                                 on_primary, disk.iv_name)
1284
1285   device.SetInfo(info)
1286
1287   return device.unique_id
1288
1289
1290 def BlockdevRemove(disk):
1291   """Remove a block device.
1292
1293   @note: This is intended to be called recursively.
1294
1295   @type disk: L{objects.Disk}
1296   @param disk: the disk object we should remove
1297   @rtype: boolean
1298   @return: the success of the operation
1299
1300   """
1301   msgs = []
1302   try:
1303     rdev = _RecursiveFindBD(disk)
1304   except errors.BlockDeviceError, err:
1305     # probably can't attach
1306     logging.info("Can't attach to device %s in remove", disk)
1307     rdev = None
1308   if rdev is not None:
1309     r_path = rdev.dev_path
1310     try:
1311       rdev.Remove()
1312     except errors.BlockDeviceError, err:
1313       msgs.append(str(err))
1314     if not msgs:
1315       DevCacheManager.RemoveCache(r_path)
1316
1317   if disk.children:
1318     for child in disk.children:
1319       try:
1320         BlockdevRemove(child)
1321       except RPCFail, err:
1322         msgs.append(str(err))
1323
1324   if msgs:
1325     _Fail("; ".join(msgs))
1326
1327
1328 def _RecursiveAssembleBD(disk, owner, as_primary):
1329   """Activate a block device for an instance.
1330
1331   This is run on the primary and secondary nodes for an instance.
1332
1333   @note: this function is called recursively.
1334
1335   @type disk: L{objects.Disk}
1336   @param disk: the disk we try to assemble
1337   @type owner: str
1338   @param owner: the name of the instance which owns the disk
1339   @type as_primary: boolean
1340   @param as_primary: if we should make the block device
1341       read/write
1342
1343   @return: the assembled device or None (in case no device
1344       was assembled)
1345   @raise errors.BlockDeviceError: in case there is an error
1346       during the activation of the children or the device
1347       itself
1348
1349   """
1350   children = []
1351   if disk.children:
1352     mcn = disk.ChildrenNeeded()
1353     if mcn == -1:
1354       mcn = 0 # max number of Nones allowed
1355     else:
1356       mcn = len(disk.children) - mcn # max number of Nones
1357     for chld_disk in disk.children:
1358       try:
1359         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1360       except errors.BlockDeviceError, err:
1361         if children.count(None) >= mcn:
1362           raise
1363         cdev = None
1364         logging.error("Error in child activation (but continuing): %s",
1365                       str(err))
1366       children.append(cdev)
1367
1368   if as_primary or disk.AssembleOnSecondary():
1369     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1370     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1371     result = r_dev
1372     if as_primary or disk.OpenOnSecondary():
1373       r_dev.Open()
1374     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1375                                 as_primary, disk.iv_name)
1376
1377   else:
1378     result = True
1379   return result
1380
1381
1382 def BlockdevAssemble(disk, owner, as_primary):
1383   """Activate a block device for an instance.
1384
1385   This is a wrapper over _RecursiveAssembleBD.
1386
1387   @rtype: str or boolean
1388   @return: a C{/dev/...} path for primary nodes, and
1389       C{True} for secondary nodes
1390
1391   """
1392   try:
1393     result = _RecursiveAssembleBD(disk, owner, as_primary)
1394     if isinstance(result, bdev.BlockDev):
1395       # pylint: disable-msg=E1103
1396       result = result.dev_path
1397   except errors.BlockDeviceError, err:
1398     _Fail("Error while assembling disk: %s", err, exc=True)
1399
1400   return result
1401
1402
1403 def BlockdevShutdown(disk):
1404   """Shut down a block device.
1405
1406   First, if the device is assembled (Attach() is successful), then
1407   the device is shutdown. Then the children of the device are
1408   shutdown.
1409
1410   This function is called recursively. Note that we don't cache the
1411   children or such, as oppossed to assemble, shutdown of different
1412   devices doesn't require that the upper device was active.
1413
1414   @type disk: L{objects.Disk}
1415   @param disk: the description of the disk we should
1416       shutdown
1417   @rtype: None
1418
1419   """
1420   msgs = []
1421   r_dev = _RecursiveFindBD(disk)
1422   if r_dev is not None:
1423     r_path = r_dev.dev_path
1424     try:
1425       r_dev.Shutdown()
1426       DevCacheManager.RemoveCache(r_path)
1427     except errors.BlockDeviceError, err:
1428       msgs.append(str(err))
1429
1430   if disk.children:
1431     for child in disk.children:
1432       try:
1433         BlockdevShutdown(child)
1434       except RPCFail, err:
1435         msgs.append(str(err))
1436
1437   if msgs:
1438     _Fail("; ".join(msgs))
1439
1440
1441 def BlockdevAddchildren(parent_cdev, new_cdevs):
1442   """Extend a mirrored block device.
1443
1444   @type parent_cdev: L{objects.Disk}
1445   @param parent_cdev: the disk to which we should add children
1446   @type new_cdevs: list of L{objects.Disk}
1447   @param new_cdevs: the list of children which we should add
1448   @rtype: None
1449
1450   """
1451   parent_bdev = _RecursiveFindBD(parent_cdev)
1452   if parent_bdev is None:
1453     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1454   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1455   if new_bdevs.count(None) > 0:
1456     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1457   parent_bdev.AddChildren(new_bdevs)
1458
1459
1460 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1461   """Shrink a mirrored block device.
1462
1463   @type parent_cdev: L{objects.Disk}
1464   @param parent_cdev: the disk from which we should remove children
1465   @type new_cdevs: list of L{objects.Disk}
1466   @param new_cdevs: the list of children which we should remove
1467   @rtype: None
1468
1469   """
1470   parent_bdev = _RecursiveFindBD(parent_cdev)
1471   if parent_bdev is None:
1472     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1473   devs = []
1474   for disk in new_cdevs:
1475     rpath = disk.StaticDevPath()
1476     if rpath is None:
1477       bd = _RecursiveFindBD(disk)
1478       if bd is None:
1479         _Fail("Can't find device %s while removing children", disk)
1480       else:
1481         devs.append(bd.dev_path)
1482     else:
1483       if not utils.IsNormAbsPath(rpath):
1484         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1485       devs.append(rpath)
1486   parent_bdev.RemoveChildren(devs)
1487
1488
1489 def BlockdevGetmirrorstatus(disks):
1490   """Get the mirroring status of a list of devices.
1491
1492   @type disks: list of L{objects.Disk}
1493   @param disks: the list of disks which we should query
1494   @rtype: disk
1495   @return:
1496       a list of (mirror_done, estimated_time) tuples, which
1497       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1498   @raise errors.BlockDeviceError: if any of the disks cannot be
1499       found
1500
1501   """
1502   stats = []
1503   for dsk in disks:
1504     rbd = _RecursiveFindBD(dsk)
1505     if rbd is None:
1506       _Fail("Can't find device %s", dsk)
1507
1508     stats.append(rbd.CombinedSyncStatus())
1509
1510   return stats
1511
1512
1513 def _RecursiveFindBD(disk):
1514   """Check if a device is activated.
1515
1516   If so, return information about the real device.
1517
1518   @type disk: L{objects.Disk}
1519   @param disk: the disk object we need to find
1520
1521   @return: None if the device can't be found,
1522       otherwise the device instance
1523
1524   """
1525   children = []
1526   if disk.children:
1527     for chdisk in disk.children:
1528       children.append(_RecursiveFindBD(chdisk))
1529
1530   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1531
1532
1533 def _OpenRealBD(disk):
1534   """Opens the underlying block device of a disk.
1535
1536   @type disk: L{objects.Disk}
1537   @param disk: the disk object we want to open
1538
1539   """
1540   real_disk = _RecursiveFindBD(disk)
1541   if real_disk is None:
1542     _Fail("Block device '%s' is not set up", disk)
1543
1544   real_disk.Open()
1545
1546   return real_disk
1547
1548
1549 def BlockdevFind(disk):
1550   """Check if a device is activated.
1551
1552   If it is, return information about the real device.
1553
1554   @type disk: L{objects.Disk}
1555   @param disk: the disk to find
1556   @rtype: None or objects.BlockDevStatus
1557   @return: None if the disk cannot be found, otherwise a the current
1558            information
1559
1560   """
1561   try:
1562     rbd = _RecursiveFindBD(disk)
1563   except errors.BlockDeviceError, err:
1564     _Fail("Failed to find device: %s", err, exc=True)
1565
1566   if rbd is None:
1567     return None
1568
1569   return rbd.GetSyncStatus()
1570
1571
1572 def BlockdevGetsize(disks):
1573   """Computes the size of the given disks.
1574
1575   If a disk is not found, returns None instead.
1576
1577   @type disks: list of L{objects.Disk}
1578   @param disks: the list of disk to compute the size for
1579   @rtype: list
1580   @return: list with elements None if the disk cannot be found,
1581       otherwise the size
1582
1583   """
1584   result = []
1585   for cf in disks:
1586     try:
1587       rbd = _RecursiveFindBD(cf)
1588     except errors.BlockDeviceError:
1589       result.append(None)
1590       continue
1591     if rbd is None:
1592       result.append(None)
1593     else:
1594       result.append(rbd.GetActualSize())
1595   return result
1596
1597
1598 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1599   """Export a block device to a remote node.
1600
1601   @type disk: L{objects.Disk}
1602   @param disk: the description of the disk to export
1603   @type dest_node: str
1604   @param dest_node: the destination node to export to
1605   @type dest_path: str
1606   @param dest_path: the destination path on the target node
1607   @type cluster_name: str
1608   @param cluster_name: the cluster name, needed for SSH hostalias
1609   @rtype: None
1610
1611   """
1612   real_disk = _OpenRealBD(disk)
1613
1614   # the block size on the read dd is 1MiB to match our units
1615   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1616                                "dd if=%s bs=1048576 count=%s",
1617                                real_disk.dev_path, str(disk.size))
1618
1619   # we set here a smaller block size as, due to ssh buffering, more
1620   # than 64-128k will mostly ignored; we use nocreat to fail if the
1621   # device is not already there or we pass a wrong path; we use
1622   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1623   # to not buffer too much memory; this means that at best, we flush
1624   # every 64k, which will not be very fast
1625   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1626                                 " oflag=dsync", dest_path)
1627
1628   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1629                                                    constants.GANETI_RUNAS,
1630                                                    destcmd)
1631
1632   # all commands have been checked, so we're safe to combine them
1633   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1634
1635   result = utils.RunCmd(["bash", "-c", command])
1636
1637   if result.failed:
1638     _Fail("Disk copy command '%s' returned error: %s"
1639           " output: %s", command, result.fail_reason, result.output)
1640
1641
1642 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1643   """Write a file to the filesystem.
1644
1645   This allows the master to overwrite(!) a file. It will only perform
1646   the operation if the file belongs to a list of configuration files.
1647
1648   @type file_name: str
1649   @param file_name: the target file name
1650   @type data: str
1651   @param data: the new contents of the file
1652   @type mode: int
1653   @param mode: the mode to give the file (can be None)
1654   @type uid: int
1655   @param uid: the owner of the file (can be -1 for default)
1656   @type gid: int
1657   @param gid: the group of the file (can be -1 for default)
1658   @type atime: float
1659   @param atime: the atime to set on the file (can be None)
1660   @type mtime: float
1661   @param mtime: the mtime to set on the file (can be None)
1662   @rtype: None
1663
1664   """
1665   if not os.path.isabs(file_name):
1666     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1667
1668   if file_name not in _ALLOWED_UPLOAD_FILES:
1669     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1670           file_name)
1671
1672   raw_data = _Decompress(data)
1673
1674   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1675                   atime=atime, mtime=mtime)
1676
1677
1678 def WriteSsconfFiles(values):
1679   """Update all ssconf files.
1680
1681   Wrapper around the SimpleStore.WriteFiles.
1682
1683   """
1684   ssconf.SimpleStore().WriteFiles(values)
1685
1686
1687 def _ErrnoOrStr(err):
1688   """Format an EnvironmentError exception.
1689
1690   If the L{err} argument has an errno attribute, it will be looked up
1691   and converted into a textual C{E...} description. Otherwise the
1692   string representation of the error will be returned.
1693
1694   @type err: L{EnvironmentError}
1695   @param err: the exception to format
1696
1697   """
1698   if hasattr(err, 'errno'):
1699     detail = errno.errorcode[err.errno]
1700   else:
1701     detail = str(err)
1702   return detail
1703
1704
1705 def _OSOndiskAPIVersion(os_dir):
1706   """Compute and return the API version of a given OS.
1707
1708   This function will try to read the API version of the OS residing in
1709   the 'os_dir' directory.
1710
1711   @type os_dir: str
1712   @param os_dir: the directory in which we should look for the OS
1713   @rtype: tuple
1714   @return: tuple (status, data) with status denoting the validity and
1715       data holding either the vaid versions or an error message
1716
1717   """
1718   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1719
1720   try:
1721     st = os.stat(api_file)
1722   except EnvironmentError, err:
1723     return False, ("Required file '%s' not found under path %s: %s" %
1724                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1725
1726   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1727     return False, ("File '%s' in %s is not a regular file" %
1728                    (constants.OS_API_FILE, os_dir))
1729
1730   try:
1731     api_versions = utils.ReadFile(api_file).splitlines()
1732   except EnvironmentError, err:
1733     return False, ("Error while reading the API version file at %s: %s" %
1734                    (api_file, _ErrnoOrStr(err)))
1735
1736   try:
1737     api_versions = [int(version.strip()) for version in api_versions]
1738   except (TypeError, ValueError), err:
1739     return False, ("API version(s) can't be converted to integer: %s" %
1740                    str(err))
1741
1742   return True, api_versions
1743
1744
1745 def DiagnoseOS(top_dirs=None):
1746   """Compute the validity for all OSes.
1747
1748   @type top_dirs: list
1749   @param top_dirs: the list of directories in which to
1750       search (if not given defaults to
1751       L{constants.OS_SEARCH_PATH})
1752   @rtype: list of L{objects.OS}
1753   @return: a list of tuples (name, path, status, diagnose, variants,
1754       parameters, api_version) for all (potential) OSes under all
1755       search paths, where:
1756           - name is the (potential) OS name
1757           - path is the full path to the OS
1758           - status True/False is the validity of the OS
1759           - diagnose is the error message for an invalid OS, otherwise empty
1760           - variants is a list of supported OS variants, if any
1761           - parameters is a list of (name, help) parameters, if any
1762           - api_version is a list of support OS API versions
1763
1764   """
1765   if top_dirs is None:
1766     top_dirs = constants.OS_SEARCH_PATH
1767
1768   result = []
1769   for dir_name in top_dirs:
1770     if os.path.isdir(dir_name):
1771       try:
1772         f_names = utils.ListVisibleFiles(dir_name)
1773       except EnvironmentError, err:
1774         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1775         break
1776       for name in f_names:
1777         os_path = utils.PathJoin(dir_name, name)
1778         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1779         if status:
1780           diagnose = ""
1781           variants = os_inst.supported_variants
1782           parameters = os_inst.supported_parameters
1783           api_versions = os_inst.api_versions
1784         else:
1785           diagnose = os_inst
1786           variants = parameters = api_versions = []
1787         result.append((name, os_path, status, diagnose, variants,
1788                        parameters, api_versions))
1789
1790   return result
1791
1792
1793 def _TryOSFromDisk(name, base_dir=None):
1794   """Create an OS instance from disk.
1795
1796   This function will return an OS instance if the given name is a
1797   valid OS name.
1798
1799   @type base_dir: string
1800   @keyword base_dir: Base directory containing OS installations.
1801                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1802   @rtype: tuple
1803   @return: success and either the OS instance if we find a valid one,
1804       or error message
1805
1806   """
1807   if base_dir is None:
1808     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1809   else:
1810     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1811
1812   if os_dir is None:
1813     return False, "Directory for OS %s not found in search path" % name
1814
1815   status, api_versions = _OSOndiskAPIVersion(os_dir)
1816   if not status:
1817     # push the error up
1818     return status, api_versions
1819
1820   if not constants.OS_API_VERSIONS.intersection(api_versions):
1821     return False, ("API version mismatch for path '%s': found %s, want %s." %
1822                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1823
1824   # OS Files dictionary, we will populate it with the absolute path names
1825   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1826
1827   if max(api_versions) >= constants.OS_API_V15:
1828     os_files[constants.OS_VARIANTS_FILE] = ''
1829
1830   if max(api_versions) >= constants.OS_API_V20:
1831     os_files[constants.OS_PARAMETERS_FILE] = ''
1832   else:
1833     del os_files[constants.OS_SCRIPT_VERIFY]
1834
1835   for filename in os_files:
1836     os_files[filename] = utils.PathJoin(os_dir, filename)
1837
1838     try:
1839       st = os.stat(os_files[filename])
1840     except EnvironmentError, err:
1841       return False, ("File '%s' under path '%s' is missing (%s)" %
1842                      (filename, os_dir, _ErrnoOrStr(err)))
1843
1844     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1845       return False, ("File '%s' under path '%s' is not a regular file" %
1846                      (filename, os_dir))
1847
1848     if filename in constants.OS_SCRIPTS:
1849       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1850         return False, ("File '%s' under path '%s' is not executable" %
1851                        (filename, os_dir))
1852
1853   variants = []
1854   if constants.OS_VARIANTS_FILE in os_files:
1855     variants_file = os_files[constants.OS_VARIANTS_FILE]
1856     try:
1857       variants = utils.ReadFile(variants_file).splitlines()
1858     except EnvironmentError, err:
1859       return False, ("Error while reading the OS variants file at %s: %s" %
1860                      (variants_file, _ErrnoOrStr(err)))
1861     if not variants:
1862       return False, ("No supported os variant found")
1863
1864   parameters = []
1865   if constants.OS_PARAMETERS_FILE in os_files:
1866     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1867     try:
1868       parameters = utils.ReadFile(parameters_file).splitlines()
1869     except EnvironmentError, err:
1870       return False, ("Error while reading the OS parameters file at %s: %s" %
1871                      (parameters_file, _ErrnoOrStr(err)))
1872     parameters = [v.split(None, 1) for v in parameters]
1873
1874   os_obj = objects.OS(name=name, path=os_dir,
1875                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1876                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1877                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1878                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1879                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1880                                                  None),
1881                       supported_variants=variants,
1882                       supported_parameters=parameters,
1883                       api_versions=api_versions)
1884   return True, os_obj
1885
1886
1887 def OSFromDisk(name, base_dir=None):
1888   """Create an OS instance from disk.
1889
1890   This function will return an OS instance if the given name is a
1891   valid OS name. Otherwise, it will raise an appropriate
1892   L{RPCFail} exception, detailing why this is not a valid OS.
1893
1894   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1895   an exception but returns true/false status data.
1896
1897   @type base_dir: string
1898   @keyword base_dir: Base directory containing OS installations.
1899                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1900   @rtype: L{objects.OS}
1901   @return: the OS instance if we find a valid one
1902   @raise RPCFail: if we don't find a valid OS
1903
1904   """
1905   name_only = name.split("+", 1)[0]
1906   status, payload = _TryOSFromDisk(name_only, base_dir)
1907
1908   if not status:
1909     _Fail(payload)
1910
1911   return payload
1912
1913
1914 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
1915   """Calculate the basic environment for an os script.
1916
1917   @type os_name: str
1918   @param os_name: full operating system name (including variant)
1919   @type inst_os: L{objects.OS}
1920   @param inst_os: operating system for which the environment is being built
1921   @type os_params: dict
1922   @param os_params: the OS parameters
1923   @type debug: integer
1924   @param debug: debug level (0 or 1, for OS Api 10)
1925   @rtype: dict
1926   @return: dict of environment variables
1927   @raise errors.BlockDeviceError: if the block device
1928       cannot be found
1929
1930   """
1931   result = {}
1932   api_version = \
1933     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1934   result['OS_API_VERSION'] = '%d' % api_version
1935   result['OS_NAME'] = inst_os.name
1936   result['DEBUG_LEVEL'] = '%d' % debug
1937
1938   # OS variants
1939   if api_version >= constants.OS_API_V15:
1940     try:
1941       variant = os_name.split('+', 1)[1]
1942     except IndexError:
1943       variant = inst_os.supported_variants[0]
1944     result['OS_VARIANT'] = variant
1945
1946   # OS params
1947   for pname, pvalue in os_params.items():
1948     result['OSP_%s' % pname.upper()] = pvalue
1949
1950   return result
1951
1952
1953 def OSEnvironment(instance, inst_os, debug=0):
1954   """Calculate the environment for an os script.
1955
1956   @type instance: L{objects.Instance}
1957   @param instance: target instance for the os script run
1958   @type inst_os: L{objects.OS}
1959   @param inst_os: operating system for which the environment is being built
1960   @type debug: integer
1961   @param debug: debug level (0 or 1, for OS Api 10)
1962   @rtype: dict
1963   @return: dict of environment variables
1964   @raise errors.BlockDeviceError: if the block device
1965       cannot be found
1966
1967   """
1968   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
1969
1970   result['INSTANCE_NAME'] = instance.name
1971   result['INSTANCE_OS'] = instance.os
1972   result['HYPERVISOR'] = instance.hypervisor
1973   result['DISK_COUNT'] = '%d' % len(instance.disks)
1974   result['NIC_COUNT'] = '%d' % len(instance.nics)
1975
1976   # Disks
1977   for idx, disk in enumerate(instance.disks):
1978     real_disk = _OpenRealBD(disk)
1979     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1980     result['DISK_%d_ACCESS' % idx] = disk.mode
1981     if constants.HV_DISK_TYPE in instance.hvparams:
1982       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1983         instance.hvparams[constants.HV_DISK_TYPE]
1984     if disk.dev_type in constants.LDS_BLOCK:
1985       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1986     elif disk.dev_type == constants.LD_FILE:
1987       result['DISK_%d_BACKEND_TYPE' % idx] = \
1988         'file:%s' % disk.physical_id[0]
1989
1990   # NICs
1991   for idx, nic in enumerate(instance.nics):
1992     result['NIC_%d_MAC' % idx] = nic.mac
1993     if nic.ip:
1994       result['NIC_%d_IP' % idx] = nic.ip
1995     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1996     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1997       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1998     if nic.nicparams[constants.NIC_LINK]:
1999       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2000     if constants.HV_NIC_TYPE in instance.hvparams:
2001       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2002         instance.hvparams[constants.HV_NIC_TYPE]
2003
2004   # HV/BE params
2005   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2006     for key, value in source.items():
2007       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2008
2009   return result
2010
2011
2012 def BlockdevGrow(disk, amount):
2013   """Grow a stack of block devices.
2014
2015   This function is called recursively, with the childrens being the
2016   first ones to resize.
2017
2018   @type disk: L{objects.Disk}
2019   @param disk: the disk to be grown
2020   @rtype: (status, result)
2021   @return: a tuple with the status of the operation
2022       (True/False), and the errors message if status
2023       is False
2024
2025   """
2026   r_dev = _RecursiveFindBD(disk)
2027   if r_dev is None:
2028     _Fail("Cannot find block device %s", disk)
2029
2030   try:
2031     r_dev.Grow(amount)
2032   except errors.BlockDeviceError, err:
2033     _Fail("Failed to grow block device: %s", err, exc=True)
2034
2035
2036 def BlockdevSnapshot(disk):
2037   """Create a snapshot copy of a block device.
2038
2039   This function is called recursively, and the snapshot is actually created
2040   just for the leaf lvm backend device.
2041
2042   @type disk: L{objects.Disk}
2043   @param disk: the disk to be snapshotted
2044   @rtype: string
2045   @return: snapshot disk path
2046
2047   """
2048   if disk.dev_type == constants.LD_DRBD8:
2049     if not disk.children:
2050       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2051             disk.unique_id)
2052     return BlockdevSnapshot(disk.children[0])
2053   elif disk.dev_type == constants.LD_LV:
2054     r_dev = _RecursiveFindBD(disk)
2055     if r_dev is not None:
2056       # FIXME: choose a saner value for the snapshot size
2057       # let's stay on the safe side and ask for the full size, for now
2058       return r_dev.Snapshot(disk.size)
2059     else:
2060       _Fail("Cannot find block device %s", disk)
2061   else:
2062     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2063           disk.unique_id, disk.dev_type)
2064
2065
2066 def FinalizeExport(instance, snap_disks):
2067   """Write out the export configuration information.
2068
2069   @type instance: L{objects.Instance}
2070   @param instance: the instance which we export, used for
2071       saving configuration
2072   @type snap_disks: list of L{objects.Disk}
2073   @param snap_disks: list of snapshot block devices, which
2074       will be used to get the actual name of the dump file
2075
2076   @rtype: None
2077
2078   """
2079   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2080   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2081
2082   config = objects.SerializableConfigParser()
2083
2084   config.add_section(constants.INISECT_EXP)
2085   config.set(constants.INISECT_EXP, 'version', '0')
2086   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2087   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2088   config.set(constants.INISECT_EXP, 'os', instance.os)
2089   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2090
2091   config.add_section(constants.INISECT_INS)
2092   config.set(constants.INISECT_INS, 'name', instance.name)
2093   config.set(constants.INISECT_INS, 'memory', '%d' %
2094              instance.beparams[constants.BE_MEMORY])
2095   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2096              instance.beparams[constants.BE_VCPUS])
2097   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2098   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2099
2100   nic_total = 0
2101   for nic_count, nic in enumerate(instance.nics):
2102     nic_total += 1
2103     config.set(constants.INISECT_INS, 'nic%d_mac' %
2104                nic_count, '%s' % nic.mac)
2105     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2106     for param in constants.NICS_PARAMETER_TYPES:
2107       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2108                  '%s' % nic.nicparams.get(param, None))
2109   # TODO: redundant: on load can read nics until it doesn't exist
2110   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2111
2112   disk_total = 0
2113   for disk_count, disk in enumerate(snap_disks):
2114     if disk:
2115       disk_total += 1
2116       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2117                  ('%s' % disk.iv_name))
2118       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2119                  ('%s' % disk.physical_id[1]))
2120       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2121                  ('%d' % disk.size))
2122
2123   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2124
2125   # New-style hypervisor/backend parameters
2126
2127   config.add_section(constants.INISECT_HYP)
2128   for name, value in instance.hvparams.items():
2129     if name not in constants.HVC_GLOBALS:
2130       config.set(constants.INISECT_HYP, name, str(value))
2131
2132   config.add_section(constants.INISECT_BEP)
2133   for name, value in instance.beparams.items():
2134     config.set(constants.INISECT_BEP, name, str(value))
2135
2136   config.add_section(constants.INISECT_OSP)
2137   for name, value in instance.osparams.items():
2138     config.set(constants.INISECT_OSP, name, str(value))
2139
2140   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2141                   data=config.Dumps())
2142   shutil.rmtree(finaldestdir, ignore_errors=True)
2143   shutil.move(destdir, finaldestdir)
2144
2145
2146 def ExportInfo(dest):
2147   """Get export configuration information.
2148
2149   @type dest: str
2150   @param dest: directory containing the export
2151
2152   @rtype: L{objects.SerializableConfigParser}
2153   @return: a serializable config file containing the
2154       export info
2155
2156   """
2157   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2158
2159   config = objects.SerializableConfigParser()
2160   config.read(cff)
2161
2162   if (not config.has_section(constants.INISECT_EXP) or
2163       not config.has_section(constants.INISECT_INS)):
2164     _Fail("Export info file doesn't have the required fields")
2165
2166   return config.Dumps()
2167
2168
2169 def ListExports():
2170   """Return a list of exports currently available on this machine.
2171
2172   @rtype: list
2173   @return: list of the exports
2174
2175   """
2176   if os.path.isdir(constants.EXPORT_DIR):
2177     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2178   else:
2179     _Fail("No exports directory")
2180
2181
2182 def RemoveExport(export):
2183   """Remove an existing export from the node.
2184
2185   @type export: str
2186   @param export: the name of the export to remove
2187   @rtype: None
2188
2189   """
2190   target = utils.PathJoin(constants.EXPORT_DIR, export)
2191
2192   try:
2193     shutil.rmtree(target)
2194   except EnvironmentError, err:
2195     _Fail("Error while removing the export: %s", err, exc=True)
2196
2197
2198 def BlockdevRename(devlist):
2199   """Rename a list of block devices.
2200
2201   @type devlist: list of tuples
2202   @param devlist: list of tuples of the form  (disk,
2203       new_logical_id, new_physical_id); disk is an
2204       L{objects.Disk} object describing the current disk,
2205       and new logical_id/physical_id is the name we
2206       rename it to
2207   @rtype: boolean
2208   @return: True if all renames succeeded, False otherwise
2209
2210   """
2211   msgs = []
2212   result = True
2213   for disk, unique_id in devlist:
2214     dev = _RecursiveFindBD(disk)
2215     if dev is None:
2216       msgs.append("Can't find device %s in rename" % str(disk))
2217       result = False
2218       continue
2219     try:
2220       old_rpath = dev.dev_path
2221       dev.Rename(unique_id)
2222       new_rpath = dev.dev_path
2223       if old_rpath != new_rpath:
2224         DevCacheManager.RemoveCache(old_rpath)
2225         # FIXME: we should add the new cache information here, like:
2226         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2227         # but we don't have the owner here - maybe parse from existing
2228         # cache? for now, we only lose lvm data when we rename, which
2229         # is less critical than DRBD or MD
2230     except errors.BlockDeviceError, err:
2231       msgs.append("Can't rename device '%s' to '%s': %s" %
2232                   (dev, unique_id, err))
2233       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2234       result = False
2235   if not result:
2236     _Fail("; ".join(msgs))
2237
2238
2239 def _TransformFileStorageDir(file_storage_dir):
2240   """Checks whether given file_storage_dir is valid.
2241
2242   Checks wheter the given file_storage_dir is within the cluster-wide
2243   default file_storage_dir stored in SimpleStore. Only paths under that
2244   directory are allowed.
2245
2246   @type file_storage_dir: str
2247   @param file_storage_dir: the path to check
2248
2249   @return: the normalized path if valid, None otherwise
2250
2251   """
2252   if not constants.ENABLE_FILE_STORAGE:
2253     _Fail("File storage disabled at configure time")
2254   cfg = _GetConfig()
2255   file_storage_dir = os.path.normpath(file_storage_dir)
2256   base_file_storage_dir = cfg.GetFileStorageDir()
2257   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2258       base_file_storage_dir):
2259     _Fail("File storage directory '%s' is not under base file"
2260           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2261   return file_storage_dir
2262
2263
2264 def CreateFileStorageDir(file_storage_dir):
2265   """Create file storage directory.
2266
2267   @type file_storage_dir: str
2268   @param file_storage_dir: directory to create
2269
2270   @rtype: tuple
2271   @return: tuple with first element a boolean indicating wheter dir
2272       creation was successful or not
2273
2274   """
2275   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2276   if os.path.exists(file_storage_dir):
2277     if not os.path.isdir(file_storage_dir):
2278       _Fail("Specified storage dir '%s' is not a directory",
2279             file_storage_dir)
2280   else:
2281     try:
2282       os.makedirs(file_storage_dir, 0750)
2283     except OSError, err:
2284       _Fail("Cannot create file storage directory '%s': %s",
2285             file_storage_dir, err, exc=True)
2286
2287
2288 def RemoveFileStorageDir(file_storage_dir):
2289   """Remove file storage directory.
2290
2291   Remove it only if it's empty. If not log an error and return.
2292
2293   @type file_storage_dir: str
2294   @param file_storage_dir: the directory we should cleanup
2295   @rtype: tuple (success,)
2296   @return: tuple of one element, C{success}, denoting
2297       whether the operation was successful
2298
2299   """
2300   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2301   if os.path.exists(file_storage_dir):
2302     if not os.path.isdir(file_storage_dir):
2303       _Fail("Specified Storage directory '%s' is not a directory",
2304             file_storage_dir)
2305     # deletes dir only if empty, otherwise we want to fail the rpc call
2306     try:
2307       os.rmdir(file_storage_dir)
2308     except OSError, err:
2309       _Fail("Cannot remove file storage directory '%s': %s",
2310             file_storage_dir, err)
2311
2312
2313 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2314   """Rename the file storage directory.
2315
2316   @type old_file_storage_dir: str
2317   @param old_file_storage_dir: the current path
2318   @type new_file_storage_dir: str
2319   @param new_file_storage_dir: the name we should rename to
2320   @rtype: tuple (success,)
2321   @return: tuple of one element, C{success}, denoting
2322       whether the operation was successful
2323
2324   """
2325   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2326   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2327   if not os.path.exists(new_file_storage_dir):
2328     if os.path.isdir(old_file_storage_dir):
2329       try:
2330         os.rename(old_file_storage_dir, new_file_storage_dir)
2331       except OSError, err:
2332         _Fail("Cannot rename '%s' to '%s': %s",
2333               old_file_storage_dir, new_file_storage_dir, err)
2334     else:
2335       _Fail("Specified storage dir '%s' is not a directory",
2336             old_file_storage_dir)
2337   else:
2338     if os.path.exists(old_file_storage_dir):
2339       _Fail("Cannot rename '%s' to '%s': both locations exist",
2340             old_file_storage_dir, new_file_storage_dir)
2341
2342
2343 def _EnsureJobQueueFile(file_name):
2344   """Checks whether the given filename is in the queue directory.
2345
2346   @type file_name: str
2347   @param file_name: the file name we should check
2348   @rtype: None
2349   @raises RPCFail: if the file is not valid
2350
2351   """
2352   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2353   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2354
2355   if not result:
2356     _Fail("Passed job queue file '%s' does not belong to"
2357           " the queue directory '%s'", file_name, queue_dir)
2358
2359
2360 def JobQueueUpdate(file_name, content):
2361   """Updates a file in the queue directory.
2362
2363   This is just a wrapper over L{utils.WriteFile}, with proper
2364   checking.
2365
2366   @type file_name: str
2367   @param file_name: the job file name
2368   @type content: str
2369   @param content: the new job contents
2370   @rtype: boolean
2371   @return: the success of the operation
2372
2373   """
2374   _EnsureJobQueueFile(file_name)
2375   getents = runtime.GetEnts()
2376
2377   # Write and replace the file atomically
2378   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2379                   gid=getents.masterd_gid)
2380
2381
2382 def JobQueueRename(old, new):
2383   """Renames a job queue file.
2384
2385   This is just a wrapper over os.rename with proper checking.
2386
2387   @type old: str
2388   @param old: the old (actual) file name
2389   @type new: str
2390   @param new: the desired file name
2391   @rtype: tuple
2392   @return: the success of the operation and payload
2393
2394   """
2395   _EnsureJobQueueFile(old)
2396   _EnsureJobQueueFile(new)
2397
2398   utils.RenameFile(old, new, mkdir=True)
2399
2400
2401 def BlockdevClose(instance_name, disks):
2402   """Closes the given block devices.
2403
2404   This means they will be switched to secondary mode (in case of
2405   DRBD).
2406
2407   @param instance_name: if the argument is not empty, the symlinks
2408       of this instance will be removed
2409   @type disks: list of L{objects.Disk}
2410   @param disks: the list of disks to be closed
2411   @rtype: tuple (success, message)
2412   @return: a tuple of success and message, where success
2413       indicates the succes of the operation, and message
2414       which will contain the error details in case we
2415       failed
2416
2417   """
2418   bdevs = []
2419   for cf in disks:
2420     rd = _RecursiveFindBD(cf)
2421     if rd is None:
2422       _Fail("Can't find device %s", cf)
2423     bdevs.append(rd)
2424
2425   msg = []
2426   for rd in bdevs:
2427     try:
2428       rd.Close()
2429     except errors.BlockDeviceError, err:
2430       msg.append(str(err))
2431   if msg:
2432     _Fail("Can't make devices secondary: %s", ",".join(msg))
2433   else:
2434     if instance_name:
2435       _RemoveBlockDevLinks(instance_name, disks)
2436
2437
2438 def ValidateHVParams(hvname, hvparams):
2439   """Validates the given hypervisor parameters.
2440
2441   @type hvname: string
2442   @param hvname: the hypervisor name
2443   @type hvparams: dict
2444   @param hvparams: the hypervisor parameters to be validated
2445   @rtype: None
2446
2447   """
2448   try:
2449     hv_type = hypervisor.GetHypervisor(hvname)
2450     hv_type.ValidateParameters(hvparams)
2451   except errors.HypervisorError, err:
2452     _Fail(str(err), log=False)
2453
2454
2455 def _CheckOSPList(os_obj, parameters):
2456   """Check whether a list of parameters is supported by the OS.
2457
2458   @type os_obj: L{objects.OS}
2459   @param os_obj: OS object to check
2460   @type parameters: list
2461   @param parameters: the list of parameters to check
2462
2463   """
2464   supported = [v[0] for v in os_obj.supported_parameters]
2465   delta = frozenset(parameters).difference(supported)
2466   if delta:
2467     _Fail("The following parameters are not supported"
2468           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2469
2470
2471 def ValidateOS(required, osname, checks, osparams):
2472   """Validate the given OS' parameters.
2473
2474   @type required: boolean
2475   @param required: whether absence of the OS should translate into
2476       failure or not
2477   @type osname: string
2478   @param osname: the OS to be validated
2479   @type checks: list
2480   @param checks: list of the checks to run (currently only 'parameters')
2481   @type osparams: dict
2482   @param osparams: dictionary with OS parameters
2483   @rtype: boolean
2484   @return: True if the validation passed, or False if the OS was not
2485       found and L{required} was false
2486
2487   """
2488   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2489     _Fail("Unknown checks required for OS %s: %s", osname,
2490           set(checks).difference(constants.OS_VALIDATE_CALLS))
2491
2492   name_only = osname.split("+", 1)[0]
2493   status, tbv = _TryOSFromDisk(name_only, None)
2494
2495   if not status:
2496     if required:
2497       _Fail(tbv)
2498     else:
2499       return False
2500
2501   if max(tbv.api_versions) < constants.OS_API_V20:
2502     return True
2503
2504   if constants.OS_VALIDATE_PARAMETERS in checks:
2505     _CheckOSPList(tbv, osparams.keys())
2506
2507   validate_env = OSCoreEnv(osname, tbv, osparams)
2508   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2509                         cwd=tbv.path)
2510   if result.failed:
2511     logging.error("os validate command '%s' returned error: %s output: %s",
2512                   result.cmd, result.fail_reason, result.output)
2513     _Fail("OS validation script failed (%s), output: %s",
2514           result.fail_reason, result.output, log=False)
2515
2516   return True
2517
2518
2519 def DemoteFromMC():
2520   """Demotes the current node from master candidate role.
2521
2522   """
2523   # try to ensure we're not the master by mistake
2524   master, myself = ssconf.GetMasterAndMyself()
2525   if master == myself:
2526     _Fail("ssconf status shows I'm the master node, will not demote")
2527
2528   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2529   if not result.failed:
2530     _Fail("The master daemon is running, will not demote")
2531
2532   try:
2533     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2534       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2535   except EnvironmentError, err:
2536     if err.errno != errno.ENOENT:
2537       _Fail("Error while backing up cluster file: %s", err, exc=True)
2538
2539   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2540
2541
2542 def _GetX509Filenames(cryptodir, name):
2543   """Returns the full paths for the private key and certificate.
2544
2545   """
2546   return (utils.PathJoin(cryptodir, name),
2547           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2548           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2549
2550
2551 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2552   """Creates a new X509 certificate for SSL/TLS.
2553
2554   @type validity: int
2555   @param validity: Validity in seconds
2556   @rtype: tuple; (string, string)
2557   @return: Certificate name and public part
2558
2559   """
2560   (key_pem, cert_pem) = \
2561     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2562                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2563
2564   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2565                               prefix="x509-%s-" % utils.TimestampForFilename())
2566   try:
2567     name = os.path.basename(cert_dir)
2568     assert len(name) > 5
2569
2570     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2571
2572     utils.WriteFile(key_file, mode=0400, data=key_pem)
2573     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2574
2575     # Never return private key as it shouldn't leave the node
2576     return (name, cert_pem)
2577   except Exception:
2578     shutil.rmtree(cert_dir, ignore_errors=True)
2579     raise
2580
2581
2582 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2583   """Removes a X509 certificate.
2584
2585   @type name: string
2586   @param name: Certificate name
2587
2588   """
2589   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2590
2591   utils.RemoveFile(key_file)
2592   utils.RemoveFile(cert_file)
2593
2594   try:
2595     os.rmdir(cert_dir)
2596   except EnvironmentError, err:
2597     _Fail("Cannot remove certificate directory '%s': %s",
2598           cert_dir, err)
2599
2600
2601 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2602   """Returns the command for the requested input/output.
2603
2604   @type instance: L{objects.Instance}
2605   @param instance: The instance object
2606   @param mode: Import/export mode
2607   @param ieio: Input/output type
2608   @param ieargs: Input/output arguments
2609
2610   """
2611   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2612
2613   env = None
2614   prefix = None
2615   suffix = None
2616   exp_size = None
2617
2618   if ieio == constants.IEIO_FILE:
2619     (filename, ) = ieargs
2620
2621     if not utils.IsNormAbsPath(filename):
2622       _Fail("Path '%s' is not normalized or absolute", filename)
2623
2624     directory = os.path.normpath(os.path.dirname(filename))
2625
2626     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2627         constants.EXPORT_DIR):
2628       _Fail("File '%s' is not under exports directory '%s'",
2629             filename, constants.EXPORT_DIR)
2630
2631     # Create directory
2632     utils.Makedirs(directory, mode=0750)
2633
2634     quoted_filename = utils.ShellQuote(filename)
2635
2636     if mode == constants.IEM_IMPORT:
2637       suffix = "> %s" % quoted_filename
2638     elif mode == constants.IEM_EXPORT:
2639       suffix = "< %s" % quoted_filename
2640
2641       # Retrieve file size
2642       try:
2643         st = os.stat(filename)
2644       except EnvironmentError, err:
2645         logging.error("Can't stat(2) %s: %s", filename, err)
2646       else:
2647         exp_size = utils.BytesToMebibyte(st.st_size)
2648
2649   elif ieio == constants.IEIO_RAW_DISK:
2650     (disk, ) = ieargs
2651
2652     real_disk = _OpenRealBD(disk)
2653
2654     if mode == constants.IEM_IMPORT:
2655       # we set here a smaller block size as, due to transport buffering, more
2656       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2657       # is not already there or we pass a wrong path; we use notrunc to no
2658       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2659       # much memory; this means that at best, we flush every 64k, which will
2660       # not be very fast
2661       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2662                                     " bs=%s oflag=dsync"),
2663                                     real_disk.dev_path,
2664                                     str(64 * 1024))
2665
2666     elif mode == constants.IEM_EXPORT:
2667       # the block size on the read dd is 1MiB to match our units
2668       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2669                                    real_disk.dev_path,
2670                                    str(1024 * 1024), # 1 MB
2671                                    str(disk.size))
2672       exp_size = disk.size
2673
2674   elif ieio == constants.IEIO_SCRIPT:
2675     (disk, disk_index, ) = ieargs
2676
2677     assert isinstance(disk_index, (int, long))
2678
2679     real_disk = _OpenRealBD(disk)
2680
2681     inst_os = OSFromDisk(instance.os)
2682     env = OSEnvironment(instance, inst_os)
2683
2684     if mode == constants.IEM_IMPORT:
2685       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2686       env["IMPORT_INDEX"] = str(disk_index)
2687       script = inst_os.import_script
2688
2689     elif mode == constants.IEM_EXPORT:
2690       env["EXPORT_DEVICE"] = real_disk.dev_path
2691       env["EXPORT_INDEX"] = str(disk_index)
2692       script = inst_os.export_script
2693
2694     # TODO: Pass special environment only to script
2695     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2696
2697     if mode == constants.IEM_IMPORT:
2698       suffix = "| %s" % script_cmd
2699
2700     elif mode == constants.IEM_EXPORT:
2701       prefix = "%s |" % script_cmd
2702
2703     # Let script predict size
2704     exp_size = constants.IE_CUSTOM_SIZE
2705
2706   else:
2707     _Fail("Invalid %s I/O mode %r", mode, ieio)
2708
2709   return (env, prefix, suffix, exp_size)
2710
2711
2712 def _CreateImportExportStatusDir(prefix):
2713   """Creates status directory for import/export.
2714
2715   """
2716   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2717                           prefix=("%s-%s-" %
2718                                   (prefix, utils.TimestampForFilename())))
2719
2720
2721 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2722   """Starts an import or export daemon.
2723
2724   @param mode: Import/output mode
2725   @type opts: L{objects.ImportExportOptions}
2726   @param opts: Daemon options
2727   @type host: string
2728   @param host: Remote host for export (None for import)
2729   @type port: int
2730   @param port: Remote port for export (None for import)
2731   @type instance: L{objects.Instance}
2732   @param instance: Instance object
2733   @param ieio: Input/output type
2734   @param ieioargs: Input/output arguments
2735
2736   """
2737   if mode == constants.IEM_IMPORT:
2738     prefix = "import"
2739
2740     if not (host is None and port is None):
2741       _Fail("Can not specify host or port on import")
2742
2743   elif mode == constants.IEM_EXPORT:
2744     prefix = "export"
2745
2746     if host is None or port is None:
2747       _Fail("Host and port must be specified for an export")
2748
2749   else:
2750     _Fail("Invalid mode %r", mode)
2751
2752   if (opts.key_name is None) ^ (opts.ca_pem is None):
2753     _Fail("Cluster certificate can only be used for both key and CA")
2754
2755   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2756     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2757
2758   if opts.key_name is None:
2759     # Use server.pem
2760     key_path = constants.NODED_CERT_FILE
2761     cert_path = constants.NODED_CERT_FILE
2762     assert opts.ca_pem is None
2763   else:
2764     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2765                                                  opts.key_name)
2766     assert opts.ca_pem is not None
2767
2768   for i in [key_path, cert_path]:
2769     if not os.path.exists(i):
2770       _Fail("File '%s' does not exist" % i)
2771
2772   status_dir = _CreateImportExportStatusDir(prefix)
2773   try:
2774     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2775     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2776     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2777
2778     if opts.ca_pem is None:
2779       # Use server.pem
2780       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2781     else:
2782       ca = opts.ca_pem
2783
2784     # Write CA file
2785     utils.WriteFile(ca_file, data=ca, mode=0400)
2786
2787     cmd = [
2788       constants.IMPORT_EXPORT_DAEMON,
2789       status_file, mode,
2790       "--key=%s" % key_path,
2791       "--cert=%s" % cert_path,
2792       "--ca=%s" % ca_file,
2793       ]
2794
2795     if host:
2796       cmd.append("--host=%s" % host)
2797
2798     if port:
2799       cmd.append("--port=%s" % port)
2800
2801     if opts.compress:
2802       cmd.append("--compress=%s" % opts.compress)
2803
2804     if opts.magic:
2805       cmd.append("--magic=%s" % opts.magic)
2806
2807     if exp_size is not None:
2808       cmd.append("--expected-size=%s" % exp_size)
2809
2810     if cmd_prefix:
2811       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2812
2813     if cmd_suffix:
2814       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2815
2816     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2817
2818     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2819     # support for receiving a file descriptor for output
2820     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2821                       output=logfile)
2822
2823     # The import/export name is simply the status directory name
2824     return os.path.basename(status_dir)
2825
2826   except Exception:
2827     shutil.rmtree(status_dir, ignore_errors=True)
2828     raise
2829
2830
2831 def GetImportExportStatus(names):
2832   """Returns import/export daemon status.
2833
2834   @type names: sequence
2835   @param names: List of names
2836   @rtype: List of dicts
2837   @return: Returns a list of the state of each named import/export or None if a
2838            status couldn't be read
2839
2840   """
2841   result = []
2842
2843   for name in names:
2844     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2845                                  _IES_STATUS_FILE)
2846
2847     try:
2848       data = utils.ReadFile(status_file)
2849     except EnvironmentError, err:
2850       if err.errno != errno.ENOENT:
2851         raise
2852       data = None
2853
2854     if not data:
2855       result.append(None)
2856       continue
2857
2858     result.append(serializer.LoadJson(data))
2859
2860   return result
2861
2862
2863 def AbortImportExport(name):
2864   """Sends SIGTERM to a running import/export daemon.
2865
2866   """
2867   logging.info("Abort import/export %s", name)
2868
2869   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2870   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2871
2872   if pid:
2873     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2874                  name, pid)
2875     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2876
2877
2878 def CleanupImportExport(name):
2879   """Cleanup after an import or export.
2880
2881   If the import/export daemon is still running it's killed. Afterwards the
2882   whole status directory is removed.
2883
2884   """
2885   logging.info("Finalizing import/export %s", name)
2886
2887   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2888
2889   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2890
2891   if pid:
2892     logging.info("Import/export %s is still running with PID %s",
2893                  name, pid)
2894     utils.KillProcess(pid, waitpid=False)
2895
2896   shutil.rmtree(status_dir, ignore_errors=True)
2897
2898
2899 def _FindDisks(nodes_ip, disks):
2900   """Sets the physical ID on disks and returns the block devices.
2901
2902   """
2903   # set the correct physical ID
2904   my_name = netutils.Hostname.GetSysName()
2905   for cf in disks:
2906     cf.SetPhysicalID(my_name, nodes_ip)
2907
2908   bdevs = []
2909
2910   for cf in disks:
2911     rd = _RecursiveFindBD(cf)
2912     if rd is None:
2913       _Fail("Can't find device %s", cf)
2914     bdevs.append(rd)
2915   return bdevs
2916
2917
2918 def DrbdDisconnectNet(nodes_ip, disks):
2919   """Disconnects the network on a list of drbd devices.
2920
2921   """
2922   bdevs = _FindDisks(nodes_ip, disks)
2923
2924   # disconnect disks
2925   for rd in bdevs:
2926     try:
2927       rd.DisconnectNet()
2928     except errors.BlockDeviceError, err:
2929       _Fail("Can't change network configuration to standalone mode: %s",
2930             err, exc=True)
2931
2932
2933 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2934   """Attaches the network on a list of drbd devices.
2935
2936   """
2937   bdevs = _FindDisks(nodes_ip, disks)
2938
2939   if multimaster:
2940     for idx, rd in enumerate(bdevs):
2941       try:
2942         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2943       except EnvironmentError, err:
2944         _Fail("Can't create symlink: %s", err)
2945   # reconnect disks, switch to new master configuration and if
2946   # needed primary mode
2947   for rd in bdevs:
2948     try:
2949       rd.AttachNet(multimaster)
2950     except errors.BlockDeviceError, err:
2951       _Fail("Can't change network configuration: %s", err)
2952
2953   # wait until the disks are connected; we need to retry the re-attach
2954   # if the device becomes standalone, as this might happen if the one
2955   # node disconnects and reconnects in a different mode before the
2956   # other node reconnects; in this case, one or both of the nodes will
2957   # decide it has wrong configuration and switch to standalone
2958
2959   def _Attach():
2960     all_connected = True
2961
2962     for rd in bdevs:
2963       stats = rd.GetProcStatus()
2964
2965       all_connected = (all_connected and
2966                        (stats.is_connected or stats.is_in_resync))
2967
2968       if stats.is_standalone:
2969         # peer had different config info and this node became
2970         # standalone, even though this should not happen with the
2971         # new staged way of changing disk configs
2972         try:
2973           rd.AttachNet(multimaster)
2974         except errors.BlockDeviceError, err:
2975           _Fail("Can't change network configuration: %s", err)
2976
2977     if not all_connected:
2978       raise utils.RetryAgain()
2979
2980   try:
2981     # Start with a delay of 100 miliseconds and go up to 5 seconds
2982     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2983   except utils.RetryTimeout:
2984     _Fail("Timeout in disk reconnecting")
2985
2986   if multimaster:
2987     # change to primary mode
2988     for rd in bdevs:
2989       try:
2990         rd.Open()
2991       except errors.BlockDeviceError, err:
2992         _Fail("Can't change to primary mode: %s", err)
2993
2994
2995 def DrbdWaitSync(nodes_ip, disks):
2996   """Wait until DRBDs have synchronized.
2997
2998   """
2999   def _helper(rd):
3000     stats = rd.GetProcStatus()
3001     if not (stats.is_connected or stats.is_in_resync):
3002       raise utils.RetryAgain()
3003     return stats
3004
3005   bdevs = _FindDisks(nodes_ip, disks)
3006
3007   min_resync = 100
3008   alldone = True
3009   for rd in bdevs:
3010     try:
3011       # poll each second for 15 seconds
3012       stats = utils.Retry(_helper, 1, 15, args=[rd])
3013     except utils.RetryTimeout:
3014       stats = rd.GetProcStatus()
3015       # last check
3016       if not (stats.is_connected or stats.is_in_resync):
3017         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3018     alldone = alldone and (not stats.is_in_resync)
3019     if stats.sync_percent is not None:
3020       min_resync = min(min_resync, stats.sync_percent)
3021
3022   return (alldone, min_resync)
3023
3024
3025 def GetDrbdUsermodeHelper():
3026   """Returns DRBD usermode helper currently configured.
3027
3028   """
3029   try:
3030     return bdev.BaseDRBD.GetUsermodeHelper()
3031   except errors.BlockDeviceError, err:
3032     _Fail(str(err))
3033
3034
3035 def PowercycleNode(hypervisor_type):
3036   """Hard-powercycle the node.
3037
3038   Because we need to return first, and schedule the powercycle in the
3039   background, we won't be able to report failures nicely.
3040
3041   """
3042   hyper = hypervisor.GetHypervisor(hypervisor_type)
3043   try:
3044     pid = os.fork()
3045   except OSError:
3046     # if we can't fork, we'll pretend that we're in the child process
3047     pid = 0
3048   if pid > 0:
3049     return "Reboot scheduled in 5 seconds"
3050   # ensure the child is running on ram
3051   try:
3052     utils.Mlockall()
3053   except Exception: # pylint: disable-msg=W0703
3054     pass
3055   time.sleep(5)
3056   hyper.PowercycleNode()
3057
3058
3059 class HooksRunner(object):
3060   """Hook runner.
3061
3062   This class is instantiated on the node side (ganeti-noded) and not
3063   on the master side.
3064
3065   """
3066   def __init__(self, hooks_base_dir=None):
3067     """Constructor for hooks runner.
3068
3069     @type hooks_base_dir: str or None
3070     @param hooks_base_dir: if not None, this overrides the
3071         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3072
3073     """
3074     if hooks_base_dir is None:
3075       hooks_base_dir = constants.HOOKS_BASE_DIR
3076     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3077     # constant
3078     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3079
3080   def RunHooks(self, hpath, phase, env):
3081     """Run the scripts in the hooks directory.
3082
3083     @type hpath: str
3084     @param hpath: the path to the hooks directory which
3085         holds the scripts
3086     @type phase: str
3087     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3088         L{constants.HOOKS_PHASE_POST}
3089     @type env: dict
3090     @param env: dictionary with the environment for the hook
3091     @rtype: list
3092     @return: list of 3-element tuples:
3093       - script path
3094       - script result, either L{constants.HKR_SUCCESS} or
3095         L{constants.HKR_FAIL}
3096       - output of the script
3097
3098     @raise errors.ProgrammerError: for invalid input
3099         parameters
3100
3101     """
3102     if phase == constants.HOOKS_PHASE_PRE:
3103       suffix = "pre"
3104     elif phase == constants.HOOKS_PHASE_POST:
3105       suffix = "post"
3106     else:
3107       _Fail("Unknown hooks phase '%s'", phase)
3108
3109
3110     subdir = "%s-%s.d" % (hpath, suffix)
3111     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3112
3113     results = []
3114
3115     if not os.path.isdir(dir_name):
3116       # for non-existing/non-dirs, we simply exit instead of logging a
3117       # warning at every operation
3118       return results
3119
3120     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3121
3122     for (relname, relstatus, runresult)  in runparts_results:
3123       if relstatus == constants.RUNPARTS_SKIP:
3124         rrval = constants.HKR_SKIP
3125         output = ""
3126       elif relstatus == constants.RUNPARTS_ERR:
3127         rrval = constants.HKR_FAIL
3128         output = "Hook script execution error: %s" % runresult
3129       elif relstatus == constants.RUNPARTS_RUN:
3130         if runresult.failed:
3131           rrval = constants.HKR_FAIL
3132         else:
3133           rrval = constants.HKR_SUCCESS
3134         output = utils.SafeEncode(runresult.output.strip())
3135       results.append(("%s/%s" % (subdir, relname), rrval, output))
3136
3137     return results
3138
3139
3140 class IAllocatorRunner(object):
3141   """IAllocator runner.
3142
3143   This class is instantiated on the node side (ganeti-noded) and not on
3144   the master side.
3145
3146   """
3147   @staticmethod
3148   def Run(name, idata):
3149     """Run an iallocator script.
3150
3151     @type name: str
3152     @param name: the iallocator script name
3153     @type idata: str
3154     @param idata: the allocator input data
3155
3156     @rtype: tuple
3157     @return: two element tuple of:
3158        - status
3159        - either error message or stdout of allocator (for success)
3160
3161     """
3162     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3163                                   os.path.isfile)
3164     if alloc_script is None:
3165       _Fail("iallocator module '%s' not found in the search path", name)
3166
3167     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3168     try:
3169       os.write(fd, idata)
3170       os.close(fd)
3171       result = utils.RunCmd([alloc_script, fin_name])
3172       if result.failed:
3173         _Fail("iallocator module '%s' failed: %s, output '%s'",
3174               name, result.fail_reason, result.output)
3175     finally:
3176       os.unlink(fin_name)
3177
3178     return result.stdout
3179
3180
3181 class DevCacheManager(object):
3182   """Simple class for managing a cache of block device information.
3183
3184   """
3185   _DEV_PREFIX = "/dev/"
3186   _ROOT_DIR = constants.BDEV_CACHE_DIR
3187
3188   @classmethod
3189   def _ConvertPath(cls, dev_path):
3190     """Converts a /dev/name path to the cache file name.
3191
3192     This replaces slashes with underscores and strips the /dev
3193     prefix. It then returns the full path to the cache file.
3194
3195     @type dev_path: str
3196     @param dev_path: the C{/dev/} path name
3197     @rtype: str
3198     @return: the converted path name
3199
3200     """
3201     if dev_path.startswith(cls._DEV_PREFIX):
3202       dev_path = dev_path[len(cls._DEV_PREFIX):]
3203     dev_path = dev_path.replace("/", "_")
3204     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3205     return fpath
3206
3207   @classmethod
3208   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3209     """Updates the cache information for a given device.
3210
3211     @type dev_path: str
3212     @param dev_path: the pathname of the device
3213     @type owner: str
3214     @param owner: the owner (instance name) of the device
3215     @type on_primary: bool
3216     @param on_primary: whether this is the primary
3217         node nor not
3218     @type iv_name: str
3219     @param iv_name: the instance-visible name of the
3220         device, as in objects.Disk.iv_name
3221
3222     @rtype: None
3223
3224     """
3225     if dev_path is None:
3226       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3227       return
3228     fpath = cls._ConvertPath(dev_path)
3229     if on_primary:
3230       state = "primary"
3231     else:
3232       state = "secondary"
3233     if iv_name is None:
3234       iv_name = "not_visible"
3235     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3236     try:
3237       utils.WriteFile(fpath, data=fdata)
3238     except EnvironmentError, err:
3239       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3240
3241   @classmethod
3242   def RemoveCache(cls, dev_path):
3243     """Remove data for a dev_path.
3244
3245     This is just a wrapper over L{utils.RemoveFile} with a converted
3246     path name and logging.
3247
3248     @type dev_path: str
3249     @param dev_path: the pathname of the device
3250
3251     @rtype: None
3252
3253     """
3254     if dev_path is None:
3255       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3256       return
3257     fpath = cls._ConvertPath(dev_path)
3258     try:
3259       utils.RemoveFile(fpath)
3260     except EnvironmentError, err:
3261       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)