jqueue/gnt-job: Add job priority fields for display
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79
80 class RPCFail(Exception):
81   """Class denoting RPC failure.
82
83   Its argument is the error message.
84
85   """
86
87
88 def _Fail(msg, *args, **kwargs):
89   """Log an error and the raise an RPCFail exception.
90
91   This exception is then handled specially in the ganeti daemon and
92   turned into a 'failed' return type. As such, this function is a
93   useful shortcut for logging the error and returning it to the master
94   daemon.
95
96   @type msg: string
97   @param msg: the text of the exception
98   @raise RPCFail
99
100   """
101   if args:
102     msg = msg % args
103   if "log" not in kwargs or kwargs["log"]: # if we should log this error
104     if "exc" in kwargs and kwargs["exc"]:
105       logging.exception(msg)
106     else:
107       logging.error(msg)
108   raise RPCFail(msg)
109
110
111 def _GetConfig():
112   """Simple wrapper to return a SimpleStore.
113
114   @rtype: L{ssconf.SimpleStore}
115   @return: a SimpleStore instance
116
117   """
118   return ssconf.SimpleStore()
119
120
121 def _GetSshRunner(cluster_name):
122   """Simple wrapper to return an SshRunner.
123
124   @type cluster_name: str
125   @param cluster_name: the cluster name, which is needed
126       by the SshRunner constructor
127   @rtype: L{ssh.SshRunner}
128   @return: an SshRunner instance
129
130   """
131   return ssh.SshRunner(cluster_name)
132
133
134 def _Decompress(data):
135   """Unpacks data compressed by the RPC client.
136
137   @type data: list or tuple
138   @param data: Data sent by RPC client
139   @rtype: str
140   @return: Decompressed data
141
142   """
143   assert isinstance(data, (list, tuple))
144   assert len(data) == 2
145   (encoding, content) = data
146   if encoding == constants.RPC_ENCODING_NONE:
147     return content
148   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
149     return zlib.decompress(base64.b64decode(content))
150   else:
151     raise AssertionError("Unknown data encoding")
152
153
154 def _CleanDirectory(path, exclude=None):
155   """Removes all regular files in a directory.
156
157   @type path: str
158   @param path: the directory to clean
159   @type exclude: list
160   @param exclude: list of files to be excluded, defaults
161       to the empty list
162
163   """
164   if path not in _ALLOWED_CLEAN_DIRS:
165     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
166           path)
167
168   if not os.path.isdir(path):
169     return
170   if exclude is None:
171     exclude = []
172   else:
173     # Normalize excluded paths
174     exclude = [os.path.normpath(i) for i in exclude]
175
176   for rel_name in utils.ListVisibleFiles(path):
177     full_name = utils.PathJoin(path, rel_name)
178     if full_name in exclude:
179       continue
180     if os.path.isfile(full_name) and not os.path.islink(full_name):
181       utils.RemoveFile(full_name)
182
183
184 def _BuildUploadFileList():
185   """Build the list of allowed upload files.
186
187   This is abstracted so that it's built only once at module import time.
188
189   """
190   allowed_files = set([
191     constants.CLUSTER_CONF_FILE,
192     constants.ETC_HOSTS,
193     constants.SSH_KNOWN_HOSTS_FILE,
194     constants.VNC_PASSWORD_FILE,
195     constants.RAPI_CERT_FILE,
196     constants.RAPI_USERS_FILE,
197     constants.CONFD_HMAC_KEY,
198     constants.CLUSTER_DOMAIN_SECRET_FILE,
199     ])
200
201   for hv_name in constants.HYPER_TYPES:
202     hv_class = hypervisor.GetHypervisorClass(hv_name)
203     allowed_files.update(hv_class.GetAncillaryFiles())
204
205   return frozenset(allowed_files)
206
207
208 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
209
210
211 def JobQueuePurge():
212   """Removes job queue files and archived jobs.
213
214   @rtype: tuple
215   @return: True, None
216
217   """
218   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
219   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
220
221
222 def GetMasterInfo():
223   """Returns master information.
224
225   This is an utility function to compute master information, either
226   for consumption here or from the node daemon.
227
228   @rtype: tuple
229   @return: master_netdev, master_ip, master_name, primary_ip_family
230   @raise RPCFail: in case of errors
231
232   """
233   try:
234     cfg = _GetConfig()
235     master_netdev = cfg.GetMasterNetdev()
236     master_ip = cfg.GetMasterIP()
237     master_node = cfg.GetMasterNode()
238     primary_ip_family = cfg.GetPrimaryIPFamily()
239   except errors.ConfigurationError, err:
240     _Fail("Cluster configuration incomplete: %s", err, exc=True)
241   return (master_netdev, master_ip, master_node, primary_ip_family)
242
243
244 def StartMaster(start_daemons, no_voting):
245   """Activate local node as master node.
246
247   The function will either try activate the IP address of the master
248   (unless someone else has it) or also start the master daemons, based
249   on the start_daemons parameter.
250
251   @type start_daemons: boolean
252   @param start_daemons: whether to start the master daemons
253       (ganeti-masterd and ganeti-rapi), or (if false) activate the
254       master ip
255   @type no_voting: boolean
256   @param no_voting: whether to start ganeti-masterd without a node vote
257       (if start_daemons is True), but still non-interactively
258   @rtype: None
259
260   """
261   # GetMasterInfo will raise an exception if not able to return data
262   master_netdev, master_ip, _, family = GetMasterInfo()
263
264   err_msgs = []
265   # either start the master and rapi daemons
266   if start_daemons:
267     if no_voting:
268       masterd_args = "--no-voting --yes-do-it"
269     else:
270       masterd_args = ""
271
272     env = {
273       "EXTRA_MASTERD_ARGS": masterd_args,
274       }
275
276     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
277     if result.failed:
278       msg = "Can't start Ganeti master: %s" % result.output
279       logging.error(msg)
280       err_msgs.append(msg)
281   # or activate the IP
282   else:
283     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
284       if netutils.IPAddress.Own(master_ip):
285         # we already have the ip:
286         logging.debug("Master IP already configured, doing nothing")
287       else:
288         msg = "Someone else has the master ip, not activating"
289         logging.error(msg)
290         err_msgs.append(msg)
291     else:
292       ipcls = netutils.IP4Address
293       if family == netutils.IP6Address.family:
294         ipcls = netutils.IP6Address
295
296       result = utils.RunCmd(["ip", "address", "add",
297                              "%s/%d" % (master_ip, ipcls.iplen),
298                              "dev", master_netdev, "label",
299                              "%s:0" % master_netdev])
300       if result.failed:
301         msg = "Can't activate master IP: %s" % result.output
302         logging.error(msg)
303         err_msgs.append(msg)
304
305       # we ignore the exit code of the following cmds
306       if ipcls == netutils.IP4Address:
307         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
308                       master_ip, master_ip])
309       elif ipcls == netutils.IP6Address:
310         try:
311           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
312         except errors.OpExecError:
313           # TODO: Better error reporting
314           logging.warning("Can't execute ndisc6, please install if missing")
315
316   if err_msgs:
317     _Fail("; ".join(err_msgs))
318
319
320 def StopMaster(stop_daemons):
321   """Deactivate this node as master.
322
323   The function will always try to deactivate the IP address of the
324   master. It will also stop the master daemons depending on the
325   stop_daemons parameter.
326
327   @type stop_daemons: boolean
328   @param stop_daemons: whether to also stop the master daemons
329       (ganeti-masterd and ganeti-rapi)
330   @rtype: None
331
332   """
333   # TODO: log and report back to the caller the error failures; we
334   # need to decide in which case we fail the RPC for this
335
336   # GetMasterInfo will raise an exception if not able to return data
337   master_netdev, master_ip, _, family = GetMasterInfo()
338
339   ipcls = netutils.IP4Address
340   if family == netutils.IP6Address.family:
341     ipcls = netutils.IP6Address
342
343   result = utils.RunCmd(["ip", "address", "del",
344                          "%s/%d" % (master_ip, ipcls.iplen),
345                          "dev", master_netdev])
346   if result.failed:
347     logging.error("Can't remove the master IP, error: %s", result.output)
348     # but otherwise ignore the failure
349
350   if stop_daemons:
351     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
352     if result.failed:
353       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
354                     " and error %s",
355                     result.cmd, result.exit_code, result.output)
356
357
358 def EtcHostsModify(mode, host, ip):
359   """Modify a host entry in /etc/hosts.
360
361   @param mode: The mode to operate. Either add or remove entry
362   @param host: The host to operate on
363   @param ip: The ip associated with the entry
364
365   """
366   if mode == constants.ETC_HOSTS_ADD:
367     if not ip:
368       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
369               " present")
370     utils.AddHostToEtcHosts(host, ip)
371   elif mode == constants.ETC_HOSTS_REMOVE:
372     if ip:
373       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
374               " parameter is present")
375     utils.RemoveHostFromEtcHosts(host)
376   else:
377     RPCFail("Mode not supported")
378
379
380 def LeaveCluster(modify_ssh_setup):
381   """Cleans up and remove the current node.
382
383   This function cleans up and prepares the current node to be removed
384   from the cluster.
385
386   If processing is successful, then it raises an
387   L{errors.QuitGanetiException} which is used as a special case to
388   shutdown the node daemon.
389
390   @param modify_ssh_setup: boolean
391
392   """
393   _CleanDirectory(constants.DATA_DIR)
394   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
395   JobQueuePurge()
396
397   if modify_ssh_setup:
398     try:
399       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
400
401       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
402
403       utils.RemoveFile(priv_key)
404       utils.RemoveFile(pub_key)
405     except errors.OpExecError:
406       logging.exception("Error while processing ssh files")
407
408   try:
409     utils.RemoveFile(constants.CONFD_HMAC_KEY)
410     utils.RemoveFile(constants.RAPI_CERT_FILE)
411     utils.RemoveFile(constants.NODED_CERT_FILE)
412   except: # pylint: disable-msg=W0702
413     logging.exception("Error while removing cluster secrets")
414
415   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
416   if result.failed:
417     logging.error("Command %s failed with exitcode %s and error %s",
418                   result.cmd, result.exit_code, result.output)
419
420   # Raise a custom exception (handled in ganeti-noded)
421   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
422
423
424 def GetNodeInfo(vgname, hypervisor_type):
425   """Gives back a hash with different information about the node.
426
427   @type vgname: C{string}
428   @param vgname: the name of the volume group to ask for disk space information
429   @type hypervisor_type: C{str}
430   @param hypervisor_type: the name of the hypervisor to ask for
431       memory information
432   @rtype: C{dict}
433   @return: dictionary with the following keys:
434       - vg_size is the size of the configured volume group in MiB
435       - vg_free is the free size of the volume group in MiB
436       - memory_dom0 is the memory allocated for domain0 in MiB
437       - memory_free is the currently available (free) ram in MiB
438       - memory_total is the total number of ram in MiB
439
440   """
441   outputarray = {}
442
443   vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
444   vg_free = vg_size = None
445   if vginfo:
446     vg_free = int(round(vginfo[0][0], 0))
447     vg_size = int(round(vginfo[0][1], 0))
448
449   outputarray['vg_size'] = vg_size
450   outputarray['vg_free'] = vg_free
451
452   hyper = hypervisor.GetHypervisor(hypervisor_type)
453   hyp_info = hyper.GetNodeInfo()
454   if hyp_info is not None:
455     outputarray.update(hyp_info)
456
457   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
458
459   return outputarray
460
461
462 def VerifyNode(what, cluster_name):
463   """Verify the status of the local node.
464
465   Based on the input L{what} parameter, various checks are done on the
466   local node.
467
468   If the I{filelist} key is present, this list of
469   files is checksummed and the file/checksum pairs are returned.
470
471   If the I{nodelist} key is present, we check that we have
472   connectivity via ssh with the target nodes (and check the hostname
473   report).
474
475   If the I{node-net-test} key is present, we check that we have
476   connectivity to the given nodes via both primary IP and, if
477   applicable, secondary IPs.
478
479   @type what: C{dict}
480   @param what: a dictionary of things to check:
481       - filelist: list of files for which to compute checksums
482       - nodelist: list of nodes we should check ssh communication with
483       - node-net-test: list of nodes we should check node daemon port
484         connectivity with
485       - hypervisor: list with hypervisors to run the verify for
486   @rtype: dict
487   @return: a dictionary with the same keys as the input dict, and
488       values representing the result of the checks
489
490   """
491   result = {}
492   my_name = netutils.Hostname.GetSysName()
493   port = netutils.GetDaemonPort(constants.NODED)
494
495   if constants.NV_HYPERVISOR in what:
496     result[constants.NV_HYPERVISOR] = tmp = {}
497     for hv_name in what[constants.NV_HYPERVISOR]:
498       try:
499         val = hypervisor.GetHypervisor(hv_name).Verify()
500       except errors.HypervisorError, err:
501         val = "Error while checking hypervisor: %s" % str(err)
502       tmp[hv_name] = val
503
504   if constants.NV_FILELIST in what:
505     result[constants.NV_FILELIST] = utils.FingerprintFiles(
506       what[constants.NV_FILELIST])
507
508   if constants.NV_NODELIST in what:
509     result[constants.NV_NODELIST] = tmp = {}
510     random.shuffle(what[constants.NV_NODELIST])
511     for node in what[constants.NV_NODELIST]:
512       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
513       if not success:
514         tmp[node] = message
515
516   if constants.NV_NODENETTEST in what:
517     result[constants.NV_NODENETTEST] = tmp = {}
518     my_pip = my_sip = None
519     for name, pip, sip in what[constants.NV_NODENETTEST]:
520       if name == my_name:
521         my_pip = pip
522         my_sip = sip
523         break
524     if not my_pip:
525       tmp[my_name] = ("Can't find my own primary/secondary IP"
526                       " in the node list")
527     else:
528       for name, pip, sip in what[constants.NV_NODENETTEST]:
529         fail = []
530         if not netutils.TcpPing(pip, port, source=my_pip):
531           fail.append("primary")
532         if sip != pip:
533           if not netutils.TcpPing(sip, port, source=my_sip):
534             fail.append("secondary")
535         if fail:
536           tmp[name] = ("failure using the %s interface(s)" %
537                        " and ".join(fail))
538
539   if constants.NV_MASTERIP in what:
540     # FIXME: add checks on incoming data structures (here and in the
541     # rest of the function)
542     master_name, master_ip = what[constants.NV_MASTERIP]
543     if master_name == my_name:
544       source = constants.IP4_ADDRESS_LOCALHOST
545     else:
546       source = None
547     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
548                                                   source=source)
549
550   if constants.NV_LVLIST in what:
551     try:
552       val = GetVolumeList(what[constants.NV_LVLIST])
553     except RPCFail, err:
554       val = str(err)
555     result[constants.NV_LVLIST] = val
556
557   if constants.NV_INSTANCELIST in what:
558     # GetInstanceList can fail
559     try:
560       val = GetInstanceList(what[constants.NV_INSTANCELIST])
561     except RPCFail, err:
562       val = str(err)
563     result[constants.NV_INSTANCELIST] = val
564
565   if constants.NV_VGLIST in what:
566     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
567
568   if constants.NV_PVLIST in what:
569     result[constants.NV_PVLIST] = \
570       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
571                                    filter_allocatable=False)
572
573   if constants.NV_VERSION in what:
574     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
575                                     constants.RELEASE_VERSION)
576
577   if constants.NV_HVINFO in what:
578     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
579     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
580
581   if constants.NV_DRBDLIST in what:
582     try:
583       used_minors = bdev.DRBD8.GetUsedDevs().keys()
584     except errors.BlockDeviceError, err:
585       logging.warning("Can't get used minors list", exc_info=True)
586       used_minors = str(err)
587     result[constants.NV_DRBDLIST] = used_minors
588
589   if constants.NV_DRBDHELPER in what:
590     status = True
591     try:
592       payload = bdev.BaseDRBD.GetUsermodeHelper()
593     except errors.BlockDeviceError, err:
594       logging.error("Can't get DRBD usermode helper: %s", str(err))
595       status = False
596       payload = str(err)
597     result[constants.NV_DRBDHELPER] = (status, payload)
598
599   if constants.NV_NODESETUP in what:
600     result[constants.NV_NODESETUP] = tmpr = []
601     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
602       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
603                   " under /sys, missing required directories /sys/block"
604                   " and /sys/class/net")
605     if (not os.path.isdir("/proc/sys") or
606         not os.path.isfile("/proc/sysrq-trigger")):
607       tmpr.append("The procfs filesystem doesn't seem to be mounted"
608                   " under /proc, missing required directory /proc/sys and"
609                   " the file /proc/sysrq-trigger")
610
611   if constants.NV_TIME in what:
612     result[constants.NV_TIME] = utils.SplitTime(time.time())
613
614   if constants.NV_OSLIST in what:
615     result[constants.NV_OSLIST] = DiagnoseOS()
616
617   return result
618
619
620 def GetVolumeList(vg_name):
621   """Compute list of logical volumes and their size.
622
623   @type vg_name: str
624   @param vg_name: the volume group whose LVs we should list
625   @rtype: dict
626   @return:
627       dictionary of all partions (key) with value being a tuple of
628       their size (in MiB), inactive and online status::
629
630         {'test1': ('20.06', True, True)}
631
632       in case of errors, a string is returned with the error
633       details.
634
635   """
636   lvs = {}
637   sep = '|'
638   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
639                          "--separator=%s" % sep,
640                          "-olv_name,lv_size,lv_attr", vg_name])
641   if result.failed:
642     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
643
644   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
645   for line in result.stdout.splitlines():
646     line = line.strip()
647     match = valid_line_re.match(line)
648     if not match:
649       logging.error("Invalid line returned from lvs output: '%s'", line)
650       continue
651     name, size, attr = match.groups()
652     inactive = attr[4] == '-'
653     online = attr[5] == 'o'
654     virtual = attr[0] == 'v'
655     if virtual:
656       # we don't want to report such volumes as existing, since they
657       # don't really hold data
658       continue
659     lvs[name] = (size, inactive, online)
660
661   return lvs
662
663
664 def ListVolumeGroups():
665   """List the volume groups and their size.
666
667   @rtype: dict
668   @return: dictionary with keys volume name and values the
669       size of the volume
670
671   """
672   return utils.ListVolumeGroups()
673
674
675 def NodeVolumes():
676   """List all volumes on this node.
677
678   @rtype: list
679   @return:
680     A list of dictionaries, each having four keys:
681       - name: the logical volume name,
682       - size: the size of the logical volume
683       - dev: the physical device on which the LV lives
684       - vg: the volume group to which it belongs
685
686     In case of errors, we return an empty list and log the
687     error.
688
689     Note that since a logical volume can live on multiple physical
690     volumes, the resulting list might include a logical volume
691     multiple times.
692
693   """
694   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
695                          "--separator=|",
696                          "--options=lv_name,lv_size,devices,vg_name"])
697   if result.failed:
698     _Fail("Failed to list logical volumes, lvs output: %s",
699           result.output)
700
701   def parse_dev(dev):
702     return dev.split('(')[0]
703
704   def handle_dev(dev):
705     return [parse_dev(x) for x in dev.split(",")]
706
707   def map_line(line):
708     line = [v.strip() for v in line]
709     return [{'name': line[0], 'size': line[1],
710              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
711
712   all_devs = []
713   for line in result.stdout.splitlines():
714     if line.count('|') >= 3:
715       all_devs.extend(map_line(line.split('|')))
716     else:
717       logging.warning("Strange line in the output from lvs: '%s'", line)
718   return all_devs
719
720
721 def BridgesExist(bridges_list):
722   """Check if a list of bridges exist on the current node.
723
724   @rtype: boolean
725   @return: C{True} if all of them exist, C{False} otherwise
726
727   """
728   missing = []
729   for bridge in bridges_list:
730     if not utils.BridgeExists(bridge):
731       missing.append(bridge)
732
733   if missing:
734     _Fail("Missing bridges %s", utils.CommaJoin(missing))
735
736
737 def GetInstanceList(hypervisor_list):
738   """Provides a list of instances.
739
740   @type hypervisor_list: list
741   @param hypervisor_list: the list of hypervisors to query information
742
743   @rtype: list
744   @return: a list of all running instances on the current node
745     - instance1.example.com
746     - instance2.example.com
747
748   """
749   results = []
750   for hname in hypervisor_list:
751     try:
752       names = hypervisor.GetHypervisor(hname).ListInstances()
753       results.extend(names)
754     except errors.HypervisorError, err:
755       _Fail("Error enumerating instances (hypervisor %s): %s",
756             hname, err, exc=True)
757
758   return results
759
760
761 def GetInstanceInfo(instance, hname):
762   """Gives back the information about an instance as a dictionary.
763
764   @type instance: string
765   @param instance: the instance name
766   @type hname: string
767   @param hname: the hypervisor type of the instance
768
769   @rtype: dict
770   @return: dictionary with the following keys:
771       - memory: memory size of instance (int)
772       - state: xen state of instance (string)
773       - time: cpu time of instance (float)
774
775   """
776   output = {}
777
778   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
779   if iinfo is not None:
780     output['memory'] = iinfo[2]
781     output['state'] = iinfo[4]
782     output['time'] = iinfo[5]
783
784   return output
785
786
787 def GetInstanceMigratable(instance):
788   """Gives whether an instance can be migrated.
789
790   @type instance: L{objects.Instance}
791   @param instance: object representing the instance to be checked.
792
793   @rtype: tuple
794   @return: tuple of (result, description) where:
795       - result: whether the instance can be migrated or not
796       - description: a description of the issue, if relevant
797
798   """
799   hyper = hypervisor.GetHypervisor(instance.hypervisor)
800   iname = instance.name
801   if iname not in hyper.ListInstances():
802     _Fail("Instance %s is not running", iname)
803
804   for idx in range(len(instance.disks)):
805     link_name = _GetBlockDevSymlinkPath(iname, idx)
806     if not os.path.islink(link_name):
807       logging.warning("Instance %s is missing symlink %s for disk %d",
808                       iname, link_name, idx)
809
810
811 def GetAllInstancesInfo(hypervisor_list):
812   """Gather data about all instances.
813
814   This is the equivalent of L{GetInstanceInfo}, except that it
815   computes data for all instances at once, thus being faster if one
816   needs data about more than one instance.
817
818   @type hypervisor_list: list
819   @param hypervisor_list: list of hypervisors to query for instance data
820
821   @rtype: dict
822   @return: dictionary of instance: data, with data having the following keys:
823       - memory: memory size of instance (int)
824       - state: xen state of instance (string)
825       - time: cpu time of instance (float)
826       - vcpus: the number of vcpus
827
828   """
829   output = {}
830
831   for hname in hypervisor_list:
832     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
833     if iinfo:
834       for name, _, memory, vcpus, state, times in iinfo:
835         value = {
836           'memory': memory,
837           'vcpus': vcpus,
838           'state': state,
839           'time': times,
840           }
841         if name in output:
842           # we only check static parameters, like memory and vcpus,
843           # and not state and time which can change between the
844           # invocations of the different hypervisors
845           for key in 'memory', 'vcpus':
846             if value[key] != output[name][key]:
847               _Fail("Instance %s is running twice"
848                     " with different parameters", name)
849         output[name] = value
850
851   return output
852
853
854 def _InstanceLogName(kind, os_name, instance):
855   """Compute the OS log filename for a given instance and operation.
856
857   The instance name and os name are passed in as strings since not all
858   operations have these as part of an instance object.
859
860   @type kind: string
861   @param kind: the operation type (e.g. add, import, etc.)
862   @type os_name: string
863   @param os_name: the os name
864   @type instance: string
865   @param instance: the name of the instance being imported/added/etc.
866
867   """
868   # TODO: Use tempfile.mkstemp to create unique filename
869   base = ("%s-%s-%s-%s.log" %
870           (kind, os_name, instance, utils.TimestampForFilename()))
871   return utils.PathJoin(constants.LOG_OS_DIR, base)
872
873
874 def InstanceOsAdd(instance, reinstall, debug):
875   """Add an OS to an instance.
876
877   @type instance: L{objects.Instance}
878   @param instance: Instance whose OS is to be installed
879   @type reinstall: boolean
880   @param reinstall: whether this is an instance reinstall
881   @type debug: integer
882   @param debug: debug level, passed to the OS scripts
883   @rtype: None
884
885   """
886   inst_os = OSFromDisk(instance.os)
887
888   create_env = OSEnvironment(instance, inst_os, debug)
889   if reinstall:
890     create_env['INSTANCE_REINSTALL'] = "1"
891
892   logfile = _InstanceLogName("add", instance.os, instance.name)
893
894   result = utils.RunCmd([inst_os.create_script], env=create_env,
895                         cwd=inst_os.path, output=logfile,)
896   if result.failed:
897     logging.error("os create command '%s' returned error: %s, logfile: %s,"
898                   " output: %s", result.cmd, result.fail_reason, logfile,
899                   result.output)
900     lines = [utils.SafeEncode(val)
901              for val in utils.TailFile(logfile, lines=20)]
902     _Fail("OS create script failed (%s), last lines in the"
903           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
904
905
906 def RunRenameInstance(instance, old_name, debug):
907   """Run the OS rename script for an instance.
908
909   @type instance: L{objects.Instance}
910   @param instance: Instance whose OS is to be installed
911   @type old_name: string
912   @param old_name: previous instance name
913   @type debug: integer
914   @param debug: debug level, passed to the OS scripts
915   @rtype: boolean
916   @return: the success of the operation
917
918   """
919   inst_os = OSFromDisk(instance.os)
920
921   rename_env = OSEnvironment(instance, inst_os, debug)
922   rename_env['OLD_INSTANCE_NAME'] = old_name
923
924   logfile = _InstanceLogName("rename", instance.os,
925                              "%s-%s" % (old_name, instance.name))
926
927   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
928                         cwd=inst_os.path, output=logfile)
929
930   if result.failed:
931     logging.error("os create command '%s' returned error: %s output: %s",
932                   result.cmd, result.fail_reason, result.output)
933     lines = [utils.SafeEncode(val)
934              for val in utils.TailFile(logfile, lines=20)]
935     _Fail("OS rename script failed (%s), last lines in the"
936           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
937
938
939 def _GetBlockDevSymlinkPath(instance_name, idx):
940   return utils.PathJoin(constants.DISK_LINKS_DIR,
941                         "%s:%d" % (instance_name, idx))
942
943
944 def _SymlinkBlockDev(instance_name, device_path, idx):
945   """Set up symlinks to a instance's block device.
946
947   This is an auxiliary function run when an instance is start (on the primary
948   node) or when an instance is migrated (on the target node).
949
950
951   @param instance_name: the name of the target instance
952   @param device_path: path of the physical block device, on the node
953   @param idx: the disk index
954   @return: absolute path to the disk's symlink
955
956   """
957   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
958   try:
959     os.symlink(device_path, link_name)
960   except OSError, err:
961     if err.errno == errno.EEXIST:
962       if (not os.path.islink(link_name) or
963           os.readlink(link_name) != device_path):
964         os.remove(link_name)
965         os.symlink(device_path, link_name)
966     else:
967       raise
968
969   return link_name
970
971
972 def _RemoveBlockDevLinks(instance_name, disks):
973   """Remove the block device symlinks belonging to the given instance.
974
975   """
976   for idx, _ in enumerate(disks):
977     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
978     if os.path.islink(link_name):
979       try:
980         os.remove(link_name)
981       except OSError:
982         logging.exception("Can't remove symlink '%s'", link_name)
983
984
985 def _GatherAndLinkBlockDevs(instance):
986   """Set up an instance's block device(s).
987
988   This is run on the primary node at instance startup. The block
989   devices must be already assembled.
990
991   @type instance: L{objects.Instance}
992   @param instance: the instance whose disks we shoul assemble
993   @rtype: list
994   @return: list of (disk_object, device_path)
995
996   """
997   block_devices = []
998   for idx, disk in enumerate(instance.disks):
999     device = _RecursiveFindBD(disk)
1000     if device is None:
1001       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1002                                     str(disk))
1003     device.Open()
1004     try:
1005       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1006     except OSError, e:
1007       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1008                                     e.strerror)
1009
1010     block_devices.append((disk, link_name))
1011
1012   return block_devices
1013
1014
1015 def StartInstance(instance):
1016   """Start an instance.
1017
1018   @type instance: L{objects.Instance}
1019   @param instance: the instance object
1020   @rtype: None
1021
1022   """
1023   running_instances = GetInstanceList([instance.hypervisor])
1024
1025   if instance.name in running_instances:
1026     logging.info("Instance %s already running, not starting", instance.name)
1027     return
1028
1029   try:
1030     block_devices = _GatherAndLinkBlockDevs(instance)
1031     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1032     hyper.StartInstance(instance, block_devices)
1033   except errors.BlockDeviceError, err:
1034     _Fail("Block device error: %s", err, exc=True)
1035   except errors.HypervisorError, err:
1036     _RemoveBlockDevLinks(instance.name, instance.disks)
1037     _Fail("Hypervisor error: %s", err, exc=True)
1038
1039
1040 def InstanceShutdown(instance, timeout):
1041   """Shut an instance down.
1042
1043   @note: this functions uses polling with a hardcoded timeout.
1044
1045   @type instance: L{objects.Instance}
1046   @param instance: the instance object
1047   @type timeout: integer
1048   @param timeout: maximum timeout for soft shutdown
1049   @rtype: None
1050
1051   """
1052   hv_name = instance.hypervisor
1053   hyper = hypervisor.GetHypervisor(hv_name)
1054   iname = instance.name
1055
1056   if instance.name not in hyper.ListInstances():
1057     logging.info("Instance %s not running, doing nothing", iname)
1058     return
1059
1060   class _TryShutdown:
1061     def __init__(self):
1062       self.tried_once = False
1063
1064     def __call__(self):
1065       if iname not in hyper.ListInstances():
1066         return
1067
1068       try:
1069         hyper.StopInstance(instance, retry=self.tried_once)
1070       except errors.HypervisorError, err:
1071         if iname not in hyper.ListInstances():
1072           # if the instance is no longer existing, consider this a
1073           # success and go to cleanup
1074           return
1075
1076         _Fail("Failed to stop instance %s: %s", iname, err)
1077
1078       self.tried_once = True
1079
1080       raise utils.RetryAgain()
1081
1082   try:
1083     utils.Retry(_TryShutdown(), 5, timeout)
1084   except utils.RetryTimeout:
1085     # the shutdown did not succeed
1086     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1087
1088     try:
1089       hyper.StopInstance(instance, force=True)
1090     except errors.HypervisorError, err:
1091       if iname in hyper.ListInstances():
1092         # only raise an error if the instance still exists, otherwise
1093         # the error could simply be "instance ... unknown"!
1094         _Fail("Failed to force stop instance %s: %s", iname, err)
1095
1096     time.sleep(1)
1097
1098     if iname in hyper.ListInstances():
1099       _Fail("Could not shutdown instance %s even by destroy", iname)
1100
1101   try:
1102     hyper.CleanupInstance(instance.name)
1103   except errors.HypervisorError, err:
1104     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1105
1106   _RemoveBlockDevLinks(iname, instance.disks)
1107
1108
1109 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1110   """Reboot an instance.
1111
1112   @type instance: L{objects.Instance}
1113   @param instance: the instance object to reboot
1114   @type reboot_type: str
1115   @param reboot_type: the type of reboot, one the following
1116     constants:
1117       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1118         instance OS, do not recreate the VM
1119       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1120         restart the VM (at the hypervisor level)
1121       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1122         not accepted here, since that mode is handled differently, in
1123         cmdlib, and translates into full stop and start of the
1124         instance (instead of a call_instance_reboot RPC)
1125   @type shutdown_timeout: integer
1126   @param shutdown_timeout: maximum timeout for soft shutdown
1127   @rtype: None
1128
1129   """
1130   running_instances = GetInstanceList([instance.hypervisor])
1131
1132   if instance.name not in running_instances:
1133     _Fail("Cannot reboot instance %s that is not running", instance.name)
1134
1135   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1136   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1137     try:
1138       hyper.RebootInstance(instance)
1139     except errors.HypervisorError, err:
1140       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1141   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1142     try:
1143       InstanceShutdown(instance, shutdown_timeout)
1144       return StartInstance(instance)
1145     except errors.HypervisorError, err:
1146       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1147   else:
1148     _Fail("Invalid reboot_type received: %s", reboot_type)
1149
1150
1151 def MigrationInfo(instance):
1152   """Gather information about an instance to be migrated.
1153
1154   @type instance: L{objects.Instance}
1155   @param instance: the instance definition
1156
1157   """
1158   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1159   try:
1160     info = hyper.MigrationInfo(instance)
1161   except errors.HypervisorError, err:
1162     _Fail("Failed to fetch migration information: %s", err, exc=True)
1163   return info
1164
1165
1166 def AcceptInstance(instance, info, target):
1167   """Prepare the node to accept an instance.
1168
1169   @type instance: L{objects.Instance}
1170   @param instance: the instance definition
1171   @type info: string/data (opaque)
1172   @param info: migration information, from the source node
1173   @type target: string
1174   @param target: target host (usually ip), on this node
1175
1176   """
1177   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1178   try:
1179     hyper.AcceptInstance(instance, info, target)
1180   except errors.HypervisorError, err:
1181     _Fail("Failed to accept instance: %s", err, exc=True)
1182
1183
1184 def FinalizeMigration(instance, info, success):
1185   """Finalize any preparation to accept an instance.
1186
1187   @type instance: L{objects.Instance}
1188   @param instance: the instance definition
1189   @type info: string/data (opaque)
1190   @param info: migration information, from the source node
1191   @type success: boolean
1192   @param success: whether the migration was a success or a failure
1193
1194   """
1195   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1196   try:
1197     hyper.FinalizeMigration(instance, info, success)
1198   except errors.HypervisorError, err:
1199     _Fail("Failed to finalize migration: %s", err, exc=True)
1200
1201
1202 def MigrateInstance(instance, target, live):
1203   """Migrates an instance to another node.
1204
1205   @type instance: L{objects.Instance}
1206   @param instance: the instance definition
1207   @type target: string
1208   @param target: the target node name
1209   @type live: boolean
1210   @param live: whether the migration should be done live or not (the
1211       interpretation of this parameter is left to the hypervisor)
1212   @rtype: tuple
1213   @return: a tuple of (success, msg) where:
1214       - succes is a boolean denoting the success/failure of the operation
1215       - msg is a string with details in case of failure
1216
1217   """
1218   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1219
1220   try:
1221     hyper.MigrateInstance(instance, target, live)
1222   except errors.HypervisorError, err:
1223     _Fail("Failed to migrate instance: %s", err, exc=True)
1224
1225
1226 def BlockdevCreate(disk, size, owner, on_primary, info):
1227   """Creates a block device for an instance.
1228
1229   @type disk: L{objects.Disk}
1230   @param disk: the object describing the disk we should create
1231   @type size: int
1232   @param size: the size of the physical underlying device, in MiB
1233   @type owner: str
1234   @param owner: the name of the instance for which disk is created,
1235       used for device cache data
1236   @type on_primary: boolean
1237   @param on_primary:  indicates if it is the primary node or not
1238   @type info: string
1239   @param info: string that will be sent to the physical device
1240       creation, used for example to set (LVM) tags on LVs
1241
1242   @return: the new unique_id of the device (this can sometime be
1243       computed only after creation), or None. On secondary nodes,
1244       it's not required to return anything.
1245
1246   """
1247   # TODO: remove the obsolete 'size' argument
1248   # pylint: disable-msg=W0613
1249   clist = []
1250   if disk.children:
1251     for child in disk.children:
1252       try:
1253         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1254       except errors.BlockDeviceError, err:
1255         _Fail("Can't assemble device %s: %s", child, err)
1256       if on_primary or disk.AssembleOnSecondary():
1257         # we need the children open in case the device itself has to
1258         # be assembled
1259         try:
1260           # pylint: disable-msg=E1103
1261           crdev.Open()
1262         except errors.BlockDeviceError, err:
1263           _Fail("Can't make child '%s' read-write: %s", child, err)
1264       clist.append(crdev)
1265
1266   try:
1267     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1268   except errors.BlockDeviceError, err:
1269     _Fail("Can't create block device: %s", err)
1270
1271   if on_primary or disk.AssembleOnSecondary():
1272     try:
1273       device.Assemble()
1274     except errors.BlockDeviceError, err:
1275       _Fail("Can't assemble device after creation, unusual event: %s", err)
1276     device.SetSyncSpeed(constants.SYNC_SPEED)
1277     if on_primary or disk.OpenOnSecondary():
1278       try:
1279         device.Open(force=True)
1280       except errors.BlockDeviceError, err:
1281         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1282     DevCacheManager.UpdateCache(device.dev_path, owner,
1283                                 on_primary, disk.iv_name)
1284
1285   device.SetInfo(info)
1286
1287   return device.unique_id
1288
1289
1290 def BlockdevRemove(disk):
1291   """Remove a block device.
1292
1293   @note: This is intended to be called recursively.
1294
1295   @type disk: L{objects.Disk}
1296   @param disk: the disk object we should remove
1297   @rtype: boolean
1298   @return: the success of the operation
1299
1300   """
1301   msgs = []
1302   try:
1303     rdev = _RecursiveFindBD(disk)
1304   except errors.BlockDeviceError, err:
1305     # probably can't attach
1306     logging.info("Can't attach to device %s in remove", disk)
1307     rdev = None
1308   if rdev is not None:
1309     r_path = rdev.dev_path
1310     try:
1311       rdev.Remove()
1312     except errors.BlockDeviceError, err:
1313       msgs.append(str(err))
1314     if not msgs:
1315       DevCacheManager.RemoveCache(r_path)
1316
1317   if disk.children:
1318     for child in disk.children:
1319       try:
1320         BlockdevRemove(child)
1321       except RPCFail, err:
1322         msgs.append(str(err))
1323
1324   if msgs:
1325     _Fail("; ".join(msgs))
1326
1327
1328 def _RecursiveAssembleBD(disk, owner, as_primary):
1329   """Activate a block device for an instance.
1330
1331   This is run on the primary and secondary nodes for an instance.
1332
1333   @note: this function is called recursively.
1334
1335   @type disk: L{objects.Disk}
1336   @param disk: the disk we try to assemble
1337   @type owner: str
1338   @param owner: the name of the instance which owns the disk
1339   @type as_primary: boolean
1340   @param as_primary: if we should make the block device
1341       read/write
1342
1343   @return: the assembled device or None (in case no device
1344       was assembled)
1345   @raise errors.BlockDeviceError: in case there is an error
1346       during the activation of the children or the device
1347       itself
1348
1349   """
1350   children = []
1351   if disk.children:
1352     mcn = disk.ChildrenNeeded()
1353     if mcn == -1:
1354       mcn = 0 # max number of Nones allowed
1355     else:
1356       mcn = len(disk.children) - mcn # max number of Nones
1357     for chld_disk in disk.children:
1358       try:
1359         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1360       except errors.BlockDeviceError, err:
1361         if children.count(None) >= mcn:
1362           raise
1363         cdev = None
1364         logging.error("Error in child activation (but continuing): %s",
1365                       str(err))
1366       children.append(cdev)
1367
1368   if as_primary or disk.AssembleOnSecondary():
1369     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1370     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1371     result = r_dev
1372     if as_primary or disk.OpenOnSecondary():
1373       r_dev.Open()
1374     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1375                                 as_primary, disk.iv_name)
1376
1377   else:
1378     result = True
1379   return result
1380
1381
1382 def BlockdevAssemble(disk, owner, as_primary):
1383   """Activate a block device for an instance.
1384
1385   This is a wrapper over _RecursiveAssembleBD.
1386
1387   @rtype: str or boolean
1388   @return: a C{/dev/...} path for primary nodes, and
1389       C{True} for secondary nodes
1390
1391   """
1392   try:
1393     result = _RecursiveAssembleBD(disk, owner, as_primary)
1394     if isinstance(result, bdev.BlockDev):
1395       # pylint: disable-msg=E1103
1396       result = result.dev_path
1397   except errors.BlockDeviceError, err:
1398     _Fail("Error while assembling disk: %s", err, exc=True)
1399
1400   return result
1401
1402
1403 def BlockdevShutdown(disk):
1404   """Shut down a block device.
1405
1406   First, if the device is assembled (Attach() is successful), then
1407   the device is shutdown. Then the children of the device are
1408   shutdown.
1409
1410   This function is called recursively. Note that we don't cache the
1411   children or such, as oppossed to assemble, shutdown of different
1412   devices doesn't require that the upper device was active.
1413
1414   @type disk: L{objects.Disk}
1415   @param disk: the description of the disk we should
1416       shutdown
1417   @rtype: None
1418
1419   """
1420   msgs = []
1421   r_dev = _RecursiveFindBD(disk)
1422   if r_dev is not None:
1423     r_path = r_dev.dev_path
1424     try:
1425       r_dev.Shutdown()
1426       DevCacheManager.RemoveCache(r_path)
1427     except errors.BlockDeviceError, err:
1428       msgs.append(str(err))
1429
1430   if disk.children:
1431     for child in disk.children:
1432       try:
1433         BlockdevShutdown(child)
1434       except RPCFail, err:
1435         msgs.append(str(err))
1436
1437   if msgs:
1438     _Fail("; ".join(msgs))
1439
1440
1441 def BlockdevAddchildren(parent_cdev, new_cdevs):
1442   """Extend a mirrored block device.
1443
1444   @type parent_cdev: L{objects.Disk}
1445   @param parent_cdev: the disk to which we should add children
1446   @type new_cdevs: list of L{objects.Disk}
1447   @param new_cdevs: the list of children which we should add
1448   @rtype: None
1449
1450   """
1451   parent_bdev = _RecursiveFindBD(parent_cdev)
1452   if parent_bdev is None:
1453     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1454   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1455   if new_bdevs.count(None) > 0:
1456     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1457   parent_bdev.AddChildren(new_bdevs)
1458
1459
1460 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1461   """Shrink a mirrored block device.
1462
1463   @type parent_cdev: L{objects.Disk}
1464   @param parent_cdev: the disk from which we should remove children
1465   @type new_cdevs: list of L{objects.Disk}
1466   @param new_cdevs: the list of children which we should remove
1467   @rtype: None
1468
1469   """
1470   parent_bdev = _RecursiveFindBD(parent_cdev)
1471   if parent_bdev is None:
1472     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1473   devs = []
1474   for disk in new_cdevs:
1475     rpath = disk.StaticDevPath()
1476     if rpath is None:
1477       bd = _RecursiveFindBD(disk)
1478       if bd is None:
1479         _Fail("Can't find device %s while removing children", disk)
1480       else:
1481         devs.append(bd.dev_path)
1482     else:
1483       if not utils.IsNormAbsPath(rpath):
1484         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1485       devs.append(rpath)
1486   parent_bdev.RemoveChildren(devs)
1487
1488
1489 def BlockdevGetmirrorstatus(disks):
1490   """Get the mirroring status of a list of devices.
1491
1492   @type disks: list of L{objects.Disk}
1493   @param disks: the list of disks which we should query
1494   @rtype: disk
1495   @return:
1496       a list of (mirror_done, estimated_time) tuples, which
1497       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1498   @raise errors.BlockDeviceError: if any of the disks cannot be
1499       found
1500
1501   """
1502   stats = []
1503   for dsk in disks:
1504     rbd = _RecursiveFindBD(dsk)
1505     if rbd is None:
1506       _Fail("Can't find device %s", dsk)
1507
1508     stats.append(rbd.CombinedSyncStatus())
1509
1510   return stats
1511
1512
1513 def _RecursiveFindBD(disk):
1514   """Check if a device is activated.
1515
1516   If so, return information about the real device.
1517
1518   @type disk: L{objects.Disk}
1519   @param disk: the disk object we need to find
1520
1521   @return: None if the device can't be found,
1522       otherwise the device instance
1523
1524   """
1525   children = []
1526   if disk.children:
1527     for chdisk in disk.children:
1528       children.append(_RecursiveFindBD(chdisk))
1529
1530   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1531
1532
1533 def _OpenRealBD(disk):
1534   """Opens the underlying block device of a disk.
1535
1536   @type disk: L{objects.Disk}
1537   @param disk: the disk object we want to open
1538
1539   """
1540   real_disk = _RecursiveFindBD(disk)
1541   if real_disk is None:
1542     _Fail("Block device '%s' is not set up", disk)
1543
1544   real_disk.Open()
1545
1546   return real_disk
1547
1548
1549 def BlockdevFind(disk):
1550   """Check if a device is activated.
1551
1552   If it is, return information about the real device.
1553
1554   @type disk: L{objects.Disk}
1555   @param disk: the disk to find
1556   @rtype: None or objects.BlockDevStatus
1557   @return: None if the disk cannot be found, otherwise a the current
1558            information
1559
1560   """
1561   try:
1562     rbd = _RecursiveFindBD(disk)
1563   except errors.BlockDeviceError, err:
1564     _Fail("Failed to find device: %s", err, exc=True)
1565
1566   if rbd is None:
1567     return None
1568
1569   return rbd.GetSyncStatus()
1570
1571
1572 def BlockdevGetsize(disks):
1573   """Computes the size of the given disks.
1574
1575   If a disk is not found, returns None instead.
1576
1577   @type disks: list of L{objects.Disk}
1578   @param disks: the list of disk to compute the size for
1579   @rtype: list
1580   @return: list with elements None if the disk cannot be found,
1581       otherwise the size
1582
1583   """
1584   result = []
1585   for cf in disks:
1586     try:
1587       rbd = _RecursiveFindBD(cf)
1588     except errors.BlockDeviceError:
1589       result.append(None)
1590       continue
1591     if rbd is None:
1592       result.append(None)
1593     else:
1594       result.append(rbd.GetActualSize())
1595   return result
1596
1597
1598 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1599   """Export a block device to a remote node.
1600
1601   @type disk: L{objects.Disk}
1602   @param disk: the description of the disk to export
1603   @type dest_node: str
1604   @param dest_node: the destination node to export to
1605   @type dest_path: str
1606   @param dest_path: the destination path on the target node
1607   @type cluster_name: str
1608   @param cluster_name: the cluster name, needed for SSH hostalias
1609   @rtype: None
1610
1611   """
1612   real_disk = _OpenRealBD(disk)
1613
1614   # the block size on the read dd is 1MiB to match our units
1615   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1616                                "dd if=%s bs=1048576 count=%s",
1617                                real_disk.dev_path, str(disk.size))
1618
1619   # we set here a smaller block size as, due to ssh buffering, more
1620   # than 64-128k will mostly ignored; we use nocreat to fail if the
1621   # device is not already there or we pass a wrong path; we use
1622   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1623   # to not buffer too much memory; this means that at best, we flush
1624   # every 64k, which will not be very fast
1625   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1626                                 " oflag=dsync", dest_path)
1627
1628   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1629                                                    constants.GANETI_RUNAS,
1630                                                    destcmd)
1631
1632   # all commands have been checked, so we're safe to combine them
1633   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1634
1635   result = utils.RunCmd(["bash", "-c", command])
1636
1637   if result.failed:
1638     _Fail("Disk copy command '%s' returned error: %s"
1639           " output: %s", command, result.fail_reason, result.output)
1640
1641
1642 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1643   """Write a file to the filesystem.
1644
1645   This allows the master to overwrite(!) a file. It will only perform
1646   the operation if the file belongs to a list of configuration files.
1647
1648   @type file_name: str
1649   @param file_name: the target file name
1650   @type data: str
1651   @param data: the new contents of the file
1652   @type mode: int
1653   @param mode: the mode to give the file (can be None)
1654   @type uid: int
1655   @param uid: the owner of the file (can be -1 for default)
1656   @type gid: int
1657   @param gid: the group of the file (can be -1 for default)
1658   @type atime: float
1659   @param atime: the atime to set on the file (can be None)
1660   @type mtime: float
1661   @param mtime: the mtime to set on the file (can be None)
1662   @rtype: None
1663
1664   """
1665   if not os.path.isabs(file_name):
1666     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1667
1668   if file_name not in _ALLOWED_UPLOAD_FILES:
1669     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1670           file_name)
1671
1672   raw_data = _Decompress(data)
1673
1674   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1675                   atime=atime, mtime=mtime)
1676
1677
1678 def WriteSsconfFiles(values):
1679   """Update all ssconf files.
1680
1681   Wrapper around the SimpleStore.WriteFiles.
1682
1683   """
1684   ssconf.SimpleStore().WriteFiles(values)
1685
1686
1687 def _ErrnoOrStr(err):
1688   """Format an EnvironmentError exception.
1689
1690   If the L{err} argument has an errno attribute, it will be looked up
1691   and converted into a textual C{E...} description. Otherwise the
1692   string representation of the error will be returned.
1693
1694   @type err: L{EnvironmentError}
1695   @param err: the exception to format
1696
1697   """
1698   if hasattr(err, 'errno'):
1699     detail = errno.errorcode[err.errno]
1700   else:
1701     detail = str(err)
1702   return detail
1703
1704
1705 def _OSOndiskAPIVersion(os_dir):
1706   """Compute and return the API version of a given OS.
1707
1708   This function will try to read the API version of the OS residing in
1709   the 'os_dir' directory.
1710
1711   @type os_dir: str
1712   @param os_dir: the directory in which we should look for the OS
1713   @rtype: tuple
1714   @return: tuple (status, data) with status denoting the validity and
1715       data holding either the vaid versions or an error message
1716
1717   """
1718   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1719
1720   try:
1721     st = os.stat(api_file)
1722   except EnvironmentError, err:
1723     return False, ("Required file '%s' not found under path %s: %s" %
1724                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1725
1726   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1727     return False, ("File '%s' in %s is not a regular file" %
1728                    (constants.OS_API_FILE, os_dir))
1729
1730   try:
1731     api_versions = utils.ReadFile(api_file).splitlines()
1732   except EnvironmentError, err:
1733     return False, ("Error while reading the API version file at %s: %s" %
1734                    (api_file, _ErrnoOrStr(err)))
1735
1736   try:
1737     api_versions = [int(version.strip()) for version in api_versions]
1738   except (TypeError, ValueError), err:
1739     return False, ("API version(s) can't be converted to integer: %s" %
1740                    str(err))
1741
1742   return True, api_versions
1743
1744
1745 def DiagnoseOS(top_dirs=None):
1746   """Compute the validity for all OSes.
1747
1748   @type top_dirs: list
1749   @param top_dirs: the list of directories in which to
1750       search (if not given defaults to
1751       L{constants.OS_SEARCH_PATH})
1752   @rtype: list of L{objects.OS}
1753   @return: a list of tuples (name, path, status, diagnose, variants,
1754       parameters, api_version) for all (potential) OSes under all
1755       search paths, where:
1756           - name is the (potential) OS name
1757           - path is the full path to the OS
1758           - status True/False is the validity of the OS
1759           - diagnose is the error message for an invalid OS, otherwise empty
1760           - variants is a list of supported OS variants, if any
1761           - parameters is a list of (name, help) parameters, if any
1762           - api_version is a list of support OS API versions
1763
1764   """
1765   if top_dirs is None:
1766     top_dirs = constants.OS_SEARCH_PATH
1767
1768   result = []
1769   for dir_name in top_dirs:
1770     if os.path.isdir(dir_name):
1771       try:
1772         f_names = utils.ListVisibleFiles(dir_name)
1773       except EnvironmentError, err:
1774         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1775         break
1776       for name in f_names:
1777         os_path = utils.PathJoin(dir_name, name)
1778         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1779         if status:
1780           diagnose = ""
1781           variants = os_inst.supported_variants
1782           parameters = os_inst.supported_parameters
1783           api_versions = os_inst.api_versions
1784         else:
1785           diagnose = os_inst
1786           variants = parameters = api_versions = []
1787         result.append((name, os_path, status, diagnose, variants,
1788                        parameters, api_versions))
1789
1790   return result
1791
1792
1793 def _TryOSFromDisk(name, base_dir=None):
1794   """Create an OS instance from disk.
1795
1796   This function will return an OS instance if the given name is a
1797   valid OS name.
1798
1799   @type base_dir: string
1800   @keyword base_dir: Base directory containing OS installations.
1801                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1802   @rtype: tuple
1803   @return: success and either the OS instance if we find a valid one,
1804       or error message
1805
1806   """
1807   if base_dir is None:
1808     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1809   else:
1810     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1811
1812   if os_dir is None:
1813     return False, "Directory for OS %s not found in search path" % name
1814
1815   status, api_versions = _OSOndiskAPIVersion(os_dir)
1816   if not status:
1817     # push the error up
1818     return status, api_versions
1819
1820   if not constants.OS_API_VERSIONS.intersection(api_versions):
1821     return False, ("API version mismatch for path '%s': found %s, want %s." %
1822                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1823
1824   # OS Files dictionary, we will populate it with the absolute path names
1825   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1826
1827   if max(api_versions) >= constants.OS_API_V15:
1828     os_files[constants.OS_VARIANTS_FILE] = ''
1829
1830   if max(api_versions) >= constants.OS_API_V20:
1831     os_files[constants.OS_PARAMETERS_FILE] = ''
1832   else:
1833     del os_files[constants.OS_SCRIPT_VERIFY]
1834
1835   for filename in os_files:
1836     os_files[filename] = utils.PathJoin(os_dir, filename)
1837
1838     try:
1839       st = os.stat(os_files[filename])
1840     except EnvironmentError, err:
1841       return False, ("File '%s' under path '%s' is missing (%s)" %
1842                      (filename, os_dir, _ErrnoOrStr(err)))
1843
1844     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1845       return False, ("File '%s' under path '%s' is not a regular file" %
1846                      (filename, os_dir))
1847
1848     if filename in constants.OS_SCRIPTS:
1849       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1850         return False, ("File '%s' under path '%s' is not executable" %
1851                        (filename, os_dir))
1852
1853   variants = []
1854   if constants.OS_VARIANTS_FILE in os_files:
1855     variants_file = os_files[constants.OS_VARIANTS_FILE]
1856     try:
1857       variants = utils.ReadFile(variants_file).splitlines()
1858     except EnvironmentError, err:
1859       return False, ("Error while reading the OS variants file at %s: %s" %
1860                      (variants_file, _ErrnoOrStr(err)))
1861     if not variants:
1862       return False, ("No supported os variant found")
1863
1864   parameters = []
1865   if constants.OS_PARAMETERS_FILE in os_files:
1866     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1867     try:
1868       parameters = utils.ReadFile(parameters_file).splitlines()
1869     except EnvironmentError, err:
1870       return False, ("Error while reading the OS parameters file at %s: %s" %
1871                      (parameters_file, _ErrnoOrStr(err)))
1872     parameters = [v.split(None, 1) for v in parameters]
1873
1874   os_obj = objects.OS(name=name, path=os_dir,
1875                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1876                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1877                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1878                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1879                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1880                                                  None),
1881                       supported_variants=variants,
1882                       supported_parameters=parameters,
1883                       api_versions=api_versions)
1884   return True, os_obj
1885
1886
1887 def OSFromDisk(name, base_dir=None):
1888   """Create an OS instance from disk.
1889
1890   This function will return an OS instance if the given name is a
1891   valid OS name. Otherwise, it will raise an appropriate
1892   L{RPCFail} exception, detailing why this is not a valid OS.
1893
1894   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1895   an exception but returns true/false status data.
1896
1897   @type base_dir: string
1898   @keyword base_dir: Base directory containing OS installations.
1899                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1900   @rtype: L{objects.OS}
1901   @return: the OS instance if we find a valid one
1902   @raise RPCFail: if we don't find a valid OS
1903
1904   """
1905   name_only = objects.OS.GetName(name)
1906   status, payload = _TryOSFromDisk(name_only, base_dir)
1907
1908   if not status:
1909     _Fail(payload)
1910
1911   return payload
1912
1913
1914 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
1915   """Calculate the basic environment for an os script.
1916
1917   @type os_name: str
1918   @param os_name: full operating system name (including variant)
1919   @type inst_os: L{objects.OS}
1920   @param inst_os: operating system for which the environment is being built
1921   @type os_params: dict
1922   @param os_params: the OS parameters
1923   @type debug: integer
1924   @param debug: debug level (0 or 1, for OS Api 10)
1925   @rtype: dict
1926   @return: dict of environment variables
1927   @raise errors.BlockDeviceError: if the block device
1928       cannot be found
1929
1930   """
1931   result = {}
1932   api_version = \
1933     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1934   result['OS_API_VERSION'] = '%d' % api_version
1935   result['OS_NAME'] = inst_os.name
1936   result['DEBUG_LEVEL'] = '%d' % debug
1937
1938   # OS variants
1939   if api_version >= constants.OS_API_V15:
1940     variant = objects.OS.GetVariant(os_name)
1941     if not variant:
1942       variant = inst_os.supported_variants[0]
1943     result['OS_VARIANT'] = variant
1944
1945   # OS params
1946   for pname, pvalue in os_params.items():
1947     result['OSP_%s' % pname.upper()] = pvalue
1948
1949   return result
1950
1951
1952 def OSEnvironment(instance, inst_os, debug=0):
1953   """Calculate the environment for an os script.
1954
1955   @type instance: L{objects.Instance}
1956   @param instance: target instance for the os script run
1957   @type inst_os: L{objects.OS}
1958   @param inst_os: operating system for which the environment is being built
1959   @type debug: integer
1960   @param debug: debug level (0 or 1, for OS Api 10)
1961   @rtype: dict
1962   @return: dict of environment variables
1963   @raise errors.BlockDeviceError: if the block device
1964       cannot be found
1965
1966   """
1967   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
1968
1969   result['INSTANCE_NAME'] = instance.name
1970   result['INSTANCE_OS'] = instance.os
1971   result['HYPERVISOR'] = instance.hypervisor
1972   result['DISK_COUNT'] = '%d' % len(instance.disks)
1973   result['NIC_COUNT'] = '%d' % len(instance.nics)
1974
1975   # Disks
1976   for idx, disk in enumerate(instance.disks):
1977     real_disk = _OpenRealBD(disk)
1978     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1979     result['DISK_%d_ACCESS' % idx] = disk.mode
1980     if constants.HV_DISK_TYPE in instance.hvparams:
1981       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1982         instance.hvparams[constants.HV_DISK_TYPE]
1983     if disk.dev_type in constants.LDS_BLOCK:
1984       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1985     elif disk.dev_type == constants.LD_FILE:
1986       result['DISK_%d_BACKEND_TYPE' % idx] = \
1987         'file:%s' % disk.physical_id[0]
1988
1989   # NICs
1990   for idx, nic in enumerate(instance.nics):
1991     result['NIC_%d_MAC' % idx] = nic.mac
1992     if nic.ip:
1993       result['NIC_%d_IP' % idx] = nic.ip
1994     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1995     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1996       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1997     if nic.nicparams[constants.NIC_LINK]:
1998       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1999     if constants.HV_NIC_TYPE in instance.hvparams:
2000       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2001         instance.hvparams[constants.HV_NIC_TYPE]
2002
2003   # HV/BE params
2004   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2005     for key, value in source.items():
2006       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2007
2008   return result
2009
2010
2011 def BlockdevGrow(disk, amount):
2012   """Grow a stack of block devices.
2013
2014   This function is called recursively, with the childrens being the
2015   first ones to resize.
2016
2017   @type disk: L{objects.Disk}
2018   @param disk: the disk to be grown
2019   @rtype: (status, result)
2020   @return: a tuple with the status of the operation
2021       (True/False), and the errors message if status
2022       is False
2023
2024   """
2025   r_dev = _RecursiveFindBD(disk)
2026   if r_dev is None:
2027     _Fail("Cannot find block device %s", disk)
2028
2029   try:
2030     r_dev.Grow(amount)
2031   except errors.BlockDeviceError, err:
2032     _Fail("Failed to grow block device: %s", err, exc=True)
2033
2034
2035 def BlockdevSnapshot(disk):
2036   """Create a snapshot copy of a block device.
2037
2038   This function is called recursively, and the snapshot is actually created
2039   just for the leaf lvm backend device.
2040
2041   @type disk: L{objects.Disk}
2042   @param disk: the disk to be snapshotted
2043   @rtype: string
2044   @return: snapshot disk path
2045
2046   """
2047   if disk.dev_type == constants.LD_DRBD8:
2048     if not disk.children:
2049       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2050             disk.unique_id)
2051     return BlockdevSnapshot(disk.children[0])
2052   elif disk.dev_type == constants.LD_LV:
2053     r_dev = _RecursiveFindBD(disk)
2054     if r_dev is not None:
2055       # FIXME: choose a saner value for the snapshot size
2056       # let's stay on the safe side and ask for the full size, for now
2057       return r_dev.Snapshot(disk.size)
2058     else:
2059       _Fail("Cannot find block device %s", disk)
2060   else:
2061     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2062           disk.unique_id, disk.dev_type)
2063
2064
2065 def FinalizeExport(instance, snap_disks):
2066   """Write out the export configuration information.
2067
2068   @type instance: L{objects.Instance}
2069   @param instance: the instance which we export, used for
2070       saving configuration
2071   @type snap_disks: list of L{objects.Disk}
2072   @param snap_disks: list of snapshot block devices, which
2073       will be used to get the actual name of the dump file
2074
2075   @rtype: None
2076
2077   """
2078   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2079   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2080
2081   config = objects.SerializableConfigParser()
2082
2083   config.add_section(constants.INISECT_EXP)
2084   config.set(constants.INISECT_EXP, 'version', '0')
2085   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2086   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2087   config.set(constants.INISECT_EXP, 'os', instance.os)
2088   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2089
2090   config.add_section(constants.INISECT_INS)
2091   config.set(constants.INISECT_INS, 'name', instance.name)
2092   config.set(constants.INISECT_INS, 'memory', '%d' %
2093              instance.beparams[constants.BE_MEMORY])
2094   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2095              instance.beparams[constants.BE_VCPUS])
2096   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2097   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2098
2099   nic_total = 0
2100   for nic_count, nic in enumerate(instance.nics):
2101     nic_total += 1
2102     config.set(constants.INISECT_INS, 'nic%d_mac' %
2103                nic_count, '%s' % nic.mac)
2104     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2105     for param in constants.NICS_PARAMETER_TYPES:
2106       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2107                  '%s' % nic.nicparams.get(param, None))
2108   # TODO: redundant: on load can read nics until it doesn't exist
2109   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2110
2111   disk_total = 0
2112   for disk_count, disk in enumerate(snap_disks):
2113     if disk:
2114       disk_total += 1
2115       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2116                  ('%s' % disk.iv_name))
2117       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2118                  ('%s' % disk.physical_id[1]))
2119       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2120                  ('%d' % disk.size))
2121
2122   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2123
2124   # New-style hypervisor/backend parameters
2125
2126   config.add_section(constants.INISECT_HYP)
2127   for name, value in instance.hvparams.items():
2128     if name not in constants.HVC_GLOBALS:
2129       config.set(constants.INISECT_HYP, name, str(value))
2130
2131   config.add_section(constants.INISECT_BEP)
2132   for name, value in instance.beparams.items():
2133     config.set(constants.INISECT_BEP, name, str(value))
2134
2135   config.add_section(constants.INISECT_OSP)
2136   for name, value in instance.osparams.items():
2137     config.set(constants.INISECT_OSP, name, str(value))
2138
2139   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2140                   data=config.Dumps())
2141   shutil.rmtree(finaldestdir, ignore_errors=True)
2142   shutil.move(destdir, finaldestdir)
2143
2144
2145 def ExportInfo(dest):
2146   """Get export configuration information.
2147
2148   @type dest: str
2149   @param dest: directory containing the export
2150
2151   @rtype: L{objects.SerializableConfigParser}
2152   @return: a serializable config file containing the
2153       export info
2154
2155   """
2156   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2157
2158   config = objects.SerializableConfigParser()
2159   config.read(cff)
2160
2161   if (not config.has_section(constants.INISECT_EXP) or
2162       not config.has_section(constants.INISECT_INS)):
2163     _Fail("Export info file doesn't have the required fields")
2164
2165   return config.Dumps()
2166
2167
2168 def ListExports():
2169   """Return a list of exports currently available on this machine.
2170
2171   @rtype: list
2172   @return: list of the exports
2173
2174   """
2175   if os.path.isdir(constants.EXPORT_DIR):
2176     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2177   else:
2178     _Fail("No exports directory")
2179
2180
2181 def RemoveExport(export):
2182   """Remove an existing export from the node.
2183
2184   @type export: str
2185   @param export: the name of the export to remove
2186   @rtype: None
2187
2188   """
2189   target = utils.PathJoin(constants.EXPORT_DIR, export)
2190
2191   try:
2192     shutil.rmtree(target)
2193   except EnvironmentError, err:
2194     _Fail("Error while removing the export: %s", err, exc=True)
2195
2196
2197 def BlockdevRename(devlist):
2198   """Rename a list of block devices.
2199
2200   @type devlist: list of tuples
2201   @param devlist: list of tuples of the form  (disk,
2202       new_logical_id, new_physical_id); disk is an
2203       L{objects.Disk} object describing the current disk,
2204       and new logical_id/physical_id is the name we
2205       rename it to
2206   @rtype: boolean
2207   @return: True if all renames succeeded, False otherwise
2208
2209   """
2210   msgs = []
2211   result = True
2212   for disk, unique_id in devlist:
2213     dev = _RecursiveFindBD(disk)
2214     if dev is None:
2215       msgs.append("Can't find device %s in rename" % str(disk))
2216       result = False
2217       continue
2218     try:
2219       old_rpath = dev.dev_path
2220       dev.Rename(unique_id)
2221       new_rpath = dev.dev_path
2222       if old_rpath != new_rpath:
2223         DevCacheManager.RemoveCache(old_rpath)
2224         # FIXME: we should add the new cache information here, like:
2225         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2226         # but we don't have the owner here - maybe parse from existing
2227         # cache? for now, we only lose lvm data when we rename, which
2228         # is less critical than DRBD or MD
2229     except errors.BlockDeviceError, err:
2230       msgs.append("Can't rename device '%s' to '%s': %s" %
2231                   (dev, unique_id, err))
2232       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2233       result = False
2234   if not result:
2235     _Fail("; ".join(msgs))
2236
2237
2238 def _TransformFileStorageDir(file_storage_dir):
2239   """Checks whether given file_storage_dir is valid.
2240
2241   Checks wheter the given file_storage_dir is within the cluster-wide
2242   default file_storage_dir stored in SimpleStore. Only paths under that
2243   directory are allowed.
2244
2245   @type file_storage_dir: str
2246   @param file_storage_dir: the path to check
2247
2248   @return: the normalized path if valid, None otherwise
2249
2250   """
2251   if not constants.ENABLE_FILE_STORAGE:
2252     _Fail("File storage disabled at configure time")
2253   cfg = _GetConfig()
2254   file_storage_dir = os.path.normpath(file_storage_dir)
2255   base_file_storage_dir = cfg.GetFileStorageDir()
2256   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2257       base_file_storage_dir):
2258     _Fail("File storage directory '%s' is not under base file"
2259           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2260   return file_storage_dir
2261
2262
2263 def CreateFileStorageDir(file_storage_dir):
2264   """Create file storage directory.
2265
2266   @type file_storage_dir: str
2267   @param file_storage_dir: directory to create
2268
2269   @rtype: tuple
2270   @return: tuple with first element a boolean indicating wheter dir
2271       creation was successful or not
2272
2273   """
2274   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2275   if os.path.exists(file_storage_dir):
2276     if not os.path.isdir(file_storage_dir):
2277       _Fail("Specified storage dir '%s' is not a directory",
2278             file_storage_dir)
2279   else:
2280     try:
2281       os.makedirs(file_storage_dir, 0750)
2282     except OSError, err:
2283       _Fail("Cannot create file storage directory '%s': %s",
2284             file_storage_dir, err, exc=True)
2285
2286
2287 def RemoveFileStorageDir(file_storage_dir):
2288   """Remove file storage directory.
2289
2290   Remove it only if it's empty. If not log an error and return.
2291
2292   @type file_storage_dir: str
2293   @param file_storage_dir: the directory we should cleanup
2294   @rtype: tuple (success,)
2295   @return: tuple of one element, C{success}, denoting
2296       whether the operation was successful
2297
2298   """
2299   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2300   if os.path.exists(file_storage_dir):
2301     if not os.path.isdir(file_storage_dir):
2302       _Fail("Specified Storage directory '%s' is not a directory",
2303             file_storage_dir)
2304     # deletes dir only if empty, otherwise we want to fail the rpc call
2305     try:
2306       os.rmdir(file_storage_dir)
2307     except OSError, err:
2308       _Fail("Cannot remove file storage directory '%s': %s",
2309             file_storage_dir, err)
2310
2311
2312 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2313   """Rename the file storage directory.
2314
2315   @type old_file_storage_dir: str
2316   @param old_file_storage_dir: the current path
2317   @type new_file_storage_dir: str
2318   @param new_file_storage_dir: the name we should rename to
2319   @rtype: tuple (success,)
2320   @return: tuple of one element, C{success}, denoting
2321       whether the operation was successful
2322
2323   """
2324   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2325   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2326   if not os.path.exists(new_file_storage_dir):
2327     if os.path.isdir(old_file_storage_dir):
2328       try:
2329         os.rename(old_file_storage_dir, new_file_storage_dir)
2330       except OSError, err:
2331         _Fail("Cannot rename '%s' to '%s': %s",
2332               old_file_storage_dir, new_file_storage_dir, err)
2333     else:
2334       _Fail("Specified storage dir '%s' is not a directory",
2335             old_file_storage_dir)
2336   else:
2337     if os.path.exists(old_file_storage_dir):
2338       _Fail("Cannot rename '%s' to '%s': both locations exist",
2339             old_file_storage_dir, new_file_storage_dir)
2340
2341
2342 def _EnsureJobQueueFile(file_name):
2343   """Checks whether the given filename is in the queue directory.
2344
2345   @type file_name: str
2346   @param file_name: the file name we should check
2347   @rtype: None
2348   @raises RPCFail: if the file is not valid
2349
2350   """
2351   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2352   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2353
2354   if not result:
2355     _Fail("Passed job queue file '%s' does not belong to"
2356           " the queue directory '%s'", file_name, queue_dir)
2357
2358
2359 def JobQueueUpdate(file_name, content):
2360   """Updates a file in the queue directory.
2361
2362   This is just a wrapper over L{utils.WriteFile}, with proper
2363   checking.
2364
2365   @type file_name: str
2366   @param file_name: the job file name
2367   @type content: str
2368   @param content: the new job contents
2369   @rtype: boolean
2370   @return: the success of the operation
2371
2372   """
2373   _EnsureJobQueueFile(file_name)
2374   getents = runtime.GetEnts()
2375
2376   # Write and replace the file atomically
2377   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2378                   gid=getents.masterd_gid)
2379
2380
2381 def JobQueueRename(old, new):
2382   """Renames a job queue file.
2383
2384   This is just a wrapper over os.rename with proper checking.
2385
2386   @type old: str
2387   @param old: the old (actual) file name
2388   @type new: str
2389   @param new: the desired file name
2390   @rtype: tuple
2391   @return: the success of the operation and payload
2392
2393   """
2394   _EnsureJobQueueFile(old)
2395   _EnsureJobQueueFile(new)
2396
2397   utils.RenameFile(old, new, mkdir=True)
2398
2399
2400 def BlockdevClose(instance_name, disks):
2401   """Closes the given block devices.
2402
2403   This means they will be switched to secondary mode (in case of
2404   DRBD).
2405
2406   @param instance_name: if the argument is not empty, the symlinks
2407       of this instance will be removed
2408   @type disks: list of L{objects.Disk}
2409   @param disks: the list of disks to be closed
2410   @rtype: tuple (success, message)
2411   @return: a tuple of success and message, where success
2412       indicates the succes of the operation, and message
2413       which will contain the error details in case we
2414       failed
2415
2416   """
2417   bdevs = []
2418   for cf in disks:
2419     rd = _RecursiveFindBD(cf)
2420     if rd is None:
2421       _Fail("Can't find device %s", cf)
2422     bdevs.append(rd)
2423
2424   msg = []
2425   for rd in bdevs:
2426     try:
2427       rd.Close()
2428     except errors.BlockDeviceError, err:
2429       msg.append(str(err))
2430   if msg:
2431     _Fail("Can't make devices secondary: %s", ",".join(msg))
2432   else:
2433     if instance_name:
2434       _RemoveBlockDevLinks(instance_name, disks)
2435
2436
2437 def ValidateHVParams(hvname, hvparams):
2438   """Validates the given hypervisor parameters.
2439
2440   @type hvname: string
2441   @param hvname: the hypervisor name
2442   @type hvparams: dict
2443   @param hvparams: the hypervisor parameters to be validated
2444   @rtype: None
2445
2446   """
2447   try:
2448     hv_type = hypervisor.GetHypervisor(hvname)
2449     hv_type.ValidateParameters(hvparams)
2450   except errors.HypervisorError, err:
2451     _Fail(str(err), log=False)
2452
2453
2454 def _CheckOSPList(os_obj, parameters):
2455   """Check whether a list of parameters is supported by the OS.
2456
2457   @type os_obj: L{objects.OS}
2458   @param os_obj: OS object to check
2459   @type parameters: list
2460   @param parameters: the list of parameters to check
2461
2462   """
2463   supported = [v[0] for v in os_obj.supported_parameters]
2464   delta = frozenset(parameters).difference(supported)
2465   if delta:
2466     _Fail("The following parameters are not supported"
2467           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2468
2469
2470 def ValidateOS(required, osname, checks, osparams):
2471   """Validate the given OS' parameters.
2472
2473   @type required: boolean
2474   @param required: whether absence of the OS should translate into
2475       failure or not
2476   @type osname: string
2477   @param osname: the OS to be validated
2478   @type checks: list
2479   @param checks: list of the checks to run (currently only 'parameters')
2480   @type osparams: dict
2481   @param osparams: dictionary with OS parameters
2482   @rtype: boolean
2483   @return: True if the validation passed, or False if the OS was not
2484       found and L{required} was false
2485
2486   """
2487   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2488     _Fail("Unknown checks required for OS %s: %s", osname,
2489           set(checks).difference(constants.OS_VALIDATE_CALLS))
2490
2491   name_only = objects.OS.GetName(osname)
2492   status, tbv = _TryOSFromDisk(name_only, None)
2493
2494   if not status:
2495     if required:
2496       _Fail(tbv)
2497     else:
2498       return False
2499
2500   if max(tbv.api_versions) < constants.OS_API_V20:
2501     return True
2502
2503   if constants.OS_VALIDATE_PARAMETERS in checks:
2504     _CheckOSPList(tbv, osparams.keys())
2505
2506   validate_env = OSCoreEnv(osname, tbv, osparams)
2507   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2508                         cwd=tbv.path)
2509   if result.failed:
2510     logging.error("os validate command '%s' returned error: %s output: %s",
2511                   result.cmd, result.fail_reason, result.output)
2512     _Fail("OS validation script failed (%s), output: %s",
2513           result.fail_reason, result.output, log=False)
2514
2515   return True
2516
2517
2518 def DemoteFromMC():
2519   """Demotes the current node from master candidate role.
2520
2521   """
2522   # try to ensure we're not the master by mistake
2523   master, myself = ssconf.GetMasterAndMyself()
2524   if master == myself:
2525     _Fail("ssconf status shows I'm the master node, will not demote")
2526
2527   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2528   if not result.failed:
2529     _Fail("The master daemon is running, will not demote")
2530
2531   try:
2532     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2533       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2534   except EnvironmentError, err:
2535     if err.errno != errno.ENOENT:
2536       _Fail("Error while backing up cluster file: %s", err, exc=True)
2537
2538   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2539
2540
2541 def _GetX509Filenames(cryptodir, name):
2542   """Returns the full paths for the private key and certificate.
2543
2544   """
2545   return (utils.PathJoin(cryptodir, name),
2546           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2547           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2548
2549
2550 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2551   """Creates a new X509 certificate for SSL/TLS.
2552
2553   @type validity: int
2554   @param validity: Validity in seconds
2555   @rtype: tuple; (string, string)
2556   @return: Certificate name and public part
2557
2558   """
2559   (key_pem, cert_pem) = \
2560     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2561                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2562
2563   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2564                               prefix="x509-%s-" % utils.TimestampForFilename())
2565   try:
2566     name = os.path.basename(cert_dir)
2567     assert len(name) > 5
2568
2569     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2570
2571     utils.WriteFile(key_file, mode=0400, data=key_pem)
2572     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2573
2574     # Never return private key as it shouldn't leave the node
2575     return (name, cert_pem)
2576   except Exception:
2577     shutil.rmtree(cert_dir, ignore_errors=True)
2578     raise
2579
2580
2581 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2582   """Removes a X509 certificate.
2583
2584   @type name: string
2585   @param name: Certificate name
2586
2587   """
2588   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2589
2590   utils.RemoveFile(key_file)
2591   utils.RemoveFile(cert_file)
2592
2593   try:
2594     os.rmdir(cert_dir)
2595   except EnvironmentError, err:
2596     _Fail("Cannot remove certificate directory '%s': %s",
2597           cert_dir, err)
2598
2599
2600 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2601   """Returns the command for the requested input/output.
2602
2603   @type instance: L{objects.Instance}
2604   @param instance: The instance object
2605   @param mode: Import/export mode
2606   @param ieio: Input/output type
2607   @param ieargs: Input/output arguments
2608
2609   """
2610   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2611
2612   env = None
2613   prefix = None
2614   suffix = None
2615   exp_size = None
2616
2617   if ieio == constants.IEIO_FILE:
2618     (filename, ) = ieargs
2619
2620     if not utils.IsNormAbsPath(filename):
2621       _Fail("Path '%s' is not normalized or absolute", filename)
2622
2623     directory = os.path.normpath(os.path.dirname(filename))
2624
2625     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2626         constants.EXPORT_DIR):
2627       _Fail("File '%s' is not under exports directory '%s'",
2628             filename, constants.EXPORT_DIR)
2629
2630     # Create directory
2631     utils.Makedirs(directory, mode=0750)
2632
2633     quoted_filename = utils.ShellQuote(filename)
2634
2635     if mode == constants.IEM_IMPORT:
2636       suffix = "> %s" % quoted_filename
2637     elif mode == constants.IEM_EXPORT:
2638       suffix = "< %s" % quoted_filename
2639
2640       # Retrieve file size
2641       try:
2642         st = os.stat(filename)
2643       except EnvironmentError, err:
2644         logging.error("Can't stat(2) %s: %s", filename, err)
2645       else:
2646         exp_size = utils.BytesToMebibyte(st.st_size)
2647
2648   elif ieio == constants.IEIO_RAW_DISK:
2649     (disk, ) = ieargs
2650
2651     real_disk = _OpenRealBD(disk)
2652
2653     if mode == constants.IEM_IMPORT:
2654       # we set here a smaller block size as, due to transport buffering, more
2655       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2656       # is not already there or we pass a wrong path; we use notrunc to no
2657       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2658       # much memory; this means that at best, we flush every 64k, which will
2659       # not be very fast
2660       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2661                                     " bs=%s oflag=dsync"),
2662                                     real_disk.dev_path,
2663                                     str(64 * 1024))
2664
2665     elif mode == constants.IEM_EXPORT:
2666       # the block size on the read dd is 1MiB to match our units
2667       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2668                                    real_disk.dev_path,
2669                                    str(1024 * 1024), # 1 MB
2670                                    str(disk.size))
2671       exp_size = disk.size
2672
2673   elif ieio == constants.IEIO_SCRIPT:
2674     (disk, disk_index, ) = ieargs
2675
2676     assert isinstance(disk_index, (int, long))
2677
2678     real_disk = _OpenRealBD(disk)
2679
2680     inst_os = OSFromDisk(instance.os)
2681     env = OSEnvironment(instance, inst_os)
2682
2683     if mode == constants.IEM_IMPORT:
2684       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2685       env["IMPORT_INDEX"] = str(disk_index)
2686       script = inst_os.import_script
2687
2688     elif mode == constants.IEM_EXPORT:
2689       env["EXPORT_DEVICE"] = real_disk.dev_path
2690       env["EXPORT_INDEX"] = str(disk_index)
2691       script = inst_os.export_script
2692
2693     # TODO: Pass special environment only to script
2694     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2695
2696     if mode == constants.IEM_IMPORT:
2697       suffix = "| %s" % script_cmd
2698
2699     elif mode == constants.IEM_EXPORT:
2700       prefix = "%s |" % script_cmd
2701
2702     # Let script predict size
2703     exp_size = constants.IE_CUSTOM_SIZE
2704
2705   else:
2706     _Fail("Invalid %s I/O mode %r", mode, ieio)
2707
2708   return (env, prefix, suffix, exp_size)
2709
2710
2711 def _CreateImportExportStatusDir(prefix):
2712   """Creates status directory for import/export.
2713
2714   """
2715   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2716                           prefix=("%s-%s-" %
2717                                   (prefix, utils.TimestampForFilename())))
2718
2719
2720 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2721   """Starts an import or export daemon.
2722
2723   @param mode: Import/output mode
2724   @type opts: L{objects.ImportExportOptions}
2725   @param opts: Daemon options
2726   @type host: string
2727   @param host: Remote host for export (None for import)
2728   @type port: int
2729   @param port: Remote port for export (None for import)
2730   @type instance: L{objects.Instance}
2731   @param instance: Instance object
2732   @param ieio: Input/output type
2733   @param ieioargs: Input/output arguments
2734
2735   """
2736   if mode == constants.IEM_IMPORT:
2737     prefix = "import"
2738
2739     if not (host is None and port is None):
2740       _Fail("Can not specify host or port on import")
2741
2742   elif mode == constants.IEM_EXPORT:
2743     prefix = "export"
2744
2745     if host is None or port is None:
2746       _Fail("Host and port must be specified for an export")
2747
2748   else:
2749     _Fail("Invalid mode %r", mode)
2750
2751   if (opts.key_name is None) ^ (opts.ca_pem is None):
2752     _Fail("Cluster certificate can only be used for both key and CA")
2753
2754   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2755     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2756
2757   if opts.key_name is None:
2758     # Use server.pem
2759     key_path = constants.NODED_CERT_FILE
2760     cert_path = constants.NODED_CERT_FILE
2761     assert opts.ca_pem is None
2762   else:
2763     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2764                                                  opts.key_name)
2765     assert opts.ca_pem is not None
2766
2767   for i in [key_path, cert_path]:
2768     if not os.path.exists(i):
2769       _Fail("File '%s' does not exist" % i)
2770
2771   status_dir = _CreateImportExportStatusDir(prefix)
2772   try:
2773     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2774     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2775     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2776
2777     if opts.ca_pem is None:
2778       # Use server.pem
2779       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2780     else:
2781       ca = opts.ca_pem
2782
2783     # Write CA file
2784     utils.WriteFile(ca_file, data=ca, mode=0400)
2785
2786     cmd = [
2787       constants.IMPORT_EXPORT_DAEMON,
2788       status_file, mode,
2789       "--key=%s" % key_path,
2790       "--cert=%s" % cert_path,
2791       "--ca=%s" % ca_file,
2792       ]
2793
2794     if host:
2795       cmd.append("--host=%s" % host)
2796
2797     if port:
2798       cmd.append("--port=%s" % port)
2799
2800     if opts.compress:
2801       cmd.append("--compress=%s" % opts.compress)
2802
2803     if opts.magic:
2804       cmd.append("--magic=%s" % opts.magic)
2805
2806     if exp_size is not None:
2807       cmd.append("--expected-size=%s" % exp_size)
2808
2809     if cmd_prefix:
2810       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2811
2812     if cmd_suffix:
2813       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2814
2815     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2816
2817     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2818     # support for receiving a file descriptor for output
2819     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2820                       output=logfile)
2821
2822     # The import/export name is simply the status directory name
2823     return os.path.basename(status_dir)
2824
2825   except Exception:
2826     shutil.rmtree(status_dir, ignore_errors=True)
2827     raise
2828
2829
2830 def GetImportExportStatus(names):
2831   """Returns import/export daemon status.
2832
2833   @type names: sequence
2834   @param names: List of names
2835   @rtype: List of dicts
2836   @return: Returns a list of the state of each named import/export or None if a
2837            status couldn't be read
2838
2839   """
2840   result = []
2841
2842   for name in names:
2843     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2844                                  _IES_STATUS_FILE)
2845
2846     try:
2847       data = utils.ReadFile(status_file)
2848     except EnvironmentError, err:
2849       if err.errno != errno.ENOENT:
2850         raise
2851       data = None
2852
2853     if not data:
2854       result.append(None)
2855       continue
2856
2857     result.append(serializer.LoadJson(data))
2858
2859   return result
2860
2861
2862 def AbortImportExport(name):
2863   """Sends SIGTERM to a running import/export daemon.
2864
2865   """
2866   logging.info("Abort import/export %s", name)
2867
2868   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2869   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2870
2871   if pid:
2872     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2873                  name, pid)
2874     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2875
2876
2877 def CleanupImportExport(name):
2878   """Cleanup after an import or export.
2879
2880   If the import/export daemon is still running it's killed. Afterwards the
2881   whole status directory is removed.
2882
2883   """
2884   logging.info("Finalizing import/export %s", name)
2885
2886   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2887
2888   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2889
2890   if pid:
2891     logging.info("Import/export %s is still running with PID %s",
2892                  name, pid)
2893     utils.KillProcess(pid, waitpid=False)
2894
2895   shutil.rmtree(status_dir, ignore_errors=True)
2896
2897
2898 def _FindDisks(nodes_ip, disks):
2899   """Sets the physical ID on disks and returns the block devices.
2900
2901   """
2902   # set the correct physical ID
2903   my_name = netutils.Hostname.GetSysName()
2904   for cf in disks:
2905     cf.SetPhysicalID(my_name, nodes_ip)
2906
2907   bdevs = []
2908
2909   for cf in disks:
2910     rd = _RecursiveFindBD(cf)
2911     if rd is None:
2912       _Fail("Can't find device %s", cf)
2913     bdevs.append(rd)
2914   return bdevs
2915
2916
2917 def DrbdDisconnectNet(nodes_ip, disks):
2918   """Disconnects the network on a list of drbd devices.
2919
2920   """
2921   bdevs = _FindDisks(nodes_ip, disks)
2922
2923   # disconnect disks
2924   for rd in bdevs:
2925     try:
2926       rd.DisconnectNet()
2927     except errors.BlockDeviceError, err:
2928       _Fail("Can't change network configuration to standalone mode: %s",
2929             err, exc=True)
2930
2931
2932 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2933   """Attaches the network on a list of drbd devices.
2934
2935   """
2936   bdevs = _FindDisks(nodes_ip, disks)
2937
2938   if multimaster:
2939     for idx, rd in enumerate(bdevs):
2940       try:
2941         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2942       except EnvironmentError, err:
2943         _Fail("Can't create symlink: %s", err)
2944   # reconnect disks, switch to new master configuration and if
2945   # needed primary mode
2946   for rd in bdevs:
2947     try:
2948       rd.AttachNet(multimaster)
2949     except errors.BlockDeviceError, err:
2950       _Fail("Can't change network configuration: %s", err)
2951
2952   # wait until the disks are connected; we need to retry the re-attach
2953   # if the device becomes standalone, as this might happen if the one
2954   # node disconnects and reconnects in a different mode before the
2955   # other node reconnects; in this case, one or both of the nodes will
2956   # decide it has wrong configuration and switch to standalone
2957
2958   def _Attach():
2959     all_connected = True
2960
2961     for rd in bdevs:
2962       stats = rd.GetProcStatus()
2963
2964       all_connected = (all_connected and
2965                        (stats.is_connected or stats.is_in_resync))
2966
2967       if stats.is_standalone:
2968         # peer had different config info and this node became
2969         # standalone, even though this should not happen with the
2970         # new staged way of changing disk configs
2971         try:
2972           rd.AttachNet(multimaster)
2973         except errors.BlockDeviceError, err:
2974           _Fail("Can't change network configuration: %s", err)
2975
2976     if not all_connected:
2977       raise utils.RetryAgain()
2978
2979   try:
2980     # Start with a delay of 100 miliseconds and go up to 5 seconds
2981     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2982   except utils.RetryTimeout:
2983     _Fail("Timeout in disk reconnecting")
2984
2985   if multimaster:
2986     # change to primary mode
2987     for rd in bdevs:
2988       try:
2989         rd.Open()
2990       except errors.BlockDeviceError, err:
2991         _Fail("Can't change to primary mode: %s", err)
2992
2993
2994 def DrbdWaitSync(nodes_ip, disks):
2995   """Wait until DRBDs have synchronized.
2996
2997   """
2998   def _helper(rd):
2999     stats = rd.GetProcStatus()
3000     if not (stats.is_connected or stats.is_in_resync):
3001       raise utils.RetryAgain()
3002     return stats
3003
3004   bdevs = _FindDisks(nodes_ip, disks)
3005
3006   min_resync = 100
3007   alldone = True
3008   for rd in bdevs:
3009     try:
3010       # poll each second for 15 seconds
3011       stats = utils.Retry(_helper, 1, 15, args=[rd])
3012     except utils.RetryTimeout:
3013       stats = rd.GetProcStatus()
3014       # last check
3015       if not (stats.is_connected or stats.is_in_resync):
3016         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3017     alldone = alldone and (not stats.is_in_resync)
3018     if stats.sync_percent is not None:
3019       min_resync = min(min_resync, stats.sync_percent)
3020
3021   return (alldone, min_resync)
3022
3023
3024 def GetDrbdUsermodeHelper():
3025   """Returns DRBD usermode helper currently configured.
3026
3027   """
3028   try:
3029     return bdev.BaseDRBD.GetUsermodeHelper()
3030   except errors.BlockDeviceError, err:
3031     _Fail(str(err))
3032
3033
3034 def PowercycleNode(hypervisor_type):
3035   """Hard-powercycle the node.
3036
3037   Because we need to return first, and schedule the powercycle in the
3038   background, we won't be able to report failures nicely.
3039
3040   """
3041   hyper = hypervisor.GetHypervisor(hypervisor_type)
3042   try:
3043     pid = os.fork()
3044   except OSError:
3045     # if we can't fork, we'll pretend that we're in the child process
3046     pid = 0
3047   if pid > 0:
3048     return "Reboot scheduled in 5 seconds"
3049   # ensure the child is running on ram
3050   try:
3051     utils.Mlockall()
3052   except Exception: # pylint: disable-msg=W0703
3053     pass
3054   time.sleep(5)
3055   hyper.PowercycleNode()
3056
3057
3058 class HooksRunner(object):
3059   """Hook runner.
3060
3061   This class is instantiated on the node side (ganeti-noded) and not
3062   on the master side.
3063
3064   """
3065   def __init__(self, hooks_base_dir=None):
3066     """Constructor for hooks runner.
3067
3068     @type hooks_base_dir: str or None
3069     @param hooks_base_dir: if not None, this overrides the
3070         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3071
3072     """
3073     if hooks_base_dir is None:
3074       hooks_base_dir = constants.HOOKS_BASE_DIR
3075     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3076     # constant
3077     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3078
3079   def RunHooks(self, hpath, phase, env):
3080     """Run the scripts in the hooks directory.
3081
3082     @type hpath: str
3083     @param hpath: the path to the hooks directory which
3084         holds the scripts
3085     @type phase: str
3086     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3087         L{constants.HOOKS_PHASE_POST}
3088     @type env: dict
3089     @param env: dictionary with the environment for the hook
3090     @rtype: list
3091     @return: list of 3-element tuples:
3092       - script path
3093       - script result, either L{constants.HKR_SUCCESS} or
3094         L{constants.HKR_FAIL}
3095       - output of the script
3096
3097     @raise errors.ProgrammerError: for invalid input
3098         parameters
3099
3100     """
3101     if phase == constants.HOOKS_PHASE_PRE:
3102       suffix = "pre"
3103     elif phase == constants.HOOKS_PHASE_POST:
3104       suffix = "post"
3105     else:
3106       _Fail("Unknown hooks phase '%s'", phase)
3107
3108
3109     subdir = "%s-%s.d" % (hpath, suffix)
3110     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3111
3112     results = []
3113
3114     if not os.path.isdir(dir_name):
3115       # for non-existing/non-dirs, we simply exit instead of logging a
3116       # warning at every operation
3117       return results
3118
3119     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3120
3121     for (relname, relstatus, runresult)  in runparts_results:
3122       if relstatus == constants.RUNPARTS_SKIP:
3123         rrval = constants.HKR_SKIP
3124         output = ""
3125       elif relstatus == constants.RUNPARTS_ERR:
3126         rrval = constants.HKR_FAIL
3127         output = "Hook script execution error: %s" % runresult
3128       elif relstatus == constants.RUNPARTS_RUN:
3129         if runresult.failed:
3130           rrval = constants.HKR_FAIL
3131         else:
3132           rrval = constants.HKR_SUCCESS
3133         output = utils.SafeEncode(runresult.output.strip())
3134       results.append(("%s/%s" % (subdir, relname), rrval, output))
3135
3136     return results
3137
3138
3139 class IAllocatorRunner(object):
3140   """IAllocator runner.
3141
3142   This class is instantiated on the node side (ganeti-noded) and not on
3143   the master side.
3144
3145   """
3146   @staticmethod
3147   def Run(name, idata):
3148     """Run an iallocator script.
3149
3150     @type name: str
3151     @param name: the iallocator script name
3152     @type idata: str
3153     @param idata: the allocator input data
3154
3155     @rtype: tuple
3156     @return: two element tuple of:
3157        - status
3158        - either error message or stdout of allocator (for success)
3159
3160     """
3161     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3162                                   os.path.isfile)
3163     if alloc_script is None:
3164       _Fail("iallocator module '%s' not found in the search path", name)
3165
3166     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3167     try:
3168       os.write(fd, idata)
3169       os.close(fd)
3170       result = utils.RunCmd([alloc_script, fin_name])
3171       if result.failed:
3172         _Fail("iallocator module '%s' failed: %s, output '%s'",
3173               name, result.fail_reason, result.output)
3174     finally:
3175       os.unlink(fin_name)
3176
3177     return result.stdout
3178
3179
3180 class DevCacheManager(object):
3181   """Simple class for managing a cache of block device information.
3182
3183   """
3184   _DEV_PREFIX = "/dev/"
3185   _ROOT_DIR = constants.BDEV_CACHE_DIR
3186
3187   @classmethod
3188   def _ConvertPath(cls, dev_path):
3189     """Converts a /dev/name path to the cache file name.
3190
3191     This replaces slashes with underscores and strips the /dev
3192     prefix. It then returns the full path to the cache file.
3193
3194     @type dev_path: str
3195     @param dev_path: the C{/dev/} path name
3196     @rtype: str
3197     @return: the converted path name
3198
3199     """
3200     if dev_path.startswith(cls._DEV_PREFIX):
3201       dev_path = dev_path[len(cls._DEV_PREFIX):]
3202     dev_path = dev_path.replace("/", "_")
3203     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3204     return fpath
3205
3206   @classmethod
3207   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3208     """Updates the cache information for a given device.
3209
3210     @type dev_path: str
3211     @param dev_path: the pathname of the device
3212     @type owner: str
3213     @param owner: the owner (instance name) of the device
3214     @type on_primary: bool
3215     @param on_primary: whether this is the primary
3216         node nor not
3217     @type iv_name: str
3218     @param iv_name: the instance-visible name of the
3219         device, as in objects.Disk.iv_name
3220
3221     @rtype: None
3222
3223     """
3224     if dev_path is None:
3225       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3226       return
3227     fpath = cls._ConvertPath(dev_path)
3228     if on_primary:
3229       state = "primary"
3230     else:
3231       state = "secondary"
3232     if iv_name is None:
3233       iv_name = "not_visible"
3234     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3235     try:
3236       utils.WriteFile(fpath, data=fdata)
3237     except EnvironmentError, err:
3238       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3239
3240   @classmethod
3241   def RemoveCache(cls, dev_path):
3242     """Remove data for a dev_path.
3243
3244     This is just a wrapper over L{utils.RemoveFile} with a converted
3245     path name and logging.
3246
3247     @type dev_path: str
3248     @param dev_path: the pathname of the device
3249
3250     @rtype: None
3251
3252     """
3253     if dev_path is None:
3254       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3255       return
3256     fpath = cls._ConvertPath(dev_path)
3257     try:
3258       utils.RemoveFile(fpath)
3259     except EnvironmentError, err:
3260       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)