Rename some constants to facilitate IPv6 support
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61
62
63 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
64 _ALLOWED_CLEAN_DIRS = frozenset([
65   constants.DATA_DIR,
66   constants.JOB_QUEUE_ARCHIVE_DIR,
67   constants.QUEUE_DIR,
68   constants.CRYPTO_KEYS_DIR,
69   ])
70 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
71 _X509_KEY_FILE = "key"
72 _X509_CERT_FILE = "cert"
73 _IES_STATUS_FILE = "status"
74 _IES_PID_FILE = "pid"
75 _IES_CA_FILE = "ca"
76
77
78 class RPCFail(Exception):
79   """Class denoting RPC failure.
80
81   Its argument is the error message.
82
83   """
84
85
86 def _Fail(msg, *args, **kwargs):
87   """Log an error and the raise an RPCFail exception.
88
89   This exception is then handled specially in the ganeti daemon and
90   turned into a 'failed' return type. As such, this function is a
91   useful shortcut for logging the error and returning it to the master
92   daemon.
93
94   @type msg: string
95   @param msg: the text of the exception
96   @raise RPCFail
97
98   """
99   if args:
100     msg = msg % args
101   if "log" not in kwargs or kwargs["log"]: # if we should log this error
102     if "exc" in kwargs and kwargs["exc"]:
103       logging.exception(msg)
104     else:
105       logging.error(msg)
106   raise RPCFail(msg)
107
108
109 def _GetConfig():
110   """Simple wrapper to return a SimpleStore.
111
112   @rtype: L{ssconf.SimpleStore}
113   @return: a SimpleStore instance
114
115   """
116   return ssconf.SimpleStore()
117
118
119 def _GetSshRunner(cluster_name):
120   """Simple wrapper to return an SshRunner.
121
122   @type cluster_name: str
123   @param cluster_name: the cluster name, which is needed
124       by the SshRunner constructor
125   @rtype: L{ssh.SshRunner}
126   @return: an SshRunner instance
127
128   """
129   return ssh.SshRunner(cluster_name)
130
131
132 def _Decompress(data):
133   """Unpacks data compressed by the RPC client.
134
135   @type data: list or tuple
136   @param data: Data sent by RPC client
137   @rtype: str
138   @return: Decompressed data
139
140   """
141   assert isinstance(data, (list, tuple))
142   assert len(data) == 2
143   (encoding, content) = data
144   if encoding == constants.RPC_ENCODING_NONE:
145     return content
146   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
147     return zlib.decompress(base64.b64decode(content))
148   else:
149     raise AssertionError("Unknown data encoding")
150
151
152 def _CleanDirectory(path, exclude=None):
153   """Removes all regular files in a directory.
154
155   @type path: str
156   @param path: the directory to clean
157   @type exclude: list
158   @param exclude: list of files to be excluded, defaults
159       to the empty list
160
161   """
162   if path not in _ALLOWED_CLEAN_DIRS:
163     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
164           path)
165
166   if not os.path.isdir(path):
167     return
168   if exclude is None:
169     exclude = []
170   else:
171     # Normalize excluded paths
172     exclude = [os.path.normpath(i) for i in exclude]
173
174   for rel_name in utils.ListVisibleFiles(path):
175     full_name = utils.PathJoin(path, rel_name)
176     if full_name in exclude:
177       continue
178     if os.path.isfile(full_name) and not os.path.islink(full_name):
179       utils.RemoveFile(full_name)
180
181
182 def _BuildUploadFileList():
183   """Build the list of allowed upload files.
184
185   This is abstracted so that it's built only once at module import time.
186
187   """
188   allowed_files = set([
189     constants.CLUSTER_CONF_FILE,
190     constants.ETC_HOSTS,
191     constants.SSH_KNOWN_HOSTS_FILE,
192     constants.VNC_PASSWORD_FILE,
193     constants.RAPI_CERT_FILE,
194     constants.RAPI_USERS_FILE,
195     constants.CONFD_HMAC_KEY,
196     constants.CLUSTER_DOMAIN_SECRET_FILE,
197     ])
198
199   for hv_name in constants.HYPER_TYPES:
200     hv_class = hypervisor.GetHypervisorClass(hv_name)
201     allowed_files.update(hv_class.GetAncillaryFiles())
202
203   return frozenset(allowed_files)
204
205
206 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
207
208
209 def JobQueuePurge():
210   """Removes job queue files and archived jobs.
211
212   @rtype: tuple
213   @return: True, None
214
215   """
216   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
217   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
218
219
220 def GetMasterInfo():
221   """Returns master information.
222
223   This is an utility function to compute master information, either
224   for consumption here or from the node daemon.
225
226   @rtype: tuple
227   @return: master_netdev, master_ip, master_name
228   @raise RPCFail: in case of errors
229
230   """
231   try:
232     cfg = _GetConfig()
233     master_netdev = cfg.GetMasterNetdev()
234     master_ip = cfg.GetMasterIP()
235     master_node = cfg.GetMasterNode()
236   except errors.ConfigurationError, err:
237     _Fail("Cluster configuration incomplete: %s", err, exc=True)
238   return (master_netdev, master_ip, master_node)
239
240
241 def StartMaster(start_daemons, no_voting):
242   """Activate local node as master node.
243
244   The function will always try activate the IP address of the master
245   (unless someone else has it). It will also start the master daemons,
246   based on the start_daemons parameter.
247
248   @type start_daemons: boolean
249   @param start_daemons: whether to also start the master
250       daemons (ganeti-masterd and ganeti-rapi)
251   @type no_voting: boolean
252   @param no_voting: whether to start ganeti-masterd without a node vote
253       (if start_daemons is True), but still non-interactively
254   @rtype: None
255
256   """
257   # GetMasterInfo will raise an exception if not able to return data
258   master_netdev, master_ip, _ = GetMasterInfo()
259
260   err_msgs = []
261   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
262     if utils.OwnIpAddress(master_ip):
263       # we already have the ip:
264       logging.debug("Master IP already configured, doing nothing")
265     else:
266       msg = "Someone else has the master ip, not activating"
267       logging.error(msg)
268       err_msgs.append(msg)
269   else:
270     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
271                            "dev", master_netdev, "label",
272                            "%s:0" % master_netdev])
273     if result.failed:
274       msg = "Can't activate master IP: %s" % result.output
275       logging.error(msg)
276       err_msgs.append(msg)
277
278     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
279                            "-s", master_ip, master_ip])
280     # we'll ignore the exit code of arping
281
282   # and now start the master and rapi daemons
283   if start_daemons:
284     if no_voting:
285       masterd_args = "--no-voting --yes-do-it"
286     else:
287       masterd_args = ""
288
289     env = {
290       "EXTRA_MASTERD_ARGS": masterd_args,
291       }
292
293     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
294     if result.failed:
295       msg = "Can't start Ganeti master: %s" % result.output
296       logging.error(msg)
297       err_msgs.append(msg)
298
299   if err_msgs:
300     _Fail("; ".join(err_msgs))
301
302
303 def StopMaster(stop_daemons):
304   """Deactivate this node as master.
305
306   The function will always try to deactivate the IP address of the
307   master. It will also stop the master daemons depending on the
308   stop_daemons parameter.
309
310   @type stop_daemons: boolean
311   @param stop_daemons: whether to also stop the master daemons
312       (ganeti-masterd and ganeti-rapi)
313   @rtype: None
314
315   """
316   # TODO: log and report back to the caller the error failures; we
317   # need to decide in which case we fail the RPC for this
318
319   # GetMasterInfo will raise an exception if not able to return data
320   master_netdev, master_ip, _ = GetMasterInfo()
321
322   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
323                          "dev", master_netdev])
324   if result.failed:
325     logging.error("Can't remove the master IP, error: %s", result.output)
326     # but otherwise ignore the failure
327
328   if stop_daemons:
329     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
330     if result.failed:
331       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
332                     " and error %s",
333                     result.cmd, result.exit_code, result.output)
334
335
336 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
337   """Joins this node to the cluster.
338
339   This does the following:
340       - updates the hostkeys of the machine (rsa and dsa)
341       - adds the ssh private key to the user
342       - adds the ssh public key to the users' authorized_keys file
343
344   @type dsa: str
345   @param dsa: the DSA private key to write
346   @type dsapub: str
347   @param dsapub: the DSA public key to write
348   @type rsa: str
349   @param rsa: the RSA private key to write
350   @type rsapub: str
351   @param rsapub: the RSA public key to write
352   @type sshkey: str
353   @param sshkey: the SSH private key to write
354   @type sshpub: str
355   @param sshpub: the SSH public key to write
356   @rtype: boolean
357   @return: the success of the operation
358
359   """
360   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
361                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
362                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
363                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
364   for name, content, mode in sshd_keys:
365     utils.WriteFile(name, data=content, mode=mode)
366
367   try:
368     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
369                                                     mkdir=True)
370   except errors.OpExecError, err:
371     _Fail("Error while processing user ssh files: %s", err, exc=True)
372
373   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
374     utils.WriteFile(name, data=content, mode=0600)
375
376   utils.AddAuthorizedKey(auth_keys, sshpub)
377
378   result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
379   if result.failed:
380     _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
381           result.cmd, result.exit_code, result.output)
382
383
384 def LeaveCluster(modify_ssh_setup):
385   """Cleans up and remove the current node.
386
387   This function cleans up and prepares the current node to be removed
388   from the cluster.
389
390   If processing is successful, then it raises an
391   L{errors.QuitGanetiException} which is used as a special case to
392   shutdown the node daemon.
393
394   @param modify_ssh_setup: boolean
395
396   """
397   _CleanDirectory(constants.DATA_DIR)
398   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
399   JobQueuePurge()
400
401   if modify_ssh_setup:
402     try:
403       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
404
405       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
406
407       utils.RemoveFile(priv_key)
408       utils.RemoveFile(pub_key)
409     except errors.OpExecError:
410       logging.exception("Error while processing ssh files")
411
412   try:
413     utils.RemoveFile(constants.CONFD_HMAC_KEY)
414     utils.RemoveFile(constants.RAPI_CERT_FILE)
415     utils.RemoveFile(constants.NODED_CERT_FILE)
416   except: # pylint: disable-msg=W0702
417     logging.exception("Error while removing cluster secrets")
418
419   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
420   if result.failed:
421     logging.error("Command %s failed with exitcode %s and error %s",
422                   result.cmd, result.exit_code, result.output)
423
424   # Raise a custom exception (handled in ganeti-noded)
425   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
426
427
428 def GetNodeInfo(vgname, hypervisor_type):
429   """Gives back a hash with different information about the node.
430
431   @type vgname: C{string}
432   @param vgname: the name of the volume group to ask for disk space information
433   @type hypervisor_type: C{str}
434   @param hypervisor_type: the name of the hypervisor to ask for
435       memory information
436   @rtype: C{dict}
437   @return: dictionary with the following keys:
438       - vg_size is the size of the configured volume group in MiB
439       - vg_free is the free size of the volume group in MiB
440       - memory_dom0 is the memory allocated for domain0 in MiB
441       - memory_free is the currently available (free) ram in MiB
442       - memory_total is the total number of ram in MiB
443
444   """
445   outputarray = {}
446   vginfo = _GetVGInfo(vgname)
447   outputarray['vg_size'] = vginfo['vg_size']
448   outputarray['vg_free'] = vginfo['vg_free']
449
450   hyper = hypervisor.GetHypervisor(hypervisor_type)
451   hyp_info = hyper.GetNodeInfo()
452   if hyp_info is not None:
453     outputarray.update(hyp_info)
454
455   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
456
457   return outputarray
458
459
460 def VerifyNode(what, cluster_name):
461   """Verify the status of the local node.
462
463   Based on the input L{what} parameter, various checks are done on the
464   local node.
465
466   If the I{filelist} key is present, this list of
467   files is checksummed and the file/checksum pairs are returned.
468
469   If the I{nodelist} key is present, we check that we have
470   connectivity via ssh with the target nodes (and check the hostname
471   report).
472
473   If the I{node-net-test} key is present, we check that we have
474   connectivity to the given nodes via both primary IP and, if
475   applicable, secondary IPs.
476
477   @type what: C{dict}
478   @param what: a dictionary of things to check:
479       - filelist: list of files for which to compute checksums
480       - nodelist: list of nodes we should check ssh communication with
481       - node-net-test: list of nodes we should check node daemon port
482         connectivity with
483       - hypervisor: list with hypervisors to run the verify for
484   @rtype: dict
485   @return: a dictionary with the same keys as the input dict, and
486       values representing the result of the checks
487
488   """
489   result = {}
490   my_name = utils.HostInfo().name
491   port = utils.GetDaemonPort(constants.NODED)
492
493   if constants.NV_HYPERVISOR in what:
494     result[constants.NV_HYPERVISOR] = tmp = {}
495     for hv_name in what[constants.NV_HYPERVISOR]:
496       try:
497         val = hypervisor.GetHypervisor(hv_name).Verify()
498       except errors.HypervisorError, err:
499         val = "Error while checking hypervisor: %s" % str(err)
500       tmp[hv_name] = val
501
502   if constants.NV_FILELIST in what:
503     result[constants.NV_FILELIST] = utils.FingerprintFiles(
504       what[constants.NV_FILELIST])
505
506   if constants.NV_NODELIST in what:
507     result[constants.NV_NODELIST] = tmp = {}
508     random.shuffle(what[constants.NV_NODELIST])
509     for node in what[constants.NV_NODELIST]:
510       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
511       if not success:
512         tmp[node] = message
513
514   if constants.NV_NODENETTEST in what:
515     result[constants.NV_NODENETTEST] = tmp = {}
516     my_pip = my_sip = None
517     for name, pip, sip in what[constants.NV_NODENETTEST]:
518       if name == my_name:
519         my_pip = pip
520         my_sip = sip
521         break
522     if not my_pip:
523       tmp[my_name] = ("Can't find my own primary/secondary IP"
524                       " in the node list")
525     else:
526       for name, pip, sip in what[constants.NV_NODENETTEST]:
527         fail = []
528         if not utils.TcpPing(pip, port, source=my_pip):
529           fail.append("primary")
530         if sip != pip:
531           if not utils.TcpPing(sip, port, source=my_sip):
532             fail.append("secondary")
533         if fail:
534           tmp[name] = ("failure using the %s interface(s)" %
535                        " and ".join(fail))
536
537   if constants.NV_MASTERIP in what:
538     # FIXME: add checks on incoming data structures (here and in the
539     # rest of the function)
540     master_name, master_ip = what[constants.NV_MASTERIP]
541     if master_name == my_name:
542       source = constants.IP4_ADDRESS_LOCALHOST
543     else:
544       source = None
545     result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
546                                                   source=source)
547
548   if constants.NV_LVLIST in what:
549     try:
550       val = GetVolumeList(what[constants.NV_LVLIST])
551     except RPCFail, err:
552       val = str(err)
553     result[constants.NV_LVLIST] = val
554
555   if constants.NV_INSTANCELIST in what:
556     # GetInstanceList can fail
557     try:
558       val = GetInstanceList(what[constants.NV_INSTANCELIST])
559     except RPCFail, err:
560       val = str(err)
561     result[constants.NV_INSTANCELIST] = val
562
563   if constants.NV_VGLIST in what:
564     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
565
566   if constants.NV_PVLIST in what:
567     result[constants.NV_PVLIST] = \
568       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
569                                    filter_allocatable=False)
570
571   if constants.NV_VERSION in what:
572     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
573                                     constants.RELEASE_VERSION)
574
575   if constants.NV_HVINFO in what:
576     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
577     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
578
579   if constants.NV_DRBDLIST in what:
580     try:
581       used_minors = bdev.DRBD8.GetUsedDevs().keys()
582     except errors.BlockDeviceError, err:
583       logging.warning("Can't get used minors list", exc_info=True)
584       used_minors = str(err)
585     result[constants.NV_DRBDLIST] = used_minors
586
587   if constants.NV_NODESETUP in what:
588     result[constants.NV_NODESETUP] = tmpr = []
589     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
590       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
591                   " under /sys, missing required directories /sys/block"
592                   " and /sys/class/net")
593     if (not os.path.isdir("/proc/sys") or
594         not os.path.isfile("/proc/sysrq-trigger")):
595       tmpr.append("The procfs filesystem doesn't seem to be mounted"
596                   " under /proc, missing required directory /proc/sys and"
597                   " the file /proc/sysrq-trigger")
598
599   if constants.NV_TIME in what:
600     result[constants.NV_TIME] = utils.SplitTime(time.time())
601
602   if constants.NV_OSLIST in what:
603     result[constants.NV_OSLIST] = DiagnoseOS()
604
605   return result
606
607
608 def GetVolumeList(vg_name):
609   """Compute list of logical volumes and their size.
610
611   @type vg_name: str
612   @param vg_name: the volume group whose LVs we should list
613   @rtype: dict
614   @return:
615       dictionary of all partions (key) with value being a tuple of
616       their size (in MiB), inactive and online status::
617
618         {'test1': ('20.06', True, True)}
619
620       in case of errors, a string is returned with the error
621       details.
622
623   """
624   lvs = {}
625   sep = '|'
626   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
627                          "--separator=%s" % sep,
628                          "-olv_name,lv_size,lv_attr", vg_name])
629   if result.failed:
630     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
631
632   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
633   for line in result.stdout.splitlines():
634     line = line.strip()
635     match = valid_line_re.match(line)
636     if not match:
637       logging.error("Invalid line returned from lvs output: '%s'", line)
638       continue
639     name, size, attr = match.groups()
640     inactive = attr[4] == '-'
641     online = attr[5] == 'o'
642     virtual = attr[0] == 'v'
643     if virtual:
644       # we don't want to report such volumes as existing, since they
645       # don't really hold data
646       continue
647     lvs[name] = (size, inactive, online)
648
649   return lvs
650
651
652 def ListVolumeGroups():
653   """List the volume groups and their size.
654
655   @rtype: dict
656   @return: dictionary with keys volume name and values the
657       size of the volume
658
659   """
660   return utils.ListVolumeGroups()
661
662
663 def NodeVolumes():
664   """List all volumes on this node.
665
666   @rtype: list
667   @return:
668     A list of dictionaries, each having four keys:
669       - name: the logical volume name,
670       - size: the size of the logical volume
671       - dev: the physical device on which the LV lives
672       - vg: the volume group to which it belongs
673
674     In case of errors, we return an empty list and log the
675     error.
676
677     Note that since a logical volume can live on multiple physical
678     volumes, the resulting list might include a logical volume
679     multiple times.
680
681   """
682   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
683                          "--separator=|",
684                          "--options=lv_name,lv_size,devices,vg_name"])
685   if result.failed:
686     _Fail("Failed to list logical volumes, lvs output: %s",
687           result.output)
688
689   def parse_dev(dev):
690     return dev.split('(')[0]
691
692   def handle_dev(dev):
693     return [parse_dev(x) for x in dev.split(",")]
694
695   def map_line(line):
696     line = [v.strip() for v in line]
697     return [{'name': line[0], 'size': line[1],
698              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
699
700   all_devs = []
701   for line in result.stdout.splitlines():
702     if line.count('|') >= 3:
703       all_devs.extend(map_line(line.split('|')))
704     else:
705       logging.warning("Strange line in the output from lvs: '%s'", line)
706   return all_devs
707
708
709 def BridgesExist(bridges_list):
710   """Check if a list of bridges exist on the current node.
711
712   @rtype: boolean
713   @return: C{True} if all of them exist, C{False} otherwise
714
715   """
716   missing = []
717   for bridge in bridges_list:
718     if not utils.BridgeExists(bridge):
719       missing.append(bridge)
720
721   if missing:
722     _Fail("Missing bridges %s", utils.CommaJoin(missing))
723
724
725 def GetInstanceList(hypervisor_list):
726   """Provides a list of instances.
727
728   @type hypervisor_list: list
729   @param hypervisor_list: the list of hypervisors to query information
730
731   @rtype: list
732   @return: a list of all running instances on the current node
733     - instance1.example.com
734     - instance2.example.com
735
736   """
737   results = []
738   for hname in hypervisor_list:
739     try:
740       names = hypervisor.GetHypervisor(hname).ListInstances()
741       results.extend(names)
742     except errors.HypervisorError, err:
743       _Fail("Error enumerating instances (hypervisor %s): %s",
744             hname, err, exc=True)
745
746   return results
747
748
749 def GetInstanceInfo(instance, hname):
750   """Gives back the information about an instance as a dictionary.
751
752   @type instance: string
753   @param instance: the instance name
754   @type hname: string
755   @param hname: the hypervisor type of the instance
756
757   @rtype: dict
758   @return: dictionary with the following keys:
759       - memory: memory size of instance (int)
760       - state: xen state of instance (string)
761       - time: cpu time of instance (float)
762
763   """
764   output = {}
765
766   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
767   if iinfo is not None:
768     output['memory'] = iinfo[2]
769     output['state'] = iinfo[4]
770     output['time'] = iinfo[5]
771
772   return output
773
774
775 def GetInstanceMigratable(instance):
776   """Gives whether an instance can be migrated.
777
778   @type instance: L{objects.Instance}
779   @param instance: object representing the instance to be checked.
780
781   @rtype: tuple
782   @return: tuple of (result, description) where:
783       - result: whether the instance can be migrated or not
784       - description: a description of the issue, if relevant
785
786   """
787   hyper = hypervisor.GetHypervisor(instance.hypervisor)
788   iname = instance.name
789   if iname not in hyper.ListInstances():
790     _Fail("Instance %s is not running", iname)
791
792   for idx in range(len(instance.disks)):
793     link_name = _GetBlockDevSymlinkPath(iname, idx)
794     if not os.path.islink(link_name):
795       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
796
797
798 def GetAllInstancesInfo(hypervisor_list):
799   """Gather data about all instances.
800
801   This is the equivalent of L{GetInstanceInfo}, except that it
802   computes data for all instances at once, thus being faster if one
803   needs data about more than one instance.
804
805   @type hypervisor_list: list
806   @param hypervisor_list: list of hypervisors to query for instance data
807
808   @rtype: dict
809   @return: dictionary of instance: data, with data having the following keys:
810       - memory: memory size of instance (int)
811       - state: xen state of instance (string)
812       - time: cpu time of instance (float)
813       - vcpus: the number of vcpus
814
815   """
816   output = {}
817
818   for hname in hypervisor_list:
819     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
820     if iinfo:
821       for name, _, memory, vcpus, state, times in iinfo:
822         value = {
823           'memory': memory,
824           'vcpus': vcpus,
825           'state': state,
826           'time': times,
827           }
828         if name in output:
829           # we only check static parameters, like memory and vcpus,
830           # and not state and time which can change between the
831           # invocations of the different hypervisors
832           for key in 'memory', 'vcpus':
833             if value[key] != output[name][key]:
834               _Fail("Instance %s is running twice"
835                     " with different parameters", name)
836         output[name] = value
837
838   return output
839
840
841 def _InstanceLogName(kind, os_name, instance):
842   """Compute the OS log filename for a given instance and operation.
843
844   The instance name and os name are passed in as strings since not all
845   operations have these as part of an instance object.
846
847   @type kind: string
848   @param kind: the operation type (e.g. add, import, etc.)
849   @type os_name: string
850   @param os_name: the os name
851   @type instance: string
852   @param instance: the name of the instance being imported/added/etc.
853
854   """
855   # TODO: Use tempfile.mkstemp to create unique filename
856   base = ("%s-%s-%s-%s.log" %
857           (kind, os_name, instance, utils.TimestampForFilename()))
858   return utils.PathJoin(constants.LOG_OS_DIR, base)
859
860
861 def InstanceOsAdd(instance, reinstall, debug):
862   """Add an OS to an instance.
863
864   @type instance: L{objects.Instance}
865   @param instance: Instance whose OS is to be installed
866   @type reinstall: boolean
867   @param reinstall: whether this is an instance reinstall
868   @type debug: integer
869   @param debug: debug level, passed to the OS scripts
870   @rtype: None
871
872   """
873   inst_os = OSFromDisk(instance.os)
874
875   create_env = OSEnvironment(instance, inst_os, debug)
876   if reinstall:
877     create_env['INSTANCE_REINSTALL'] = "1"
878
879   logfile = _InstanceLogName("add", instance.os, instance.name)
880
881   result = utils.RunCmd([inst_os.create_script], env=create_env,
882                         cwd=inst_os.path, output=logfile,)
883   if result.failed:
884     logging.error("os create command '%s' returned error: %s, logfile: %s,"
885                   " output: %s", result.cmd, result.fail_reason, logfile,
886                   result.output)
887     lines = [utils.SafeEncode(val)
888              for val in utils.TailFile(logfile, lines=20)]
889     _Fail("OS create script failed (%s), last lines in the"
890           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
891
892
893 def RunRenameInstance(instance, old_name, debug):
894   """Run the OS rename script for an instance.
895
896   @type instance: L{objects.Instance}
897   @param instance: Instance whose OS is to be installed
898   @type old_name: string
899   @param old_name: previous instance name
900   @type debug: integer
901   @param debug: debug level, passed to the OS scripts
902   @rtype: boolean
903   @return: the success of the operation
904
905   """
906   inst_os = OSFromDisk(instance.os)
907
908   rename_env = OSEnvironment(instance, inst_os, debug)
909   rename_env['OLD_INSTANCE_NAME'] = old_name
910
911   logfile = _InstanceLogName("rename", instance.os,
912                              "%s-%s" % (old_name, instance.name))
913
914   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
915                         cwd=inst_os.path, output=logfile)
916
917   if result.failed:
918     logging.error("os create command '%s' returned error: %s output: %s",
919                   result.cmd, result.fail_reason, result.output)
920     lines = [utils.SafeEncode(val)
921              for val in utils.TailFile(logfile, lines=20)]
922     _Fail("OS rename script failed (%s), last lines in the"
923           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
924
925
926 def _GetVGInfo(vg_name):
927   """Get information about the volume group.
928
929   @type vg_name: str
930   @param vg_name: the volume group which we query
931   @rtype: dict
932   @return:
933     A dictionary with the following keys:
934       - C{vg_size} is the total size of the volume group in MiB
935       - C{vg_free} is the free size of the volume group in MiB
936       - C{pv_count} are the number of physical disks in that VG
937
938     If an error occurs during gathering of data, we return the same dict
939     with keys all set to None.
940
941   """
942   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
943
944   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
945                          "--nosuffix", "--units=m", "--separator=:", vg_name])
946
947   if retval.failed:
948     logging.error("volume group %s not present", vg_name)
949     return retdic
950   valarr = retval.stdout.strip().rstrip(':').split(':')
951   if len(valarr) == 3:
952     try:
953       retdic = {
954         "vg_size": int(round(float(valarr[0]), 0)),
955         "vg_free": int(round(float(valarr[1]), 0)),
956         "pv_count": int(valarr[2]),
957         }
958     except (TypeError, ValueError), err:
959       logging.exception("Fail to parse vgs output: %s", err)
960   else:
961     logging.error("vgs output has the wrong number of fields (expected"
962                   " three): %s", str(valarr))
963   return retdic
964
965
966 def _GetBlockDevSymlinkPath(instance_name, idx):
967   return utils.PathJoin(constants.DISK_LINKS_DIR,
968                         "%s:%d" % (instance_name, idx))
969
970
971 def _SymlinkBlockDev(instance_name, device_path, idx):
972   """Set up symlinks to a instance's block device.
973
974   This is an auxiliary function run when an instance is start (on the primary
975   node) or when an instance is migrated (on the target node).
976
977
978   @param instance_name: the name of the target instance
979   @param device_path: path of the physical block device, on the node
980   @param idx: the disk index
981   @return: absolute path to the disk's symlink
982
983   """
984   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
985   try:
986     os.symlink(device_path, link_name)
987   except OSError, err:
988     if err.errno == errno.EEXIST:
989       if (not os.path.islink(link_name) or
990           os.readlink(link_name) != device_path):
991         os.remove(link_name)
992         os.symlink(device_path, link_name)
993     else:
994       raise
995
996   return link_name
997
998
999 def _RemoveBlockDevLinks(instance_name, disks):
1000   """Remove the block device symlinks belonging to the given instance.
1001
1002   """
1003   for idx, _ in enumerate(disks):
1004     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1005     if os.path.islink(link_name):
1006       try:
1007         os.remove(link_name)
1008       except OSError:
1009         logging.exception("Can't remove symlink '%s'", link_name)
1010
1011
1012 def _GatherAndLinkBlockDevs(instance):
1013   """Set up an instance's block device(s).
1014
1015   This is run on the primary node at instance startup. The block
1016   devices must be already assembled.
1017
1018   @type instance: L{objects.Instance}
1019   @param instance: the instance whose disks we shoul assemble
1020   @rtype: list
1021   @return: list of (disk_object, device_path)
1022
1023   """
1024   block_devices = []
1025   for idx, disk in enumerate(instance.disks):
1026     device = _RecursiveFindBD(disk)
1027     if device is None:
1028       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1029                                     str(disk))
1030     device.Open()
1031     try:
1032       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1033     except OSError, e:
1034       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1035                                     e.strerror)
1036
1037     block_devices.append((disk, link_name))
1038
1039   return block_devices
1040
1041
1042 def StartInstance(instance):
1043   """Start an instance.
1044
1045   @type instance: L{objects.Instance}
1046   @param instance: the instance object
1047   @rtype: None
1048
1049   """
1050   running_instances = GetInstanceList([instance.hypervisor])
1051
1052   if instance.name in running_instances:
1053     logging.info("Instance %s already running, not starting", instance.name)
1054     return
1055
1056   try:
1057     block_devices = _GatherAndLinkBlockDevs(instance)
1058     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1059     hyper.StartInstance(instance, block_devices)
1060   except errors.BlockDeviceError, err:
1061     _Fail("Block device error: %s", err, exc=True)
1062   except errors.HypervisorError, err:
1063     _RemoveBlockDevLinks(instance.name, instance.disks)
1064     _Fail("Hypervisor error: %s", err, exc=True)
1065
1066
1067 def InstanceShutdown(instance, timeout):
1068   """Shut an instance down.
1069
1070   @note: this functions uses polling with a hardcoded timeout.
1071
1072   @type instance: L{objects.Instance}
1073   @param instance: the instance object
1074   @type timeout: integer
1075   @param timeout: maximum timeout for soft shutdown
1076   @rtype: None
1077
1078   """
1079   hv_name = instance.hypervisor
1080   hyper = hypervisor.GetHypervisor(hv_name)
1081   iname = instance.name
1082
1083   if instance.name not in hyper.ListInstances():
1084     logging.info("Instance %s not running, doing nothing", iname)
1085     return
1086
1087   class _TryShutdown:
1088     def __init__(self):
1089       self.tried_once = False
1090
1091     def __call__(self):
1092       if iname not in hyper.ListInstances():
1093         return
1094
1095       try:
1096         hyper.StopInstance(instance, retry=self.tried_once)
1097       except errors.HypervisorError, err:
1098         if iname not in hyper.ListInstances():
1099           # if the instance is no longer existing, consider this a
1100           # success and go to cleanup
1101           return
1102
1103         _Fail("Failed to stop instance %s: %s", iname, err)
1104
1105       self.tried_once = True
1106
1107       raise utils.RetryAgain()
1108
1109   try:
1110     utils.Retry(_TryShutdown(), 5, timeout)
1111   except utils.RetryTimeout:
1112     # the shutdown did not succeed
1113     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1114
1115     try:
1116       hyper.StopInstance(instance, force=True)
1117     except errors.HypervisorError, err:
1118       if iname in hyper.ListInstances():
1119         # only raise an error if the instance still exists, otherwise
1120         # the error could simply be "instance ... unknown"!
1121         _Fail("Failed to force stop instance %s: %s", iname, err)
1122
1123     time.sleep(1)
1124
1125     if iname in hyper.ListInstances():
1126       _Fail("Could not shutdown instance %s even by destroy", iname)
1127
1128   try:
1129     hyper.CleanupInstance(instance.name)
1130   except errors.HypervisorError, err:
1131     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1132
1133   _RemoveBlockDevLinks(iname, instance.disks)
1134
1135
1136 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1137   """Reboot an instance.
1138
1139   @type instance: L{objects.Instance}
1140   @param instance: the instance object to reboot
1141   @type reboot_type: str
1142   @param reboot_type: the type of reboot, one the following
1143     constants:
1144       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1145         instance OS, do not recreate the VM
1146       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1147         restart the VM (at the hypervisor level)
1148       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1149         not accepted here, since that mode is handled differently, in
1150         cmdlib, and translates into full stop and start of the
1151         instance (instead of a call_instance_reboot RPC)
1152   @type shutdown_timeout: integer
1153   @param shutdown_timeout: maximum timeout for soft shutdown
1154   @rtype: None
1155
1156   """
1157   running_instances = GetInstanceList([instance.hypervisor])
1158
1159   if instance.name not in running_instances:
1160     _Fail("Cannot reboot instance %s that is not running", instance.name)
1161
1162   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1163   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1164     try:
1165       hyper.RebootInstance(instance)
1166     except errors.HypervisorError, err:
1167       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1168   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1169     try:
1170       InstanceShutdown(instance, shutdown_timeout)
1171       return StartInstance(instance)
1172     except errors.HypervisorError, err:
1173       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1174   else:
1175     _Fail("Invalid reboot_type received: %s", reboot_type)
1176
1177
1178 def MigrationInfo(instance):
1179   """Gather information about an instance to be migrated.
1180
1181   @type instance: L{objects.Instance}
1182   @param instance: the instance definition
1183
1184   """
1185   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1186   try:
1187     info = hyper.MigrationInfo(instance)
1188   except errors.HypervisorError, err:
1189     _Fail("Failed to fetch migration information: %s", err, exc=True)
1190   return info
1191
1192
1193 def AcceptInstance(instance, info, target):
1194   """Prepare the node to accept an instance.
1195
1196   @type instance: L{objects.Instance}
1197   @param instance: the instance definition
1198   @type info: string/data (opaque)
1199   @param info: migration information, from the source node
1200   @type target: string
1201   @param target: target host (usually ip), on this node
1202
1203   """
1204   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1205   try:
1206     hyper.AcceptInstance(instance, info, target)
1207   except errors.HypervisorError, err:
1208     _Fail("Failed to accept instance: %s", err, exc=True)
1209
1210
1211 def FinalizeMigration(instance, info, success):
1212   """Finalize any preparation to accept an instance.
1213
1214   @type instance: L{objects.Instance}
1215   @param instance: the instance definition
1216   @type info: string/data (opaque)
1217   @param info: migration information, from the source node
1218   @type success: boolean
1219   @param success: whether the migration was a success or a failure
1220
1221   """
1222   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1223   try:
1224     hyper.FinalizeMigration(instance, info, success)
1225   except errors.HypervisorError, err:
1226     _Fail("Failed to finalize migration: %s", err, exc=True)
1227
1228
1229 def MigrateInstance(instance, target, live):
1230   """Migrates an instance to another node.
1231
1232   @type instance: L{objects.Instance}
1233   @param instance: the instance definition
1234   @type target: string
1235   @param target: the target node name
1236   @type live: boolean
1237   @param live: whether the migration should be done live or not (the
1238       interpretation of this parameter is left to the hypervisor)
1239   @rtype: tuple
1240   @return: a tuple of (success, msg) where:
1241       - succes is a boolean denoting the success/failure of the operation
1242       - msg is a string with details in case of failure
1243
1244   """
1245   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1246
1247   try:
1248     hyper.MigrateInstance(instance, target, live)
1249   except errors.HypervisorError, err:
1250     _Fail("Failed to migrate instance: %s", err, exc=True)
1251
1252
1253 def BlockdevCreate(disk, size, owner, on_primary, info):
1254   """Creates a block device for an instance.
1255
1256   @type disk: L{objects.Disk}
1257   @param disk: the object describing the disk we should create
1258   @type size: int
1259   @param size: the size of the physical underlying device, in MiB
1260   @type owner: str
1261   @param owner: the name of the instance for which disk is created,
1262       used for device cache data
1263   @type on_primary: boolean
1264   @param on_primary:  indicates if it is the primary node or not
1265   @type info: string
1266   @param info: string that will be sent to the physical device
1267       creation, used for example to set (LVM) tags on LVs
1268
1269   @return: the new unique_id of the device (this can sometime be
1270       computed only after creation), or None. On secondary nodes,
1271       it's not required to return anything.
1272
1273   """
1274   # TODO: remove the obsolete 'size' argument
1275   # pylint: disable-msg=W0613
1276   clist = []
1277   if disk.children:
1278     for child in disk.children:
1279       try:
1280         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1281       except errors.BlockDeviceError, err:
1282         _Fail("Can't assemble device %s: %s", child, err)
1283       if on_primary or disk.AssembleOnSecondary():
1284         # we need the children open in case the device itself has to
1285         # be assembled
1286         try:
1287           # pylint: disable-msg=E1103
1288           crdev.Open()
1289         except errors.BlockDeviceError, err:
1290           _Fail("Can't make child '%s' read-write: %s", child, err)
1291       clist.append(crdev)
1292
1293   try:
1294     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1295   except errors.BlockDeviceError, err:
1296     _Fail("Can't create block device: %s", err)
1297
1298   if on_primary or disk.AssembleOnSecondary():
1299     try:
1300       device.Assemble()
1301     except errors.BlockDeviceError, err:
1302       _Fail("Can't assemble device after creation, unusual event: %s", err)
1303     device.SetSyncSpeed(constants.SYNC_SPEED)
1304     if on_primary or disk.OpenOnSecondary():
1305       try:
1306         device.Open(force=True)
1307       except errors.BlockDeviceError, err:
1308         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1309     DevCacheManager.UpdateCache(device.dev_path, owner,
1310                                 on_primary, disk.iv_name)
1311
1312   device.SetInfo(info)
1313
1314   return device.unique_id
1315
1316
1317 def BlockdevRemove(disk):
1318   """Remove a block device.
1319
1320   @note: This is intended to be called recursively.
1321
1322   @type disk: L{objects.Disk}
1323   @param disk: the disk object we should remove
1324   @rtype: boolean
1325   @return: the success of the operation
1326
1327   """
1328   msgs = []
1329   try:
1330     rdev = _RecursiveFindBD(disk)
1331   except errors.BlockDeviceError, err:
1332     # probably can't attach
1333     logging.info("Can't attach to device %s in remove", disk)
1334     rdev = None
1335   if rdev is not None:
1336     r_path = rdev.dev_path
1337     try:
1338       rdev.Remove()
1339     except errors.BlockDeviceError, err:
1340       msgs.append(str(err))
1341     if not msgs:
1342       DevCacheManager.RemoveCache(r_path)
1343
1344   if disk.children:
1345     for child in disk.children:
1346       try:
1347         BlockdevRemove(child)
1348       except RPCFail, err:
1349         msgs.append(str(err))
1350
1351   if msgs:
1352     _Fail("; ".join(msgs))
1353
1354
1355 def _RecursiveAssembleBD(disk, owner, as_primary):
1356   """Activate a block device for an instance.
1357
1358   This is run on the primary and secondary nodes for an instance.
1359
1360   @note: this function is called recursively.
1361
1362   @type disk: L{objects.Disk}
1363   @param disk: the disk we try to assemble
1364   @type owner: str
1365   @param owner: the name of the instance which owns the disk
1366   @type as_primary: boolean
1367   @param as_primary: if we should make the block device
1368       read/write
1369
1370   @return: the assembled device or None (in case no device
1371       was assembled)
1372   @raise errors.BlockDeviceError: in case there is an error
1373       during the activation of the children or the device
1374       itself
1375
1376   """
1377   children = []
1378   if disk.children:
1379     mcn = disk.ChildrenNeeded()
1380     if mcn == -1:
1381       mcn = 0 # max number of Nones allowed
1382     else:
1383       mcn = len(disk.children) - mcn # max number of Nones
1384     for chld_disk in disk.children:
1385       try:
1386         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1387       except errors.BlockDeviceError, err:
1388         if children.count(None) >= mcn:
1389           raise
1390         cdev = None
1391         logging.error("Error in child activation (but continuing): %s",
1392                       str(err))
1393       children.append(cdev)
1394
1395   if as_primary or disk.AssembleOnSecondary():
1396     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1397     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1398     result = r_dev
1399     if as_primary or disk.OpenOnSecondary():
1400       r_dev.Open()
1401     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1402                                 as_primary, disk.iv_name)
1403
1404   else:
1405     result = True
1406   return result
1407
1408
1409 def BlockdevAssemble(disk, owner, as_primary):
1410   """Activate a block device for an instance.
1411
1412   This is a wrapper over _RecursiveAssembleBD.
1413
1414   @rtype: str or boolean
1415   @return: a C{/dev/...} path for primary nodes, and
1416       C{True} for secondary nodes
1417
1418   """
1419   try:
1420     result = _RecursiveAssembleBD(disk, owner, as_primary)
1421     if isinstance(result, bdev.BlockDev):
1422       # pylint: disable-msg=E1103
1423       result = result.dev_path
1424   except errors.BlockDeviceError, err:
1425     _Fail("Error while assembling disk: %s", err, exc=True)
1426
1427   return result
1428
1429
1430 def BlockdevShutdown(disk):
1431   """Shut down a block device.
1432
1433   First, if the device is assembled (Attach() is successful), then
1434   the device is shutdown. Then the children of the device are
1435   shutdown.
1436
1437   This function is called recursively. Note that we don't cache the
1438   children or such, as oppossed to assemble, shutdown of different
1439   devices doesn't require that the upper device was active.
1440
1441   @type disk: L{objects.Disk}
1442   @param disk: the description of the disk we should
1443       shutdown
1444   @rtype: None
1445
1446   """
1447   msgs = []
1448   r_dev = _RecursiveFindBD(disk)
1449   if r_dev is not None:
1450     r_path = r_dev.dev_path
1451     try:
1452       r_dev.Shutdown()
1453       DevCacheManager.RemoveCache(r_path)
1454     except errors.BlockDeviceError, err:
1455       msgs.append(str(err))
1456
1457   if disk.children:
1458     for child in disk.children:
1459       try:
1460         BlockdevShutdown(child)
1461       except RPCFail, err:
1462         msgs.append(str(err))
1463
1464   if msgs:
1465     _Fail("; ".join(msgs))
1466
1467
1468 def BlockdevAddchildren(parent_cdev, new_cdevs):
1469   """Extend a mirrored block device.
1470
1471   @type parent_cdev: L{objects.Disk}
1472   @param parent_cdev: the disk to which we should add children
1473   @type new_cdevs: list of L{objects.Disk}
1474   @param new_cdevs: the list of children which we should add
1475   @rtype: None
1476
1477   """
1478   parent_bdev = _RecursiveFindBD(parent_cdev)
1479   if parent_bdev is None:
1480     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1481   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1482   if new_bdevs.count(None) > 0:
1483     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1484   parent_bdev.AddChildren(new_bdevs)
1485
1486
1487 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1488   """Shrink a mirrored block device.
1489
1490   @type parent_cdev: L{objects.Disk}
1491   @param parent_cdev: the disk from which we should remove children
1492   @type new_cdevs: list of L{objects.Disk}
1493   @param new_cdevs: the list of children which we should remove
1494   @rtype: None
1495
1496   """
1497   parent_bdev = _RecursiveFindBD(parent_cdev)
1498   if parent_bdev is None:
1499     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1500   devs = []
1501   for disk in new_cdevs:
1502     rpath = disk.StaticDevPath()
1503     if rpath is None:
1504       bd = _RecursiveFindBD(disk)
1505       if bd is None:
1506         _Fail("Can't find device %s while removing children", disk)
1507       else:
1508         devs.append(bd.dev_path)
1509     else:
1510       if not utils.IsNormAbsPath(rpath):
1511         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1512       devs.append(rpath)
1513   parent_bdev.RemoveChildren(devs)
1514
1515
1516 def BlockdevGetmirrorstatus(disks):
1517   """Get the mirroring status of a list of devices.
1518
1519   @type disks: list of L{objects.Disk}
1520   @param disks: the list of disks which we should query
1521   @rtype: disk
1522   @return:
1523       a list of (mirror_done, estimated_time) tuples, which
1524       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1525   @raise errors.BlockDeviceError: if any of the disks cannot be
1526       found
1527
1528   """
1529   stats = []
1530   for dsk in disks:
1531     rbd = _RecursiveFindBD(dsk)
1532     if rbd is None:
1533       _Fail("Can't find device %s", dsk)
1534
1535     stats.append(rbd.CombinedSyncStatus())
1536
1537   return stats
1538
1539
1540 def _RecursiveFindBD(disk):
1541   """Check if a device is activated.
1542
1543   If so, return information about the real device.
1544
1545   @type disk: L{objects.Disk}
1546   @param disk: the disk object we need to find
1547
1548   @return: None if the device can't be found,
1549       otherwise the device instance
1550
1551   """
1552   children = []
1553   if disk.children:
1554     for chdisk in disk.children:
1555       children.append(_RecursiveFindBD(chdisk))
1556
1557   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1558
1559
1560 def _OpenRealBD(disk):
1561   """Opens the underlying block device of a disk.
1562
1563   @type disk: L{objects.Disk}
1564   @param disk: the disk object we want to open
1565
1566   """
1567   real_disk = _RecursiveFindBD(disk)
1568   if real_disk is None:
1569     _Fail("Block device '%s' is not set up", disk)
1570
1571   real_disk.Open()
1572
1573   return real_disk
1574
1575
1576 def BlockdevFind(disk):
1577   """Check if a device is activated.
1578
1579   If it is, return information about the real device.
1580
1581   @type disk: L{objects.Disk}
1582   @param disk: the disk to find
1583   @rtype: None or objects.BlockDevStatus
1584   @return: None if the disk cannot be found, otherwise a the current
1585            information
1586
1587   """
1588   try:
1589     rbd = _RecursiveFindBD(disk)
1590   except errors.BlockDeviceError, err:
1591     _Fail("Failed to find device: %s", err, exc=True)
1592
1593   if rbd is None:
1594     return None
1595
1596   return rbd.GetSyncStatus()
1597
1598
1599 def BlockdevGetsize(disks):
1600   """Computes the size of the given disks.
1601
1602   If a disk is not found, returns None instead.
1603
1604   @type disks: list of L{objects.Disk}
1605   @param disks: the list of disk to compute the size for
1606   @rtype: list
1607   @return: list with elements None if the disk cannot be found,
1608       otherwise the size
1609
1610   """
1611   result = []
1612   for cf in disks:
1613     try:
1614       rbd = _RecursiveFindBD(cf)
1615     except errors.BlockDeviceError:
1616       result.append(None)
1617       continue
1618     if rbd is None:
1619       result.append(None)
1620     else:
1621       result.append(rbd.GetActualSize())
1622   return result
1623
1624
1625 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1626   """Export a block device to a remote node.
1627
1628   @type disk: L{objects.Disk}
1629   @param disk: the description of the disk to export
1630   @type dest_node: str
1631   @param dest_node: the destination node to export to
1632   @type dest_path: str
1633   @param dest_path: the destination path on the target node
1634   @type cluster_name: str
1635   @param cluster_name: the cluster name, needed for SSH hostalias
1636   @rtype: None
1637
1638   """
1639   real_disk = _OpenRealBD(disk)
1640
1641   # the block size on the read dd is 1MiB to match our units
1642   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1643                                "dd if=%s bs=1048576 count=%s",
1644                                real_disk.dev_path, str(disk.size))
1645
1646   # we set here a smaller block size as, due to ssh buffering, more
1647   # than 64-128k will mostly ignored; we use nocreat to fail if the
1648   # device is not already there or we pass a wrong path; we use
1649   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1650   # to not buffer too much memory; this means that at best, we flush
1651   # every 64k, which will not be very fast
1652   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1653                                 " oflag=dsync", dest_path)
1654
1655   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1656                                                    constants.GANETI_RUNAS,
1657                                                    destcmd)
1658
1659   # all commands have been checked, so we're safe to combine them
1660   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1661
1662   result = utils.RunCmd(["bash", "-c", command])
1663
1664   if result.failed:
1665     _Fail("Disk copy command '%s' returned error: %s"
1666           " output: %s", command, result.fail_reason, result.output)
1667
1668
1669 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1670   """Write a file to the filesystem.
1671
1672   This allows the master to overwrite(!) a file. It will only perform
1673   the operation if the file belongs to a list of configuration files.
1674
1675   @type file_name: str
1676   @param file_name: the target file name
1677   @type data: str
1678   @param data: the new contents of the file
1679   @type mode: int
1680   @param mode: the mode to give the file (can be None)
1681   @type uid: int
1682   @param uid: the owner of the file (can be -1 for default)
1683   @type gid: int
1684   @param gid: the group of the file (can be -1 for default)
1685   @type atime: float
1686   @param atime: the atime to set on the file (can be None)
1687   @type mtime: float
1688   @param mtime: the mtime to set on the file (can be None)
1689   @rtype: None
1690
1691   """
1692   if not os.path.isabs(file_name):
1693     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1694
1695   if file_name not in _ALLOWED_UPLOAD_FILES:
1696     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1697           file_name)
1698
1699   raw_data = _Decompress(data)
1700
1701   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1702                   atime=atime, mtime=mtime)
1703
1704
1705 def WriteSsconfFiles(values):
1706   """Update all ssconf files.
1707
1708   Wrapper around the SimpleStore.WriteFiles.
1709
1710   """
1711   ssconf.SimpleStore().WriteFiles(values)
1712
1713
1714 def _ErrnoOrStr(err):
1715   """Format an EnvironmentError exception.
1716
1717   If the L{err} argument has an errno attribute, it will be looked up
1718   and converted into a textual C{E...} description. Otherwise the
1719   string representation of the error will be returned.
1720
1721   @type err: L{EnvironmentError}
1722   @param err: the exception to format
1723
1724   """
1725   if hasattr(err, 'errno'):
1726     detail = errno.errorcode[err.errno]
1727   else:
1728     detail = str(err)
1729   return detail
1730
1731
1732 def _OSOndiskAPIVersion(os_dir):
1733   """Compute and return the API version of a given OS.
1734
1735   This function will try to read the API version of the OS residing in
1736   the 'os_dir' directory.
1737
1738   @type os_dir: str
1739   @param os_dir: the directory in which we should look for the OS
1740   @rtype: tuple
1741   @return: tuple (status, data) with status denoting the validity and
1742       data holding either the vaid versions or an error message
1743
1744   """
1745   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1746
1747   try:
1748     st = os.stat(api_file)
1749   except EnvironmentError, err:
1750     return False, ("Required file '%s' not found under path %s: %s" %
1751                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1752
1753   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1754     return False, ("File '%s' in %s is not a regular file" %
1755                    (constants.OS_API_FILE, os_dir))
1756
1757   try:
1758     api_versions = utils.ReadFile(api_file).splitlines()
1759   except EnvironmentError, err:
1760     return False, ("Error while reading the API version file at %s: %s" %
1761                    (api_file, _ErrnoOrStr(err)))
1762
1763   try:
1764     api_versions = [int(version.strip()) for version in api_versions]
1765   except (TypeError, ValueError), err:
1766     return False, ("API version(s) can't be converted to integer: %s" %
1767                    str(err))
1768
1769   return True, api_versions
1770
1771
1772 def DiagnoseOS(top_dirs=None):
1773   """Compute the validity for all OSes.
1774
1775   @type top_dirs: list
1776   @param top_dirs: the list of directories in which to
1777       search (if not given defaults to
1778       L{constants.OS_SEARCH_PATH})
1779   @rtype: list of L{objects.OS}
1780   @return: a list of tuples (name, path, status, diagnose, variants,
1781       parameters, api_version) for all (potential) OSes under all
1782       search paths, where:
1783           - name is the (potential) OS name
1784           - path is the full path to the OS
1785           - status True/False is the validity of the OS
1786           - diagnose is the error message for an invalid OS, otherwise empty
1787           - variants is a list of supported OS variants, if any
1788           - parameters is a list of (name, help) parameters, if any
1789           - api_version is a list of support OS API versions
1790
1791   """
1792   if top_dirs is None:
1793     top_dirs = constants.OS_SEARCH_PATH
1794
1795   result = []
1796   for dir_name in top_dirs:
1797     if os.path.isdir(dir_name):
1798       try:
1799         f_names = utils.ListVisibleFiles(dir_name)
1800       except EnvironmentError, err:
1801         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1802         break
1803       for name in f_names:
1804         os_path = utils.PathJoin(dir_name, name)
1805         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1806         if status:
1807           diagnose = ""
1808           variants = os_inst.supported_variants
1809           parameters = os_inst.supported_parameters
1810           api_versions = os_inst.api_versions
1811         else:
1812           diagnose = os_inst
1813           variants = parameters = api_versions = []
1814         result.append((name, os_path, status, diagnose, variants,
1815                        parameters, api_versions))
1816
1817   return result
1818
1819
1820 def _TryOSFromDisk(name, base_dir=None):
1821   """Create an OS instance from disk.
1822
1823   This function will return an OS instance if the given name is a
1824   valid OS name.
1825
1826   @type base_dir: string
1827   @keyword base_dir: Base directory containing OS installations.
1828                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1829   @rtype: tuple
1830   @return: success and either the OS instance if we find a valid one,
1831       or error message
1832
1833   """
1834   if base_dir is None:
1835     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1836   else:
1837     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1838
1839   if os_dir is None:
1840     return False, "Directory for OS %s not found in search path" % name
1841
1842   status, api_versions = _OSOndiskAPIVersion(os_dir)
1843   if not status:
1844     # push the error up
1845     return status, api_versions
1846
1847   if not constants.OS_API_VERSIONS.intersection(api_versions):
1848     return False, ("API version mismatch for path '%s': found %s, want %s." %
1849                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1850
1851   # OS Files dictionary, we will populate it with the absolute path names
1852   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1853
1854   if max(api_versions) >= constants.OS_API_V15:
1855     os_files[constants.OS_VARIANTS_FILE] = ''
1856
1857   if max(api_versions) >= constants.OS_API_V20:
1858     os_files[constants.OS_PARAMETERS_FILE] = ''
1859   else:
1860     del os_files[constants.OS_SCRIPT_VERIFY]
1861
1862   for filename in os_files:
1863     os_files[filename] = utils.PathJoin(os_dir, filename)
1864
1865     try:
1866       st = os.stat(os_files[filename])
1867     except EnvironmentError, err:
1868       return False, ("File '%s' under path '%s' is missing (%s)" %
1869                      (filename, os_dir, _ErrnoOrStr(err)))
1870
1871     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1872       return False, ("File '%s' under path '%s' is not a regular file" %
1873                      (filename, os_dir))
1874
1875     if filename in constants.OS_SCRIPTS:
1876       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1877         return False, ("File '%s' under path '%s' is not executable" %
1878                        (filename, os_dir))
1879
1880   variants = []
1881   if constants.OS_VARIANTS_FILE in os_files:
1882     variants_file = os_files[constants.OS_VARIANTS_FILE]
1883     try:
1884       variants = utils.ReadFile(variants_file).splitlines()
1885     except EnvironmentError, err:
1886       return False, ("Error while reading the OS variants file at %s: %s" %
1887                      (variants_file, _ErrnoOrStr(err)))
1888     if not variants:
1889       return False, ("No supported os variant found")
1890
1891   parameters = []
1892   if constants.OS_PARAMETERS_FILE in os_files:
1893     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1894     try:
1895       parameters = utils.ReadFile(parameters_file).splitlines()
1896     except EnvironmentError, err:
1897       return False, ("Error while reading the OS parameters file at %s: %s" %
1898                      (parameters_file, _ErrnoOrStr(err)))
1899     parameters = [v.split(None, 1) for v in parameters]
1900
1901   os_obj = objects.OS(name=name, path=os_dir,
1902                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1903                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1904                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1905                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1906                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1907                                                  None),
1908                       supported_variants=variants,
1909                       supported_parameters=parameters,
1910                       api_versions=api_versions)
1911   return True, os_obj
1912
1913
1914 def OSFromDisk(name, base_dir=None):
1915   """Create an OS instance from disk.
1916
1917   This function will return an OS instance if the given name is a
1918   valid OS name. Otherwise, it will raise an appropriate
1919   L{RPCFail} exception, detailing why this is not a valid OS.
1920
1921   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1922   an exception but returns true/false status data.
1923
1924   @type base_dir: string
1925   @keyword base_dir: Base directory containing OS installations.
1926                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1927   @rtype: L{objects.OS}
1928   @return: the OS instance if we find a valid one
1929   @raise RPCFail: if we don't find a valid OS
1930
1931   """
1932   name_only = name.split("+", 1)[0]
1933   status, payload = _TryOSFromDisk(name_only, base_dir)
1934
1935   if not status:
1936     _Fail(payload)
1937
1938   return payload
1939
1940
1941 def OSCoreEnv(inst_os, os_params, debug=0):
1942   """Calculate the basic environment for an os script.
1943
1944   @type inst_os: L{objects.OS}
1945   @param inst_os: operating system for which the environment is being built
1946   @type os_params: dict
1947   @param os_params: the OS parameters
1948   @type debug: integer
1949   @param debug: debug level (0 or 1, for OS Api 10)
1950   @rtype: dict
1951   @return: dict of environment variables
1952   @raise errors.BlockDeviceError: if the block device
1953       cannot be found
1954
1955   """
1956   result = {}
1957   api_version = \
1958     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1959   result['OS_API_VERSION'] = '%d' % api_version
1960   result['OS_NAME'] = inst_os.name
1961   result['DEBUG_LEVEL'] = '%d' % debug
1962
1963   # OS variants
1964   if api_version >= constants.OS_API_V15:
1965     try:
1966       variant = inst_os.name.split('+', 1)[1]
1967     except IndexError:
1968       variant = inst_os.supported_variants[0]
1969     result['OS_VARIANT'] = variant
1970
1971   # OS params
1972   for pname, pvalue in os_params.items():
1973     result['OSP_%s' % pname.upper()] = pvalue
1974
1975   return result
1976
1977
1978 def OSEnvironment(instance, inst_os, debug=0):
1979   """Calculate the environment for an os script.
1980
1981   @type instance: L{objects.Instance}
1982   @param instance: target instance for the os script run
1983   @type inst_os: L{objects.OS}
1984   @param inst_os: operating system for which the environment is being built
1985   @type debug: integer
1986   @param debug: debug level (0 or 1, for OS Api 10)
1987   @rtype: dict
1988   @return: dict of environment variables
1989   @raise errors.BlockDeviceError: if the block device
1990       cannot be found
1991
1992   """
1993   result = OSCoreEnv(inst_os, instance.osparams, debug=debug)
1994
1995   result['INSTANCE_NAME'] = instance.name
1996   result['INSTANCE_OS'] = instance.os
1997   result['HYPERVISOR'] = instance.hypervisor
1998   result['DISK_COUNT'] = '%d' % len(instance.disks)
1999   result['NIC_COUNT'] = '%d' % len(instance.nics)
2000
2001   # Disks
2002   for idx, disk in enumerate(instance.disks):
2003     real_disk = _OpenRealBD(disk)
2004     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2005     result['DISK_%d_ACCESS' % idx] = disk.mode
2006     if constants.HV_DISK_TYPE in instance.hvparams:
2007       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2008         instance.hvparams[constants.HV_DISK_TYPE]
2009     if disk.dev_type in constants.LDS_BLOCK:
2010       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2011     elif disk.dev_type == constants.LD_FILE:
2012       result['DISK_%d_BACKEND_TYPE' % idx] = \
2013         'file:%s' % disk.physical_id[0]
2014
2015   # NICs
2016   for idx, nic in enumerate(instance.nics):
2017     result['NIC_%d_MAC' % idx] = nic.mac
2018     if nic.ip:
2019       result['NIC_%d_IP' % idx] = nic.ip
2020     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2021     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2022       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2023     if nic.nicparams[constants.NIC_LINK]:
2024       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2025     if constants.HV_NIC_TYPE in instance.hvparams:
2026       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2027         instance.hvparams[constants.HV_NIC_TYPE]
2028
2029   # HV/BE params
2030   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2031     for key, value in source.items():
2032       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2033
2034   return result
2035
2036
2037 def BlockdevGrow(disk, amount):
2038   """Grow a stack of block devices.
2039
2040   This function is called recursively, with the childrens being the
2041   first ones to resize.
2042
2043   @type disk: L{objects.Disk}
2044   @param disk: the disk to be grown
2045   @rtype: (status, result)
2046   @return: a tuple with the status of the operation
2047       (True/False), and the errors message if status
2048       is False
2049
2050   """
2051   r_dev = _RecursiveFindBD(disk)
2052   if r_dev is None:
2053     _Fail("Cannot find block device %s", disk)
2054
2055   try:
2056     r_dev.Grow(amount)
2057   except errors.BlockDeviceError, err:
2058     _Fail("Failed to grow block device: %s", err, exc=True)
2059
2060
2061 def BlockdevSnapshot(disk):
2062   """Create a snapshot copy of a block device.
2063
2064   This function is called recursively, and the snapshot is actually created
2065   just for the leaf lvm backend device.
2066
2067   @type disk: L{objects.Disk}
2068   @param disk: the disk to be snapshotted
2069   @rtype: string
2070   @return: snapshot disk path
2071
2072   """
2073   if disk.dev_type == constants.LD_DRBD8:
2074     if not disk.children:
2075       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2076             disk.unique_id)
2077     return BlockdevSnapshot(disk.children[0])
2078   elif disk.dev_type == constants.LD_LV:
2079     r_dev = _RecursiveFindBD(disk)
2080     if r_dev is not None:
2081       # FIXME: choose a saner value for the snapshot size
2082       # let's stay on the safe side and ask for the full size, for now
2083       return r_dev.Snapshot(disk.size)
2084     else:
2085       _Fail("Cannot find block device %s", disk)
2086   else:
2087     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2088           disk.unique_id, disk.dev_type)
2089
2090
2091 def FinalizeExport(instance, snap_disks):
2092   """Write out the export configuration information.
2093
2094   @type instance: L{objects.Instance}
2095   @param instance: the instance which we export, used for
2096       saving configuration
2097   @type snap_disks: list of L{objects.Disk}
2098   @param snap_disks: list of snapshot block devices, which
2099       will be used to get the actual name of the dump file
2100
2101   @rtype: None
2102
2103   """
2104   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2105   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2106
2107   config = objects.SerializableConfigParser()
2108
2109   config.add_section(constants.INISECT_EXP)
2110   config.set(constants.INISECT_EXP, 'version', '0')
2111   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2112   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2113   config.set(constants.INISECT_EXP, 'os', instance.os)
2114   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2115
2116   config.add_section(constants.INISECT_INS)
2117   config.set(constants.INISECT_INS, 'name', instance.name)
2118   config.set(constants.INISECT_INS, 'memory', '%d' %
2119              instance.beparams[constants.BE_MEMORY])
2120   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2121              instance.beparams[constants.BE_VCPUS])
2122   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2123   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2124
2125   nic_total = 0
2126   for nic_count, nic in enumerate(instance.nics):
2127     nic_total += 1
2128     config.set(constants.INISECT_INS, 'nic%d_mac' %
2129                nic_count, '%s' % nic.mac)
2130     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2131     for param in constants.NICS_PARAMETER_TYPES:
2132       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2133                  '%s' % nic.nicparams.get(param, None))
2134   # TODO: redundant: on load can read nics until it doesn't exist
2135   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2136
2137   disk_total = 0
2138   for disk_count, disk in enumerate(snap_disks):
2139     if disk:
2140       disk_total += 1
2141       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2142                  ('%s' % disk.iv_name))
2143       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2144                  ('%s' % disk.physical_id[1]))
2145       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2146                  ('%d' % disk.size))
2147
2148   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2149
2150   # New-style hypervisor/backend parameters
2151
2152   config.add_section(constants.INISECT_HYP)
2153   for name, value in instance.hvparams.items():
2154     if name not in constants.HVC_GLOBALS:
2155       config.set(constants.INISECT_HYP, name, str(value))
2156
2157   config.add_section(constants.INISECT_BEP)
2158   for name, value in instance.beparams.items():
2159     config.set(constants.INISECT_BEP, name, str(value))
2160
2161   config.add_section(constants.INISECT_OSP)
2162   for name, value in instance.osparams.items():
2163     config.set(constants.INISECT_OSP, name, str(value))
2164
2165   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2166                   data=config.Dumps())
2167   shutil.rmtree(finaldestdir, ignore_errors=True)
2168   shutil.move(destdir, finaldestdir)
2169
2170
2171 def ExportInfo(dest):
2172   """Get export configuration information.
2173
2174   @type dest: str
2175   @param dest: directory containing the export
2176
2177   @rtype: L{objects.SerializableConfigParser}
2178   @return: a serializable config file containing the
2179       export info
2180
2181   """
2182   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2183
2184   config = objects.SerializableConfigParser()
2185   config.read(cff)
2186
2187   if (not config.has_section(constants.INISECT_EXP) or
2188       not config.has_section(constants.INISECT_INS)):
2189     _Fail("Export info file doesn't have the required fields")
2190
2191   return config.Dumps()
2192
2193
2194 def ListExports():
2195   """Return a list of exports currently available on this machine.
2196
2197   @rtype: list
2198   @return: list of the exports
2199
2200   """
2201   if os.path.isdir(constants.EXPORT_DIR):
2202     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2203   else:
2204     _Fail("No exports directory")
2205
2206
2207 def RemoveExport(export):
2208   """Remove an existing export from the node.
2209
2210   @type export: str
2211   @param export: the name of the export to remove
2212   @rtype: None
2213
2214   """
2215   target = utils.PathJoin(constants.EXPORT_DIR, export)
2216
2217   try:
2218     shutil.rmtree(target)
2219   except EnvironmentError, err:
2220     _Fail("Error while removing the export: %s", err, exc=True)
2221
2222
2223 def BlockdevRename(devlist):
2224   """Rename a list of block devices.
2225
2226   @type devlist: list of tuples
2227   @param devlist: list of tuples of the form  (disk,
2228       new_logical_id, new_physical_id); disk is an
2229       L{objects.Disk} object describing the current disk,
2230       and new logical_id/physical_id is the name we
2231       rename it to
2232   @rtype: boolean
2233   @return: True if all renames succeeded, False otherwise
2234
2235   """
2236   msgs = []
2237   result = True
2238   for disk, unique_id in devlist:
2239     dev = _RecursiveFindBD(disk)
2240     if dev is None:
2241       msgs.append("Can't find device %s in rename" % str(disk))
2242       result = False
2243       continue
2244     try:
2245       old_rpath = dev.dev_path
2246       dev.Rename(unique_id)
2247       new_rpath = dev.dev_path
2248       if old_rpath != new_rpath:
2249         DevCacheManager.RemoveCache(old_rpath)
2250         # FIXME: we should add the new cache information here, like:
2251         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2252         # but we don't have the owner here - maybe parse from existing
2253         # cache? for now, we only lose lvm data when we rename, which
2254         # is less critical than DRBD or MD
2255     except errors.BlockDeviceError, err:
2256       msgs.append("Can't rename device '%s' to '%s': %s" %
2257                   (dev, unique_id, err))
2258       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2259       result = False
2260   if not result:
2261     _Fail("; ".join(msgs))
2262
2263
2264 def _TransformFileStorageDir(file_storage_dir):
2265   """Checks whether given file_storage_dir is valid.
2266
2267   Checks wheter the given file_storage_dir is within the cluster-wide
2268   default file_storage_dir stored in SimpleStore. Only paths under that
2269   directory are allowed.
2270
2271   @type file_storage_dir: str
2272   @param file_storage_dir: the path to check
2273
2274   @return: the normalized path if valid, None otherwise
2275
2276   """
2277   if not constants.ENABLE_FILE_STORAGE:
2278     _Fail("File storage disabled at configure time")
2279   cfg = _GetConfig()
2280   file_storage_dir = os.path.normpath(file_storage_dir)
2281   base_file_storage_dir = cfg.GetFileStorageDir()
2282   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2283       base_file_storage_dir):
2284     _Fail("File storage directory '%s' is not under base file"
2285           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2286   return file_storage_dir
2287
2288
2289 def CreateFileStorageDir(file_storage_dir):
2290   """Create file storage directory.
2291
2292   @type file_storage_dir: str
2293   @param file_storage_dir: directory to create
2294
2295   @rtype: tuple
2296   @return: tuple with first element a boolean indicating wheter dir
2297       creation was successful or not
2298
2299   """
2300   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2301   if os.path.exists(file_storage_dir):
2302     if not os.path.isdir(file_storage_dir):
2303       _Fail("Specified storage dir '%s' is not a directory",
2304             file_storage_dir)
2305   else:
2306     try:
2307       os.makedirs(file_storage_dir, 0750)
2308     except OSError, err:
2309       _Fail("Cannot create file storage directory '%s': %s",
2310             file_storage_dir, err, exc=True)
2311
2312
2313 def RemoveFileStorageDir(file_storage_dir):
2314   """Remove file storage directory.
2315
2316   Remove it only if it's empty. If not log an error and return.
2317
2318   @type file_storage_dir: str
2319   @param file_storage_dir: the directory we should cleanup
2320   @rtype: tuple (success,)
2321   @return: tuple of one element, C{success}, denoting
2322       whether the operation was successful
2323
2324   """
2325   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2326   if os.path.exists(file_storage_dir):
2327     if not os.path.isdir(file_storage_dir):
2328       _Fail("Specified Storage directory '%s' is not a directory",
2329             file_storage_dir)
2330     # deletes dir only if empty, otherwise we want to fail the rpc call
2331     try:
2332       os.rmdir(file_storage_dir)
2333     except OSError, err:
2334       _Fail("Cannot remove file storage directory '%s': %s",
2335             file_storage_dir, err)
2336
2337
2338 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2339   """Rename the file storage directory.
2340
2341   @type old_file_storage_dir: str
2342   @param old_file_storage_dir: the current path
2343   @type new_file_storage_dir: str
2344   @param new_file_storage_dir: the name we should rename to
2345   @rtype: tuple (success,)
2346   @return: tuple of one element, C{success}, denoting
2347       whether the operation was successful
2348
2349   """
2350   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2351   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2352   if not os.path.exists(new_file_storage_dir):
2353     if os.path.isdir(old_file_storage_dir):
2354       try:
2355         os.rename(old_file_storage_dir, new_file_storage_dir)
2356       except OSError, err:
2357         _Fail("Cannot rename '%s' to '%s': %s",
2358               old_file_storage_dir, new_file_storage_dir, err)
2359     else:
2360       _Fail("Specified storage dir '%s' is not a directory",
2361             old_file_storage_dir)
2362   else:
2363     if os.path.exists(old_file_storage_dir):
2364       _Fail("Cannot rename '%s' to '%s': both locations exist",
2365             old_file_storage_dir, new_file_storage_dir)
2366
2367
2368 def _EnsureJobQueueFile(file_name):
2369   """Checks whether the given filename is in the queue directory.
2370
2371   @type file_name: str
2372   @param file_name: the file name we should check
2373   @rtype: None
2374   @raises RPCFail: if the file is not valid
2375
2376   """
2377   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2378   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2379
2380   if not result:
2381     _Fail("Passed job queue file '%s' does not belong to"
2382           " the queue directory '%s'", file_name, queue_dir)
2383
2384
2385 def JobQueueUpdate(file_name, content):
2386   """Updates a file in the queue directory.
2387
2388   This is just a wrapper over L{utils.WriteFile}, with proper
2389   checking.
2390
2391   @type file_name: str
2392   @param file_name: the job file name
2393   @type content: str
2394   @param content: the new job contents
2395   @rtype: boolean
2396   @return: the success of the operation
2397
2398   """
2399   _EnsureJobQueueFile(file_name)
2400
2401   # Write and replace the file atomically
2402   utils.WriteFile(file_name, data=_Decompress(content))
2403
2404
2405 def JobQueueRename(old, new):
2406   """Renames a job queue file.
2407
2408   This is just a wrapper over os.rename with proper checking.
2409
2410   @type old: str
2411   @param old: the old (actual) file name
2412   @type new: str
2413   @param new: the desired file name
2414   @rtype: tuple
2415   @return: the success of the operation and payload
2416
2417   """
2418   _EnsureJobQueueFile(old)
2419   _EnsureJobQueueFile(new)
2420
2421   utils.RenameFile(old, new, mkdir=True)
2422
2423
2424 def BlockdevClose(instance_name, disks):
2425   """Closes the given block devices.
2426
2427   This means they will be switched to secondary mode (in case of
2428   DRBD).
2429
2430   @param instance_name: if the argument is not empty, the symlinks
2431       of this instance will be removed
2432   @type disks: list of L{objects.Disk}
2433   @param disks: the list of disks to be closed
2434   @rtype: tuple (success, message)
2435   @return: a tuple of success and message, where success
2436       indicates the succes of the operation, and message
2437       which will contain the error details in case we
2438       failed
2439
2440   """
2441   bdevs = []
2442   for cf in disks:
2443     rd = _RecursiveFindBD(cf)
2444     if rd is None:
2445       _Fail("Can't find device %s", cf)
2446     bdevs.append(rd)
2447
2448   msg = []
2449   for rd in bdevs:
2450     try:
2451       rd.Close()
2452     except errors.BlockDeviceError, err:
2453       msg.append(str(err))
2454   if msg:
2455     _Fail("Can't make devices secondary: %s", ",".join(msg))
2456   else:
2457     if instance_name:
2458       _RemoveBlockDevLinks(instance_name, disks)
2459
2460
2461 def ValidateHVParams(hvname, hvparams):
2462   """Validates the given hypervisor parameters.
2463
2464   @type hvname: string
2465   @param hvname: the hypervisor name
2466   @type hvparams: dict
2467   @param hvparams: the hypervisor parameters to be validated
2468   @rtype: None
2469
2470   """
2471   try:
2472     hv_type = hypervisor.GetHypervisor(hvname)
2473     hv_type.ValidateParameters(hvparams)
2474   except errors.HypervisorError, err:
2475     _Fail(str(err), log=False)
2476
2477
2478 def _CheckOSPList(os_obj, parameters):
2479   """Check whether a list of parameters is supported by the OS.
2480
2481   @type os_obj: L{objects.OS}
2482   @param os_obj: OS object to check
2483   @type parameters: list
2484   @param parameters: the list of parameters to check
2485
2486   """
2487   supported = [v[0] for v in os_obj.supported_parameters]
2488   delta = frozenset(parameters).difference(supported)
2489   if delta:
2490     _Fail("The following parameters are not supported"
2491           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2492
2493
2494 def ValidateOS(required, osname, checks, osparams):
2495   """Validate the given OS' parameters.
2496
2497   @type required: boolean
2498   @param required: whether absence of the OS should translate into
2499       failure or not
2500   @type osname: string
2501   @param osname: the OS to be validated
2502   @type checks: list
2503   @param checks: list of the checks to run (currently only 'parameters')
2504   @type osparams: dict
2505   @param osparams: dictionary with OS parameters
2506   @rtype: boolean
2507   @return: True if the validation passed, or False if the OS was not
2508       found and L{required} was false
2509
2510   """
2511   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2512     _Fail("Unknown checks required for OS %s: %s", osname,
2513           set(checks).difference(constants.OS_VALIDATE_CALLS))
2514
2515   name_only = osname.split("+", 1)[0]
2516   status, tbv = _TryOSFromDisk(name_only, None)
2517
2518   if not status:
2519     if required:
2520       _Fail(tbv)
2521     else:
2522       return False
2523
2524   if max(tbv.api_versions) < constants.OS_API_V20:
2525     return True
2526
2527   if constants.OS_VALIDATE_PARAMETERS in checks:
2528     _CheckOSPList(tbv, osparams.keys())
2529
2530   validate_env = OSCoreEnv(tbv, osparams)
2531   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2532                         cwd=tbv.path)
2533   if result.failed:
2534     logging.error("os validate command '%s' returned error: %s output: %s",
2535                   result.cmd, result.fail_reason, result.output)
2536     _Fail("OS validation script failed (%s), output: %s",
2537           result.fail_reason, result.output, log=False)
2538
2539   return True
2540
2541
2542 def DemoteFromMC():
2543   """Demotes the current node from master candidate role.
2544
2545   """
2546   # try to ensure we're not the master by mistake
2547   master, myself = ssconf.GetMasterAndMyself()
2548   if master == myself:
2549     _Fail("ssconf status shows I'm the master node, will not demote")
2550
2551   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2552   if not result.failed:
2553     _Fail("The master daemon is running, will not demote")
2554
2555   try:
2556     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2557       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2558   except EnvironmentError, err:
2559     if err.errno != errno.ENOENT:
2560       _Fail("Error while backing up cluster file: %s", err, exc=True)
2561
2562   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2563
2564
2565 def _GetX509Filenames(cryptodir, name):
2566   """Returns the full paths for the private key and certificate.
2567
2568   """
2569   return (utils.PathJoin(cryptodir, name),
2570           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2571           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2572
2573
2574 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2575   """Creates a new X509 certificate for SSL/TLS.
2576
2577   @type validity: int
2578   @param validity: Validity in seconds
2579   @rtype: tuple; (string, string)
2580   @return: Certificate name and public part
2581
2582   """
2583   (key_pem, cert_pem) = \
2584     utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2585                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2586
2587   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2588                               prefix="x509-%s-" % utils.TimestampForFilename())
2589   try:
2590     name = os.path.basename(cert_dir)
2591     assert len(name) > 5
2592
2593     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2594
2595     utils.WriteFile(key_file, mode=0400, data=key_pem)
2596     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2597
2598     # Never return private key as it shouldn't leave the node
2599     return (name, cert_pem)
2600   except Exception:
2601     shutil.rmtree(cert_dir, ignore_errors=True)
2602     raise
2603
2604
2605 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2606   """Removes a X509 certificate.
2607
2608   @type name: string
2609   @param name: Certificate name
2610
2611   """
2612   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2613
2614   utils.RemoveFile(key_file)
2615   utils.RemoveFile(cert_file)
2616
2617   try:
2618     os.rmdir(cert_dir)
2619   except EnvironmentError, err:
2620     _Fail("Cannot remove certificate directory '%s': %s",
2621           cert_dir, err)
2622
2623
2624 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2625   """Returns the command for the requested input/output.
2626
2627   @type instance: L{objects.Instance}
2628   @param instance: The instance object
2629   @param mode: Import/export mode
2630   @param ieio: Input/output type
2631   @param ieargs: Input/output arguments
2632
2633   """
2634   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2635
2636   env = None
2637   prefix = None
2638   suffix = None
2639   exp_size = None
2640
2641   if ieio == constants.IEIO_FILE:
2642     (filename, ) = ieargs
2643
2644     if not utils.IsNormAbsPath(filename):
2645       _Fail("Path '%s' is not normalized or absolute", filename)
2646
2647     directory = os.path.normpath(os.path.dirname(filename))
2648
2649     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2650         constants.EXPORT_DIR):
2651       _Fail("File '%s' is not under exports directory '%s'",
2652             filename, constants.EXPORT_DIR)
2653
2654     # Create directory
2655     utils.Makedirs(directory, mode=0750)
2656
2657     quoted_filename = utils.ShellQuote(filename)
2658
2659     if mode == constants.IEM_IMPORT:
2660       suffix = "> %s" % quoted_filename
2661     elif mode == constants.IEM_EXPORT:
2662       suffix = "< %s" % quoted_filename
2663
2664       # Retrieve file size
2665       try:
2666         st = os.stat(filename)
2667       except EnvironmentError, err:
2668         logging.error("Can't stat(2) %s: %s", filename, err)
2669       else:
2670         exp_size = utils.BytesToMebibyte(st.st_size)
2671
2672   elif ieio == constants.IEIO_RAW_DISK:
2673     (disk, ) = ieargs
2674
2675     real_disk = _OpenRealBD(disk)
2676
2677     if mode == constants.IEM_IMPORT:
2678       # we set here a smaller block size as, due to transport buffering, more
2679       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2680       # is not already there or we pass a wrong path; we use notrunc to no
2681       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2682       # much memory; this means that at best, we flush every 64k, which will
2683       # not be very fast
2684       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2685                                     " bs=%s oflag=dsync"),
2686                                     real_disk.dev_path,
2687                                     str(64 * 1024))
2688
2689     elif mode == constants.IEM_EXPORT:
2690       # the block size on the read dd is 1MiB to match our units
2691       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2692                                    real_disk.dev_path,
2693                                    str(1024 * 1024), # 1 MB
2694                                    str(disk.size))
2695       exp_size = disk.size
2696
2697   elif ieio == constants.IEIO_SCRIPT:
2698     (disk, disk_index, ) = ieargs
2699
2700     assert isinstance(disk_index, (int, long))
2701
2702     real_disk = _OpenRealBD(disk)
2703
2704     inst_os = OSFromDisk(instance.os)
2705     env = OSEnvironment(instance, inst_os)
2706
2707     if mode == constants.IEM_IMPORT:
2708       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2709       env["IMPORT_INDEX"] = str(disk_index)
2710       script = inst_os.import_script
2711
2712     elif mode == constants.IEM_EXPORT:
2713       env["EXPORT_DEVICE"] = real_disk.dev_path
2714       env["EXPORT_INDEX"] = str(disk_index)
2715       script = inst_os.export_script
2716
2717     # TODO: Pass special environment only to script
2718     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2719
2720     if mode == constants.IEM_IMPORT:
2721       suffix = "| %s" % script_cmd
2722
2723     elif mode == constants.IEM_EXPORT:
2724       prefix = "%s |" % script_cmd
2725
2726     # Let script predict size
2727     exp_size = constants.IE_CUSTOM_SIZE
2728
2729   else:
2730     _Fail("Invalid %s I/O mode %r", mode, ieio)
2731
2732   return (env, prefix, suffix, exp_size)
2733
2734
2735 def _CreateImportExportStatusDir(prefix):
2736   """Creates status directory for import/export.
2737
2738   """
2739   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2740                           prefix=("%s-%s-" %
2741                                   (prefix, utils.TimestampForFilename())))
2742
2743
2744 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2745   """Starts an import or export daemon.
2746
2747   @param mode: Import/output mode
2748   @type opts: L{objects.ImportExportOptions}
2749   @param opts: Daemon options
2750   @type host: string
2751   @param host: Remote host for export (None for import)
2752   @type port: int
2753   @param port: Remote port for export (None for import)
2754   @type instance: L{objects.Instance}
2755   @param instance: Instance object
2756   @param ieio: Input/output type
2757   @param ieioargs: Input/output arguments
2758
2759   """
2760   if mode == constants.IEM_IMPORT:
2761     prefix = "import"
2762
2763     if not (host is None and port is None):
2764       _Fail("Can not specify host or port on import")
2765
2766   elif mode == constants.IEM_EXPORT:
2767     prefix = "export"
2768
2769     if host is None or port is None:
2770       _Fail("Host and port must be specified for an export")
2771
2772   else:
2773     _Fail("Invalid mode %r", mode)
2774
2775   if (opts.key_name is None) ^ (opts.ca_pem is None):
2776     _Fail("Cluster certificate can only be used for both key and CA")
2777
2778   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2779     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2780
2781   if opts.key_name is None:
2782     # Use server.pem
2783     key_path = constants.NODED_CERT_FILE
2784     cert_path = constants.NODED_CERT_FILE
2785     assert opts.ca_pem is None
2786   else:
2787     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2788                                                  opts.key_name)
2789     assert opts.ca_pem is not None
2790
2791   for i in [key_path, cert_path]:
2792     if not os.path.exists(i):
2793       _Fail("File '%s' does not exist" % i)
2794
2795   status_dir = _CreateImportExportStatusDir(prefix)
2796   try:
2797     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2798     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2799     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2800
2801     if opts.ca_pem is None:
2802       # Use server.pem
2803       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2804     else:
2805       ca = opts.ca_pem
2806
2807     # Write CA file
2808     utils.WriteFile(ca_file, data=ca, mode=0400)
2809
2810     cmd = [
2811       constants.IMPORT_EXPORT_DAEMON,
2812       status_file, mode,
2813       "--key=%s" % key_path,
2814       "--cert=%s" % cert_path,
2815       "--ca=%s" % ca_file,
2816       ]
2817
2818     if host:
2819       cmd.append("--host=%s" % host)
2820
2821     if port:
2822       cmd.append("--port=%s" % port)
2823
2824     if opts.compress:
2825       cmd.append("--compress=%s" % opts.compress)
2826
2827     if opts.magic:
2828       cmd.append("--magic=%s" % opts.magic)
2829
2830     if exp_size is not None:
2831       cmd.append("--expected-size=%s" % exp_size)
2832
2833     if cmd_prefix:
2834       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2835
2836     if cmd_suffix:
2837       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2838
2839     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2840
2841     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2842     # support for receiving a file descriptor for output
2843     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2844                       output=logfile)
2845
2846     # The import/export name is simply the status directory name
2847     return os.path.basename(status_dir)
2848
2849   except Exception:
2850     shutil.rmtree(status_dir, ignore_errors=True)
2851     raise
2852
2853
2854 def GetImportExportStatus(names):
2855   """Returns import/export daemon status.
2856
2857   @type names: sequence
2858   @param names: List of names
2859   @rtype: List of dicts
2860   @return: Returns a list of the state of each named import/export or None if a
2861            status couldn't be read
2862
2863   """
2864   result = []
2865
2866   for name in names:
2867     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2868                                  _IES_STATUS_FILE)
2869
2870     try:
2871       data = utils.ReadFile(status_file)
2872     except EnvironmentError, err:
2873       if err.errno != errno.ENOENT:
2874         raise
2875       data = None
2876
2877     if not data:
2878       result.append(None)
2879       continue
2880
2881     result.append(serializer.LoadJson(data))
2882
2883   return result
2884
2885
2886 def AbortImportExport(name):
2887   """Sends SIGTERM to a running import/export daemon.
2888
2889   """
2890   logging.info("Abort import/export %s", name)
2891
2892   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2893   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2894
2895   if pid:
2896     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2897                  name, pid)
2898     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2899
2900
2901 def CleanupImportExport(name):
2902   """Cleanup after an import or export.
2903
2904   If the import/export daemon is still running it's killed. Afterwards the
2905   whole status directory is removed.
2906
2907   """
2908   logging.info("Finalizing import/export %s", name)
2909
2910   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2911
2912   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2913
2914   if pid:
2915     logging.info("Import/export %s is still running with PID %s",
2916                  name, pid)
2917     utils.KillProcess(pid, waitpid=False)
2918
2919   shutil.rmtree(status_dir, ignore_errors=True)
2920
2921
2922 def _FindDisks(nodes_ip, disks):
2923   """Sets the physical ID on disks and returns the block devices.
2924
2925   """
2926   # set the correct physical ID
2927   my_name = utils.HostInfo().name
2928   for cf in disks:
2929     cf.SetPhysicalID(my_name, nodes_ip)
2930
2931   bdevs = []
2932
2933   for cf in disks:
2934     rd = _RecursiveFindBD(cf)
2935     if rd is None:
2936       _Fail("Can't find device %s", cf)
2937     bdevs.append(rd)
2938   return bdevs
2939
2940
2941 def DrbdDisconnectNet(nodes_ip, disks):
2942   """Disconnects the network on a list of drbd devices.
2943
2944   """
2945   bdevs = _FindDisks(nodes_ip, disks)
2946
2947   # disconnect disks
2948   for rd in bdevs:
2949     try:
2950       rd.DisconnectNet()
2951     except errors.BlockDeviceError, err:
2952       _Fail("Can't change network configuration to standalone mode: %s",
2953             err, exc=True)
2954
2955
2956 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2957   """Attaches the network on a list of drbd devices.
2958
2959   """
2960   bdevs = _FindDisks(nodes_ip, disks)
2961
2962   if multimaster:
2963     for idx, rd in enumerate(bdevs):
2964       try:
2965         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2966       except EnvironmentError, err:
2967         _Fail("Can't create symlink: %s", err)
2968   # reconnect disks, switch to new master configuration and if
2969   # needed primary mode
2970   for rd in bdevs:
2971     try:
2972       rd.AttachNet(multimaster)
2973     except errors.BlockDeviceError, err:
2974       _Fail("Can't change network configuration: %s", err)
2975
2976   # wait until the disks are connected; we need to retry the re-attach
2977   # if the device becomes standalone, as this might happen if the one
2978   # node disconnects and reconnects in a different mode before the
2979   # other node reconnects; in this case, one or both of the nodes will
2980   # decide it has wrong configuration and switch to standalone
2981
2982   def _Attach():
2983     all_connected = True
2984
2985     for rd in bdevs:
2986       stats = rd.GetProcStatus()
2987
2988       all_connected = (all_connected and
2989                        (stats.is_connected or stats.is_in_resync))
2990
2991       if stats.is_standalone:
2992         # peer had different config info and this node became
2993         # standalone, even though this should not happen with the
2994         # new staged way of changing disk configs
2995         try:
2996           rd.AttachNet(multimaster)
2997         except errors.BlockDeviceError, err:
2998           _Fail("Can't change network configuration: %s", err)
2999
3000     if not all_connected:
3001       raise utils.RetryAgain()
3002
3003   try:
3004     # Start with a delay of 100 miliseconds and go up to 5 seconds
3005     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3006   except utils.RetryTimeout:
3007     _Fail("Timeout in disk reconnecting")
3008
3009   if multimaster:
3010     # change to primary mode
3011     for rd in bdevs:
3012       try:
3013         rd.Open()
3014       except errors.BlockDeviceError, err:
3015         _Fail("Can't change to primary mode: %s", err)
3016
3017
3018 def DrbdWaitSync(nodes_ip, disks):
3019   """Wait until DRBDs have synchronized.
3020
3021   """
3022   def _helper(rd):
3023     stats = rd.GetProcStatus()
3024     if not (stats.is_connected or stats.is_in_resync):
3025       raise utils.RetryAgain()
3026     return stats
3027
3028   bdevs = _FindDisks(nodes_ip, disks)
3029
3030   min_resync = 100
3031   alldone = True
3032   for rd in bdevs:
3033     try:
3034       # poll each second for 15 seconds
3035       stats = utils.Retry(_helper, 1, 15, args=[rd])
3036     except utils.RetryTimeout:
3037       stats = rd.GetProcStatus()
3038       # last check
3039       if not (stats.is_connected or stats.is_in_resync):
3040         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3041     alldone = alldone and (not stats.is_in_resync)
3042     if stats.sync_percent is not None:
3043       min_resync = min(min_resync, stats.sync_percent)
3044
3045   return (alldone, min_resync)
3046
3047
3048 def PowercycleNode(hypervisor_type):
3049   """Hard-powercycle the node.
3050
3051   Because we need to return first, and schedule the powercycle in the
3052   background, we won't be able to report failures nicely.
3053
3054   """
3055   hyper = hypervisor.GetHypervisor(hypervisor_type)
3056   try:
3057     pid = os.fork()
3058   except OSError:
3059     # if we can't fork, we'll pretend that we're in the child process
3060     pid = 0
3061   if pid > 0:
3062     return "Reboot scheduled in 5 seconds"
3063   # ensure the child is running on ram
3064   try:
3065     utils.Mlockall()
3066   except Exception: # pylint: disable-msg=W0703
3067     pass
3068   time.sleep(5)
3069   hyper.PowercycleNode()
3070
3071
3072 class HooksRunner(object):
3073   """Hook runner.
3074
3075   This class is instantiated on the node side (ganeti-noded) and not
3076   on the master side.
3077
3078   """
3079   def __init__(self, hooks_base_dir=None):
3080     """Constructor for hooks runner.
3081
3082     @type hooks_base_dir: str or None
3083     @param hooks_base_dir: if not None, this overrides the
3084         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3085
3086     """
3087     if hooks_base_dir is None:
3088       hooks_base_dir = constants.HOOKS_BASE_DIR
3089     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3090     # constant
3091     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3092
3093   def RunHooks(self, hpath, phase, env):
3094     """Run the scripts in the hooks directory.
3095
3096     @type hpath: str
3097     @param hpath: the path to the hooks directory which
3098         holds the scripts
3099     @type phase: str
3100     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3101         L{constants.HOOKS_PHASE_POST}
3102     @type env: dict
3103     @param env: dictionary with the environment for the hook
3104     @rtype: list
3105     @return: list of 3-element tuples:
3106       - script path
3107       - script result, either L{constants.HKR_SUCCESS} or
3108         L{constants.HKR_FAIL}
3109       - output of the script
3110
3111     @raise errors.ProgrammerError: for invalid input
3112         parameters
3113
3114     """
3115     if phase == constants.HOOKS_PHASE_PRE:
3116       suffix = "pre"
3117     elif phase == constants.HOOKS_PHASE_POST:
3118       suffix = "post"
3119     else:
3120       _Fail("Unknown hooks phase '%s'", phase)
3121
3122
3123     subdir = "%s-%s.d" % (hpath, suffix)
3124     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3125
3126     results = []
3127
3128     if not os.path.isdir(dir_name):
3129       # for non-existing/non-dirs, we simply exit instead of logging a
3130       # warning at every operation
3131       return results
3132
3133     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3134
3135     for (relname, relstatus, runresult)  in runparts_results:
3136       if relstatus == constants.RUNPARTS_SKIP:
3137         rrval = constants.HKR_SKIP
3138         output = ""
3139       elif relstatus == constants.RUNPARTS_ERR:
3140         rrval = constants.HKR_FAIL
3141         output = "Hook script execution error: %s" % runresult
3142       elif relstatus == constants.RUNPARTS_RUN:
3143         if runresult.failed:
3144           rrval = constants.HKR_FAIL
3145         else:
3146           rrval = constants.HKR_SUCCESS
3147         output = utils.SafeEncode(runresult.output.strip())
3148       results.append(("%s/%s" % (subdir, relname), rrval, output))
3149
3150     return results
3151
3152
3153 class IAllocatorRunner(object):
3154   """IAllocator runner.
3155
3156   This class is instantiated on the node side (ganeti-noded) and not on
3157   the master side.
3158
3159   """
3160   @staticmethod
3161   def Run(name, idata):
3162     """Run an iallocator script.
3163
3164     @type name: str
3165     @param name: the iallocator script name
3166     @type idata: str
3167     @param idata: the allocator input data
3168
3169     @rtype: tuple
3170     @return: two element tuple of:
3171        - status
3172        - either error message or stdout of allocator (for success)
3173
3174     """
3175     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3176                                   os.path.isfile)
3177     if alloc_script is None:
3178       _Fail("iallocator module '%s' not found in the search path", name)
3179
3180     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3181     try:
3182       os.write(fd, idata)
3183       os.close(fd)
3184       result = utils.RunCmd([alloc_script, fin_name])
3185       if result.failed:
3186         _Fail("iallocator module '%s' failed: %s, output '%s'",
3187               name, result.fail_reason, result.output)
3188     finally:
3189       os.unlink(fin_name)
3190
3191     return result.stdout
3192
3193
3194 class DevCacheManager(object):
3195   """Simple class for managing a cache of block device information.
3196
3197   """
3198   _DEV_PREFIX = "/dev/"
3199   _ROOT_DIR = constants.BDEV_CACHE_DIR
3200
3201   @classmethod
3202   def _ConvertPath(cls, dev_path):
3203     """Converts a /dev/name path to the cache file name.
3204
3205     This replaces slashes with underscores and strips the /dev
3206     prefix. It then returns the full path to the cache file.
3207
3208     @type dev_path: str
3209     @param dev_path: the C{/dev/} path name
3210     @rtype: str
3211     @return: the converted path name
3212
3213     """
3214     if dev_path.startswith(cls._DEV_PREFIX):
3215       dev_path = dev_path[len(cls._DEV_PREFIX):]
3216     dev_path = dev_path.replace("/", "_")
3217     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3218     return fpath
3219
3220   @classmethod
3221   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3222     """Updates the cache information for a given device.
3223
3224     @type dev_path: str
3225     @param dev_path: the pathname of the device
3226     @type owner: str
3227     @param owner: the owner (instance name) of the device
3228     @type on_primary: bool
3229     @param on_primary: whether this is the primary
3230         node nor not
3231     @type iv_name: str
3232     @param iv_name: the instance-visible name of the
3233         device, as in objects.Disk.iv_name
3234
3235     @rtype: None
3236
3237     """
3238     if dev_path is None:
3239       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3240       return
3241     fpath = cls._ConvertPath(dev_path)
3242     if on_primary:
3243       state = "primary"
3244     else:
3245       state = "secondary"
3246     if iv_name is None:
3247       iv_name = "not_visible"
3248     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3249     try:
3250       utils.WriteFile(fpath, data=fdata)
3251     except EnvironmentError, err:
3252       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3253
3254   @classmethod
3255   def RemoveCache(cls, dev_path):
3256     """Remove data for a dev_path.
3257
3258     This is just a wrapper over L{utils.RemoveFile} with a converted
3259     path name and logging.
3260
3261     @type dev_path: str
3262     @param dev_path: the pathname of the device
3263
3264     @rtype: None
3265
3266     """
3267     if dev_path is None:
3268       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3269       return
3270     fpath = cls._ConvertPath(dev_path)
3271     try:
3272       utils.RemoveFile(fpath)
3273     except EnvironmentError, err:
3274       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)