backend.InstanceShutdown: small cleanup
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26
27 """
28
29
30 import os
31 import os.path
32 import shutil
33 import time
34 import stat
35 import errno
36 import re
37 import subprocess
38 import random
39 import logging
40 import tempfile
41 import zlib
42 import base64
43
44 from ganeti import errors
45 from ganeti import utils
46 from ganeti import ssh
47 from ganeti import hypervisor
48 from ganeti import constants
49 from ganeti import bdev
50 from ganeti import objects
51 from ganeti import ssconf
52
53
54 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55
56
57 class RPCFail(Exception):
58   """Class denoting RPC failure.
59
60   Its argument is the error message.
61
62   """
63
64
65 def _Fail(msg, *args, **kwargs):
66   """Log an error and the raise an RPCFail exception.
67
68   This exception is then handled specially in the ganeti daemon and
69   turned into a 'failed' return type. As such, this function is a
70   useful shortcut for logging the error and returning it to the master
71   daemon.
72
73   @type msg: string
74   @param msg: the text of the exception
75   @raise RPCFail
76
77   """
78   if args:
79     msg = msg % args
80   if "log" not in kwargs or kwargs["log"]: # if we should log this error
81     if "exc" in kwargs and kwargs["exc"]:
82       logging.exception(msg)
83     else:
84       logging.error(msg)
85   raise RPCFail(msg)
86
87
88 def _GetConfig():
89   """Simple wrapper to return a SimpleStore.
90
91   @rtype: L{ssconf.SimpleStore}
92   @return: a SimpleStore instance
93
94   """
95   return ssconf.SimpleStore()
96
97
98 def _GetSshRunner(cluster_name):
99   """Simple wrapper to return an SshRunner.
100
101   @type cluster_name: str
102   @param cluster_name: the cluster name, which is needed
103       by the SshRunner constructor
104   @rtype: L{ssh.SshRunner}
105   @return: an SshRunner instance
106
107   """
108   return ssh.SshRunner(cluster_name)
109
110
111 def _Decompress(data):
112   """Unpacks data compressed by the RPC client.
113
114   @type data: list or tuple
115   @param data: Data sent by RPC client
116   @rtype: str
117   @return: Decompressed data
118
119   """
120   assert isinstance(data, (list, tuple))
121   assert len(data) == 2
122   (encoding, content) = data
123   if encoding == constants.RPC_ENCODING_NONE:
124     return content
125   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126     return zlib.decompress(base64.b64decode(content))
127   else:
128     raise AssertionError("Unknown data encoding")
129
130
131 def _CleanDirectory(path, exclude=None):
132   """Removes all regular files in a directory.
133
134   @type path: str
135   @param path: the directory to clean
136   @type exclude: list
137   @param exclude: list of files to be excluded, defaults
138       to the empty list
139
140   """
141   if not os.path.isdir(path):
142     return
143   if exclude is None:
144     exclude = []
145   else:
146     # Normalize excluded paths
147     exclude = [os.path.normpath(i) for i in exclude]
148
149   for rel_name in utils.ListVisibleFiles(path):
150     full_name = os.path.normpath(os.path.join(path, rel_name))
151     if full_name in exclude:
152       continue
153     if os.path.isfile(full_name) and not os.path.islink(full_name):
154       utils.RemoveFile(full_name)
155
156
157 def _BuildUploadFileList():
158   """Build the list of allowed upload files.
159
160   This is abstracted so that it's built only once at module import time.
161
162   """
163   allowed_files = set([
164     constants.CLUSTER_CONF_FILE,
165     constants.ETC_HOSTS,
166     constants.SSH_KNOWN_HOSTS_FILE,
167     constants.VNC_PASSWORD_FILE,
168     constants.RAPI_CERT_FILE,
169     constants.RAPI_USERS_FILE,
170     constants.HMAC_CLUSTER_KEY,
171     ])
172
173   for hv_name in constants.HYPER_TYPES:
174     hv_class = hypervisor.GetHypervisorClass(hv_name)
175     allowed_files.update(hv_class.GetAncillaryFiles())
176
177   return frozenset(allowed_files)
178
179
180 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181
182
183 def JobQueuePurge():
184   """Removes job queue files and archived jobs.
185
186   @rtype: tuple
187   @return: True, None
188
189   """
190   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192
193
194 def GetMasterInfo():
195   """Returns master information.
196
197   This is an utility function to compute master information, either
198   for consumption here or from the node daemon.
199
200   @rtype: tuple
201   @return: master_netdev, master_ip, master_name
202   @raise RPCFail: in case of errors
203
204   """
205   try:
206     cfg = _GetConfig()
207     master_netdev = cfg.GetMasterNetdev()
208     master_ip = cfg.GetMasterIP()
209     master_node = cfg.GetMasterNode()
210   except errors.ConfigurationError, err:
211     _Fail("Cluster configuration incomplete: %s", err, exc=True)
212   return (master_netdev, master_ip, master_node)
213
214
215 def StartMaster(start_daemons, no_voting):
216   """Activate local node as master node.
217
218   The function will always try activate the IP address of the master
219   (unless someone else has it). It will also start the master daemons,
220   based on the start_daemons parameter.
221
222   @type start_daemons: boolean
223   @param start_daemons: whether to also start the master
224       daemons (ganeti-masterd and ganeti-rapi)
225   @type no_voting: boolean
226   @param no_voting: whether to start ganeti-masterd without a node vote
227       (if start_daemons is True), but still non-interactively
228   @rtype: None
229
230   """
231   # GetMasterInfo will raise an exception if not able to return data
232   master_netdev, master_ip, _ = GetMasterInfo()
233
234   err_msgs = []
235   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236     if utils.OwnIpAddress(master_ip):
237       # we already have the ip:
238       logging.debug("Master IP already configured, doing nothing")
239     else:
240       msg = "Someone else has the master ip, not activating"
241       logging.error(msg)
242       err_msgs.append(msg)
243   else:
244     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245                            "dev", master_netdev, "label",
246                            "%s:0" % master_netdev])
247     if result.failed:
248       msg = "Can't activate master IP: %s" % result.output
249       logging.error(msg)
250       err_msgs.append(msg)
251
252     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253                            "-s", master_ip, master_ip])
254     # we'll ignore the exit code of arping
255
256   # and now start the master and rapi daemons
257   if start_daemons:
258     daemons_params = {
259         'ganeti-masterd': [],
260         'ganeti-rapi': [],
261         }
262     if no_voting:
263       daemons_params['ganeti-masterd'].append('--no-voting')
264       daemons_params['ganeti-masterd'].append('--yes-do-it')
265     for daemon in daemons_params:
266       cmd = [daemon]
267       cmd.extend(daemons_params[daemon])
268       result = utils.RunCmd(cmd)
269       if result.failed:
270         msg = "Can't start daemon %s: %s" % (daemon, result.output)
271         logging.error(msg)
272         err_msgs.append(msg)
273
274   if err_msgs:
275     _Fail("; ".join(err_msgs))
276
277
278 def StopMaster(stop_daemons):
279   """Deactivate this node as master.
280
281   The function will always try to deactivate the IP address of the
282   master. It will also stop the master daemons depending on the
283   stop_daemons parameter.
284
285   @type stop_daemons: boolean
286   @param stop_daemons: whether to also stop the master daemons
287       (ganeti-masterd and ganeti-rapi)
288   @rtype: None
289
290   """
291   # TODO: log and report back to the caller the error failures; we
292   # need to decide in which case we fail the RPC for this
293
294   # GetMasterInfo will raise an exception if not able to return data
295   master_netdev, master_ip, _ = GetMasterInfo()
296
297   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298                          "dev", master_netdev])
299   if result.failed:
300     logging.error("Can't remove the master IP, error: %s", result.output)
301     # but otherwise ignore the failure
302
303   if stop_daemons:
304     # stop/kill the rapi and the master daemon
305     for daemon in constants.RAPI, constants.MASTERD:
306       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307
308
309 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310   """Joins this node to the cluster.
311
312   This does the following:
313       - updates the hostkeys of the machine (rsa and dsa)
314       - adds the ssh private key to the user
315       - adds the ssh public key to the users' authorized_keys file
316
317   @type dsa: str
318   @param dsa: the DSA private key to write
319   @type dsapub: str
320   @param dsapub: the DSA public key to write
321   @type rsa: str
322   @param rsa: the RSA private key to write
323   @type rsapub: str
324   @param rsapub: the RSA public key to write
325   @type sshkey: str
326   @param sshkey: the SSH private key to write
327   @type sshpub: str
328   @param sshpub: the SSH public key to write
329   @rtype: boolean
330   @return: the success of the operation
331
332   """
333   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337   for name, content, mode in sshd_keys:
338     utils.WriteFile(name, data=content, mode=mode)
339
340   try:
341     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342                                                     mkdir=True)
343   except errors.OpExecError, err:
344     _Fail("Error while processing user ssh files: %s", err, exc=True)
345
346   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347     utils.WriteFile(name, data=content, mode=0600)
348
349   utils.AddAuthorizedKey(auth_keys, sshpub)
350
351   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352
353
354 def LeaveCluster():
355   """Cleans up and remove the current node.
356
357   This function cleans up and prepares the current node to be removed
358   from the cluster.
359
360   If processing is successful, then it raises an
361   L{errors.QuitGanetiException} which is used as a special case to
362   shutdown the node daemon.
363
364   """
365   _CleanDirectory(constants.DATA_DIR)
366   JobQueuePurge()
367
368   try:
369     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
370
371     utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
372
373     utils.RemoveFile(priv_key)
374     utils.RemoveFile(pub_key)
375   except errors.OpExecError:
376     logging.exception("Error while processing ssh files")
377
378   try:
379     utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
380     utils.RemoveFile(constants.RAPI_CERT_FILE)
381     utils.RemoveFile(constants.SSL_CERT_FILE)
382   except:
383     logging.exception("Error while removing cluster secrets")
384
385   confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
386
387   if confd_pid:
388     utils.KillProcess(confd_pid, timeout=2)
389
390   # Raise a custom exception (handled in ganeti-noded)
391   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
392
393
394 def GetNodeInfo(vgname, hypervisor_type):
395   """Gives back a hash with different information about the node.
396
397   @type vgname: C{string}
398   @param vgname: the name of the volume group to ask for disk space information
399   @type hypervisor_type: C{str}
400   @param hypervisor_type: the name of the hypervisor to ask for
401       memory information
402   @rtype: C{dict}
403   @return: dictionary with the following keys:
404       - vg_size is the size of the configured volume group in MiB
405       - vg_free is the free size of the volume group in MiB
406       - memory_dom0 is the memory allocated for domain0 in MiB
407       - memory_free is the currently available (free) ram in MiB
408       - memory_total is the total number of ram in MiB
409
410   """
411   outputarray = {}
412   vginfo = _GetVGInfo(vgname)
413   outputarray['vg_size'] = vginfo['vg_size']
414   outputarray['vg_free'] = vginfo['vg_free']
415
416   hyper = hypervisor.GetHypervisor(hypervisor_type)
417   hyp_info = hyper.GetNodeInfo()
418   if hyp_info is not None:
419     outputarray.update(hyp_info)
420
421   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
422
423   return outputarray
424
425
426 def VerifyNode(what, cluster_name):
427   """Verify the status of the local node.
428
429   Based on the input L{what} parameter, various checks are done on the
430   local node.
431
432   If the I{filelist} key is present, this list of
433   files is checksummed and the file/checksum pairs are returned.
434
435   If the I{nodelist} key is present, we check that we have
436   connectivity via ssh with the target nodes (and check the hostname
437   report).
438
439   If the I{node-net-test} key is present, we check that we have
440   connectivity to the given nodes via both primary IP and, if
441   applicable, secondary IPs.
442
443   @type what: C{dict}
444   @param what: a dictionary of things to check:
445       - filelist: list of files for which to compute checksums
446       - nodelist: list of nodes we should check ssh communication with
447       - node-net-test: list of nodes we should check node daemon port
448         connectivity with
449       - hypervisor: list with hypervisors to run the verify for
450   @rtype: dict
451   @return: a dictionary with the same keys as the input dict, and
452       values representing the result of the checks
453
454   """
455   result = {}
456
457   if constants.NV_HYPERVISOR in what:
458     result[constants.NV_HYPERVISOR] = tmp = {}
459     for hv_name in what[constants.NV_HYPERVISOR]:
460       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
461
462   if constants.NV_FILELIST in what:
463     result[constants.NV_FILELIST] = utils.FingerprintFiles(
464       what[constants.NV_FILELIST])
465
466   if constants.NV_NODELIST in what:
467     result[constants.NV_NODELIST] = tmp = {}
468     random.shuffle(what[constants.NV_NODELIST])
469     for node in what[constants.NV_NODELIST]:
470       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
471       if not success:
472         tmp[node] = message
473
474   if constants.NV_NODENETTEST in what:
475     result[constants.NV_NODENETTEST] = tmp = {}
476     my_name = utils.HostInfo().name
477     my_pip = my_sip = None
478     for name, pip, sip in what[constants.NV_NODENETTEST]:
479       if name == my_name:
480         my_pip = pip
481         my_sip = sip
482         break
483     if not my_pip:
484       tmp[my_name] = ("Can't find my own primary/secondary IP"
485                       " in the node list")
486     else:
487       port = utils.GetDaemonPort(constants.NODED)
488       for name, pip, sip in what[constants.NV_NODENETTEST]:
489         fail = []
490         if not utils.TcpPing(pip, port, source=my_pip):
491           fail.append("primary")
492         if sip != pip:
493           if not utils.TcpPing(sip, port, source=my_sip):
494             fail.append("secondary")
495         if fail:
496           tmp[name] = ("failure using the %s interface(s)" %
497                        " and ".join(fail))
498
499   if constants.NV_LVLIST in what:
500     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
501
502   if constants.NV_INSTANCELIST in what:
503     result[constants.NV_INSTANCELIST] = GetInstanceList(
504       what[constants.NV_INSTANCELIST])
505
506   if constants.NV_VGLIST in what:
507     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
508
509   if constants.NV_VERSION in what:
510     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
511                                     constants.RELEASE_VERSION)
512
513   if constants.NV_HVINFO in what:
514     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
515     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
516
517   if constants.NV_DRBDLIST in what:
518     try:
519       used_minors = bdev.DRBD8.GetUsedDevs().keys()
520     except errors.BlockDeviceError, err:
521       logging.warning("Can't get used minors list", exc_info=True)
522       used_minors = str(err)
523     result[constants.NV_DRBDLIST] = used_minors
524
525   return result
526
527
528 def GetVolumeList(vg_name):
529   """Compute list of logical volumes and their size.
530
531   @type vg_name: str
532   @param vg_name: the volume group whose LVs we should list
533   @rtype: dict
534   @return:
535       dictionary of all partions (key) with value being a tuple of
536       their size (in MiB), inactive and online status::
537
538         {'test1': ('20.06', True, True)}
539
540       in case of errors, a string is returned with the error
541       details.
542
543   """
544   lvs = {}
545   sep = '|'
546   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547                          "--separator=%s" % sep,
548                          "-olv_name,lv_size,lv_attr", vg_name])
549   if result.failed:
550     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
551
552   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
553   for line in result.stdout.splitlines():
554     line = line.strip()
555     match = valid_line_re.match(line)
556     if not match:
557       logging.error("Invalid line returned from lvs output: '%s'", line)
558       continue
559     name, size, attr = match.groups()
560     inactive = attr[4] == '-'
561     online = attr[5] == 'o'
562     virtual = attr[0] == 'v'
563     if virtual:
564       # we don't want to report such volumes as existing, since they
565       # don't really hold data
566       continue
567     lvs[name] = (size, inactive, online)
568
569   return lvs
570
571
572 def ListVolumeGroups():
573   """List the volume groups and their size.
574
575   @rtype: dict
576   @return: dictionary with keys volume name and values the
577       size of the volume
578
579   """
580   return utils.ListVolumeGroups()
581
582
583 def NodeVolumes():
584   """List all volumes on this node.
585
586   @rtype: list
587   @return:
588     A list of dictionaries, each having four keys:
589       - name: the logical volume name,
590       - size: the size of the logical volume
591       - dev: the physical device on which the LV lives
592       - vg: the volume group to which it belongs
593
594     In case of errors, we return an empty list and log the
595     error.
596
597     Note that since a logical volume can live on multiple physical
598     volumes, the resulting list might include a logical volume
599     multiple times.
600
601   """
602   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
603                          "--separator=|",
604                          "--options=lv_name,lv_size,devices,vg_name"])
605   if result.failed:
606     _Fail("Failed to list logical volumes, lvs output: %s",
607           result.output)
608
609   def parse_dev(dev):
610     if '(' in dev:
611       return dev.split('(')[0]
612     else:
613       return dev
614
615   def map_line(line):
616     return {
617       'name': line[0].strip(),
618       'size': line[1].strip(),
619       'dev': parse_dev(line[2].strip()),
620       'vg': line[3].strip(),
621     }
622
623   return [map_line(line.split('|')) for line in result.stdout.splitlines()
624           if line.count('|') >= 3]
625
626
627 def BridgesExist(bridges_list):
628   """Check if a list of bridges exist on the current node.
629
630   @rtype: boolean
631   @return: C{True} if all of them exist, C{False} otherwise
632
633   """
634   missing = []
635   for bridge in bridges_list:
636     if not utils.BridgeExists(bridge):
637       missing.append(bridge)
638
639   if missing:
640     _Fail("Missing bridges %s", ", ".join(missing))
641
642
643 def GetInstanceList(hypervisor_list):
644   """Provides a list of instances.
645
646   @type hypervisor_list: list
647   @param hypervisor_list: the list of hypervisors to query information
648
649   @rtype: list
650   @return: a list of all running instances on the current node
651     - instance1.example.com
652     - instance2.example.com
653
654   """
655   results = []
656   for hname in hypervisor_list:
657     try:
658       names = hypervisor.GetHypervisor(hname).ListInstances()
659       results.extend(names)
660     except errors.HypervisorError, err:
661       _Fail("Error enumerating instances (hypervisor %s): %s",
662             hname, err, exc=True)
663
664   return results
665
666
667 def GetInstanceInfo(instance, hname):
668   """Gives back the information about an instance as a dictionary.
669
670   @type instance: string
671   @param instance: the instance name
672   @type hname: string
673   @param hname: the hypervisor type of the instance
674
675   @rtype: dict
676   @return: dictionary with the following keys:
677       - memory: memory size of instance (int)
678       - state: xen state of instance (string)
679       - time: cpu time of instance (float)
680
681   """
682   output = {}
683
684   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
685   if iinfo is not None:
686     output['memory'] = iinfo[2]
687     output['state'] = iinfo[4]
688     output['time'] = iinfo[5]
689
690   return output
691
692
693 def GetInstanceMigratable(instance):
694   """Gives whether an instance can be migrated.
695
696   @type instance: L{objects.Instance}
697   @param instance: object representing the instance to be checked.
698
699   @rtype: tuple
700   @return: tuple of (result, description) where:
701       - result: whether the instance can be migrated or not
702       - description: a description of the issue, if relevant
703
704   """
705   hyper = hypervisor.GetHypervisor(instance.hypervisor)
706   iname = instance.name
707   if iname not in hyper.ListInstances():
708     _Fail("Instance %s is not running", iname)
709
710   for idx in range(len(instance.disks)):
711     link_name = _GetBlockDevSymlinkPath(iname, idx)
712     if not os.path.islink(link_name):
713       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
714
715
716 def GetAllInstancesInfo(hypervisor_list):
717   """Gather data about all instances.
718
719   This is the equivalent of L{GetInstanceInfo}, except that it
720   computes data for all instances at once, thus being faster if one
721   needs data about more than one instance.
722
723   @type hypervisor_list: list
724   @param hypervisor_list: list of hypervisors to query for instance data
725
726   @rtype: dict
727   @return: dictionary of instance: data, with data having the following keys:
728       - memory: memory size of instance (int)
729       - state: xen state of instance (string)
730       - time: cpu time of instance (float)
731       - vcpus: the number of vcpus
732
733   """
734   output = {}
735
736   for hname in hypervisor_list:
737     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
738     if iinfo:
739       for name, _, memory, vcpus, state, times in iinfo:
740         value = {
741           'memory': memory,
742           'vcpus': vcpus,
743           'state': state,
744           'time': times,
745           }
746         if name in output:
747           # we only check static parameters, like memory and vcpus,
748           # and not state and time which can change between the
749           # invocations of the different hypervisors
750           for key in 'memory', 'vcpus':
751             if value[key] != output[name][key]:
752               _Fail("Instance %s is running twice"
753                     " with different parameters", name)
754         output[name] = value
755
756   return output
757
758
759 def InstanceOsAdd(instance, reinstall):
760   """Add an OS to an instance.
761
762   @type instance: L{objects.Instance}
763   @param instance: Instance whose OS is to be installed
764   @type reinstall: boolean
765   @param reinstall: whether this is an instance reinstall
766   @rtype: None
767
768   """
769   inst_os = OSFromDisk(instance.os)
770
771   create_env = OSEnvironment(instance, inst_os)
772   if reinstall:
773     create_env['INSTANCE_REINSTALL'] = "1"
774
775   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
776                                      instance.name, int(time.time()))
777
778   result = utils.RunCmd([inst_os.create_script], env=create_env,
779                         cwd=inst_os.path, output=logfile,)
780   if result.failed:
781     logging.error("os create command '%s' returned error: %s, logfile: %s,"
782                   " output: %s", result.cmd, result.fail_reason, logfile,
783                   result.output)
784     lines = [utils.SafeEncode(val)
785              for val in utils.TailFile(logfile, lines=20)]
786     _Fail("OS create script failed (%s), last lines in the"
787           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
788
789
790 def RunRenameInstance(instance, old_name):
791   """Run the OS rename script for an instance.
792
793   @type instance: L{objects.Instance}
794   @param instance: Instance whose OS is to be installed
795   @type old_name: string
796   @param old_name: previous instance name
797   @rtype: boolean
798   @return: the success of the operation
799
800   """
801   inst_os = OSFromDisk(instance.os)
802
803   rename_env = OSEnvironment(instance, inst_os)
804   rename_env['OLD_INSTANCE_NAME'] = old_name
805
806   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
807                                            old_name,
808                                            instance.name, int(time.time()))
809
810   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
811                         cwd=inst_os.path, output=logfile)
812
813   if result.failed:
814     logging.error("os create command '%s' returned error: %s output: %s",
815                   result.cmd, result.fail_reason, result.output)
816     lines = [utils.SafeEncode(val)
817              for val in utils.TailFile(logfile, lines=20)]
818     _Fail("OS rename script failed (%s), last lines in the"
819           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
820
821
822 def _GetVGInfo(vg_name):
823   """Get information about the volume group.
824
825   @type vg_name: str
826   @param vg_name: the volume group which we query
827   @rtype: dict
828   @return:
829     A dictionary with the following keys:
830       - C{vg_size} is the total size of the volume group in MiB
831       - C{vg_free} is the free size of the volume group in MiB
832       - C{pv_count} are the number of physical disks in that VG
833
834     If an error occurs during gathering of data, we return the same dict
835     with keys all set to None.
836
837   """
838   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
839
840   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
841                          "--nosuffix", "--units=m", "--separator=:", vg_name])
842
843   if retval.failed:
844     logging.error("volume group %s not present", vg_name)
845     return retdic
846   valarr = retval.stdout.strip().rstrip(':').split(':')
847   if len(valarr) == 3:
848     try:
849       retdic = {
850         "vg_size": int(round(float(valarr[0]), 0)),
851         "vg_free": int(round(float(valarr[1]), 0)),
852         "pv_count": int(valarr[2]),
853         }
854     except ValueError, err:
855       logging.exception("Fail to parse vgs output: %s", err)
856   else:
857     logging.error("vgs output has the wrong number of fields (expected"
858                   " three): %s", str(valarr))
859   return retdic
860
861
862 def _GetBlockDevSymlinkPath(instance_name, idx):
863   return os.path.join(constants.DISK_LINKS_DIR,
864                       "%s:%d" % (instance_name, idx))
865
866
867 def _SymlinkBlockDev(instance_name, device_path, idx):
868   """Set up symlinks to a instance's block device.
869
870   This is an auxiliary function run when an instance is start (on the primary
871   node) or when an instance is migrated (on the target node).
872
873
874   @param instance_name: the name of the target instance
875   @param device_path: path of the physical block device, on the node
876   @param idx: the disk index
877   @return: absolute path to the disk's symlink
878
879   """
880   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
881   try:
882     os.symlink(device_path, link_name)
883   except OSError, err:
884     if err.errno == errno.EEXIST:
885       if (not os.path.islink(link_name) or
886           os.readlink(link_name) != device_path):
887         os.remove(link_name)
888         os.symlink(device_path, link_name)
889     else:
890       raise
891
892   return link_name
893
894
895 def _RemoveBlockDevLinks(instance_name, disks):
896   """Remove the block device symlinks belonging to the given instance.
897
898   """
899   for idx, _ in enumerate(disks):
900     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
901     if os.path.islink(link_name):
902       try:
903         os.remove(link_name)
904       except OSError:
905         logging.exception("Can't remove symlink '%s'", link_name)
906
907
908 def _GatherAndLinkBlockDevs(instance):
909   """Set up an instance's block device(s).
910
911   This is run on the primary node at instance startup. The block
912   devices must be already assembled.
913
914   @type instance: L{objects.Instance}
915   @param instance: the instance whose disks we shoul assemble
916   @rtype: list
917   @return: list of (disk_object, device_path)
918
919   """
920   block_devices = []
921   for idx, disk in enumerate(instance.disks):
922     device = _RecursiveFindBD(disk)
923     if device is None:
924       raise errors.BlockDeviceError("Block device '%s' is not set up." %
925                                     str(disk))
926     device.Open()
927     try:
928       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
929     except OSError, e:
930       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
931                                     e.strerror)
932
933     block_devices.append((disk, link_name))
934
935   return block_devices
936
937
938 def StartInstance(instance):
939   """Start an instance.
940
941   @type instance: L{objects.Instance}
942   @param instance: the instance object
943   @rtype: None
944
945   """
946   running_instances = GetInstanceList([instance.hypervisor])
947
948   if instance.name in running_instances:
949     logging.info("Instance %s already running, not starting", instance.name)
950     return
951
952   try:
953     block_devices = _GatherAndLinkBlockDevs(instance)
954     hyper = hypervisor.GetHypervisor(instance.hypervisor)
955     hyper.StartInstance(instance, block_devices)
956   except errors.BlockDeviceError, err:
957     _Fail("Block device error: %s", err, exc=True)
958   except errors.HypervisorError, err:
959     _RemoveBlockDevLinks(instance.name, instance.disks)
960     _Fail("Hypervisor error: %s", err, exc=True)
961
962
963 def InstanceShutdown(instance):
964   """Shut an instance down.
965
966   @note: this functions uses polling with a hardcoded timeout.
967
968   @type instance: L{objects.Instance}
969   @param instance: the instance object
970   @rtype: None
971
972   """
973   hv_name = instance.hypervisor
974   hyper = hypervisor.GetHypervisor(hv_name)
975   running_instances = hyper.ListInstances()
976   iname = instance.name
977   timeout = constants.DEFAULT_SHUTDOWN_TIMEOUT
978
979   if iname not in running_instances:
980     logging.info("Instance %s not running, doing nothing", iname)
981     return
982
983   start = time.time()
984   end = start + timeout
985   sleep_time = 1
986
987   tried_once = False
988   while not tried_once and time.time() < end:
989     try:
990       hyper.StopInstance(instance, retry=tried_once)
991     except errors.HypervisorError, err:
992       _Fail("Failed to stop instance %s: %s", iname, err)
993     tried_once = True
994     time.sleep(sleep_time)
995     if instance.name not in hyper.ListInstances():
996       break
997     if sleep_time < 5:
998       # 1.2 behaves particularly good for our case:
999       # it gives us 10 increasing steps and caps just slightly above 5 seconds
1000       sleep_time *= 1.2
1001   else:
1002     # the shutdown did not succeed
1003     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1004
1005     try:
1006       hyper.StopInstance(instance, force=True)
1007     except errors.HypervisorError, err:
1008       _Fail("Failed to force stop instance %s: %s", iname, err)
1009
1010     time.sleep(1)
1011     if instance.name in GetInstanceList([hv_name]):
1012       _Fail("Could not shutdown instance %s even by destroy", iname)
1013
1014   _RemoveBlockDevLinks(iname, instance.disks)
1015
1016
1017 def InstanceReboot(instance, reboot_type):
1018   """Reboot an instance.
1019
1020   @type instance: L{objects.Instance}
1021   @param instance: the instance object to reboot
1022   @type reboot_type: str
1023   @param reboot_type: the type of reboot, one the following
1024     constants:
1025       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1026         instance OS, do not recreate the VM
1027       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1028         restart the VM (at the hypervisor level)
1029       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1030         not accepted here, since that mode is handled differently, in
1031         cmdlib, and translates into full stop and start of the
1032         instance (instead of a call_instance_reboot RPC)
1033   @rtype: None
1034
1035   """
1036   running_instances = GetInstanceList([instance.hypervisor])
1037
1038   if instance.name not in running_instances:
1039     _Fail("Cannot reboot instance %s that is not running", instance.name)
1040
1041   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1042   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1043     try:
1044       hyper.RebootInstance(instance)
1045     except errors.HypervisorError, err:
1046       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1047   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1048     try:
1049       InstanceShutdown(instance)
1050       return StartInstance(instance)
1051     except errors.HypervisorError, err:
1052       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1053   else:
1054     _Fail("Invalid reboot_type received: %s", reboot_type)
1055
1056
1057 def MigrationInfo(instance):
1058   """Gather information about an instance to be migrated.
1059
1060   @type instance: L{objects.Instance}
1061   @param instance: the instance definition
1062
1063   """
1064   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1065   try:
1066     info = hyper.MigrationInfo(instance)
1067   except errors.HypervisorError, err:
1068     _Fail("Failed to fetch migration information: %s", err, exc=True)
1069   return info
1070
1071
1072 def AcceptInstance(instance, info, target):
1073   """Prepare the node to accept an instance.
1074
1075   @type instance: L{objects.Instance}
1076   @param instance: the instance definition
1077   @type info: string/data (opaque)
1078   @param info: migration information, from the source node
1079   @type target: string
1080   @param target: target host (usually ip), on this node
1081
1082   """
1083   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1084   try:
1085     hyper.AcceptInstance(instance, info, target)
1086   except errors.HypervisorError, err:
1087     _Fail("Failed to accept instance: %s", err, exc=True)
1088
1089
1090 def FinalizeMigration(instance, info, success):
1091   """Finalize any preparation to accept an instance.
1092
1093   @type instance: L{objects.Instance}
1094   @param instance: the instance definition
1095   @type info: string/data (opaque)
1096   @param info: migration information, from the source node
1097   @type success: boolean
1098   @param success: whether the migration was a success or a failure
1099
1100   """
1101   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1102   try:
1103     hyper.FinalizeMigration(instance, info, success)
1104   except errors.HypervisorError, err:
1105     _Fail("Failed to finalize migration: %s", err, exc=True)
1106
1107
1108 def MigrateInstance(instance, target, live):
1109   """Migrates an instance to another node.
1110
1111   @type instance: L{objects.Instance}
1112   @param instance: the instance definition
1113   @type target: string
1114   @param target: the target node name
1115   @type live: boolean
1116   @param live: whether the migration should be done live or not (the
1117       interpretation of this parameter is left to the hypervisor)
1118   @rtype: tuple
1119   @return: a tuple of (success, msg) where:
1120       - succes is a boolean denoting the success/failure of the operation
1121       - msg is a string with details in case of failure
1122
1123   """
1124   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1125
1126   try:
1127     hyper.MigrateInstance(instance.name, target, live)
1128   except errors.HypervisorError, err:
1129     _Fail("Failed to migrate instance: %s", err, exc=True)
1130
1131
1132 def BlockdevCreate(disk, size, owner, on_primary, info):
1133   """Creates a block device for an instance.
1134
1135   @type disk: L{objects.Disk}
1136   @param disk: the object describing the disk we should create
1137   @type size: int
1138   @param size: the size of the physical underlying device, in MiB
1139   @type owner: str
1140   @param owner: the name of the instance for which disk is created,
1141       used for device cache data
1142   @type on_primary: boolean
1143   @param on_primary:  indicates if it is the primary node or not
1144   @type info: string
1145   @param info: string that will be sent to the physical device
1146       creation, used for example to set (LVM) tags on LVs
1147
1148   @return: the new unique_id of the device (this can sometime be
1149       computed only after creation), or None. On secondary nodes,
1150       it's not required to return anything.
1151
1152   """
1153   clist = []
1154   if disk.children:
1155     for child in disk.children:
1156       try:
1157         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1158       except errors.BlockDeviceError, err:
1159         _Fail("Can't assemble device %s: %s", child, err)
1160       if on_primary or disk.AssembleOnSecondary():
1161         # we need the children open in case the device itself has to
1162         # be assembled
1163         try:
1164           crdev.Open()
1165         except errors.BlockDeviceError, err:
1166           _Fail("Can't make child '%s' read-write: %s", child, err)
1167       clist.append(crdev)
1168
1169   try:
1170     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1171   except errors.BlockDeviceError, err:
1172     _Fail("Can't create block device: %s", err)
1173
1174   if on_primary or disk.AssembleOnSecondary():
1175     try:
1176       device.Assemble()
1177     except errors.BlockDeviceError, err:
1178       _Fail("Can't assemble device after creation, unusual event: %s", err)
1179     device.SetSyncSpeed(constants.SYNC_SPEED)
1180     if on_primary or disk.OpenOnSecondary():
1181       try:
1182         device.Open(force=True)
1183       except errors.BlockDeviceError, err:
1184         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1185     DevCacheManager.UpdateCache(device.dev_path, owner,
1186                                 on_primary, disk.iv_name)
1187
1188   device.SetInfo(info)
1189
1190   return device.unique_id
1191
1192
1193 def BlockdevRemove(disk):
1194   """Remove a block device.
1195
1196   @note: This is intended to be called recursively.
1197
1198   @type disk: L{objects.Disk}
1199   @param disk: the disk object we should remove
1200   @rtype: boolean
1201   @return: the success of the operation
1202
1203   """
1204   msgs = []
1205   try:
1206     rdev = _RecursiveFindBD(disk)
1207   except errors.BlockDeviceError, err:
1208     # probably can't attach
1209     logging.info("Can't attach to device %s in remove", disk)
1210     rdev = None
1211   if rdev is not None:
1212     r_path = rdev.dev_path
1213     try:
1214       rdev.Remove()
1215     except errors.BlockDeviceError, err:
1216       msgs.append(str(err))
1217     if not msgs:
1218       DevCacheManager.RemoveCache(r_path)
1219
1220   if disk.children:
1221     for child in disk.children:
1222       try:
1223         BlockdevRemove(child)
1224       except RPCFail, err:
1225         msgs.append(str(err))
1226
1227   if msgs:
1228     _Fail("; ".join(msgs))
1229
1230
1231 def _RecursiveAssembleBD(disk, owner, as_primary):
1232   """Activate a block device for an instance.
1233
1234   This is run on the primary and secondary nodes for an instance.
1235
1236   @note: this function is called recursively.
1237
1238   @type disk: L{objects.Disk}
1239   @param disk: the disk we try to assemble
1240   @type owner: str
1241   @param owner: the name of the instance which owns the disk
1242   @type as_primary: boolean
1243   @param as_primary: if we should make the block device
1244       read/write
1245
1246   @return: the assembled device or None (in case no device
1247       was assembled)
1248   @raise errors.BlockDeviceError: in case there is an error
1249       during the activation of the children or the device
1250       itself
1251
1252   """
1253   children = []
1254   if disk.children:
1255     mcn = disk.ChildrenNeeded()
1256     if mcn == -1:
1257       mcn = 0 # max number of Nones allowed
1258     else:
1259       mcn = len(disk.children) - mcn # max number of Nones
1260     for chld_disk in disk.children:
1261       try:
1262         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1263       except errors.BlockDeviceError, err:
1264         if children.count(None) >= mcn:
1265           raise
1266         cdev = None
1267         logging.error("Error in child activation (but continuing): %s",
1268                       str(err))
1269       children.append(cdev)
1270
1271   if as_primary or disk.AssembleOnSecondary():
1272     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1273     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1274     result = r_dev
1275     if as_primary or disk.OpenOnSecondary():
1276       r_dev.Open()
1277     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1278                                 as_primary, disk.iv_name)
1279
1280   else:
1281     result = True
1282   return result
1283
1284
1285 def BlockdevAssemble(disk, owner, as_primary):
1286   """Activate a block device for an instance.
1287
1288   This is a wrapper over _RecursiveAssembleBD.
1289
1290   @rtype: str or boolean
1291   @return: a C{/dev/...} path for primary nodes, and
1292       C{True} for secondary nodes
1293
1294   """
1295   try:
1296     result = _RecursiveAssembleBD(disk, owner, as_primary)
1297     if isinstance(result, bdev.BlockDev):
1298       result = result.dev_path
1299   except errors.BlockDeviceError, err:
1300     _Fail("Error while assembling disk: %s", err, exc=True)
1301
1302   return result
1303
1304
1305 def BlockdevShutdown(disk):
1306   """Shut down a block device.
1307
1308   First, if the device is assembled (Attach() is successful), then
1309   the device is shutdown. Then the children of the device are
1310   shutdown.
1311
1312   This function is called recursively. Note that we don't cache the
1313   children or such, as oppossed to assemble, shutdown of different
1314   devices doesn't require that the upper device was active.
1315
1316   @type disk: L{objects.Disk}
1317   @param disk: the description of the disk we should
1318       shutdown
1319   @rtype: None
1320
1321   """
1322   msgs = []
1323   r_dev = _RecursiveFindBD(disk)
1324   if r_dev is not None:
1325     r_path = r_dev.dev_path
1326     try:
1327       r_dev.Shutdown()
1328       DevCacheManager.RemoveCache(r_path)
1329     except errors.BlockDeviceError, err:
1330       msgs.append(str(err))
1331
1332   if disk.children:
1333     for child in disk.children:
1334       try:
1335         BlockdevShutdown(child)
1336       except RPCFail, err:
1337         msgs.append(str(err))
1338
1339   if msgs:
1340     _Fail("; ".join(msgs))
1341
1342
1343 def BlockdevAddchildren(parent_cdev, new_cdevs):
1344   """Extend a mirrored block device.
1345
1346   @type parent_cdev: L{objects.Disk}
1347   @param parent_cdev: the disk to which we should add children
1348   @type new_cdevs: list of L{objects.Disk}
1349   @param new_cdevs: the list of children which we should add
1350   @rtype: None
1351
1352   """
1353   parent_bdev = _RecursiveFindBD(parent_cdev)
1354   if parent_bdev is None:
1355     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1356   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1357   if new_bdevs.count(None) > 0:
1358     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1359   parent_bdev.AddChildren(new_bdevs)
1360
1361
1362 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1363   """Shrink a mirrored block device.
1364
1365   @type parent_cdev: L{objects.Disk}
1366   @param parent_cdev: the disk from which we should remove children
1367   @type new_cdevs: list of L{objects.Disk}
1368   @param new_cdevs: the list of children which we should remove
1369   @rtype: None
1370
1371   """
1372   parent_bdev = _RecursiveFindBD(parent_cdev)
1373   if parent_bdev is None:
1374     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1375   devs = []
1376   for disk in new_cdevs:
1377     rpath = disk.StaticDevPath()
1378     if rpath is None:
1379       bd = _RecursiveFindBD(disk)
1380       if bd is None:
1381         _Fail("Can't find device %s while removing children", disk)
1382       else:
1383         devs.append(bd.dev_path)
1384     else:
1385       devs.append(rpath)
1386   parent_bdev.RemoveChildren(devs)
1387
1388
1389 def BlockdevGetmirrorstatus(disks):
1390   """Get the mirroring status of a list of devices.
1391
1392   @type disks: list of L{objects.Disk}
1393   @param disks: the list of disks which we should query
1394   @rtype: disk
1395   @return:
1396       a list of (mirror_done, estimated_time) tuples, which
1397       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1398   @raise errors.BlockDeviceError: if any of the disks cannot be
1399       found
1400
1401   """
1402   stats = []
1403   for dsk in disks:
1404     rbd = _RecursiveFindBD(dsk)
1405     if rbd is None:
1406       _Fail("Can't find device %s", dsk)
1407
1408     stats.append(rbd.CombinedSyncStatus())
1409
1410   return stats
1411
1412
1413 def _RecursiveFindBD(disk):
1414   """Check if a device is activated.
1415
1416   If so, return information about the real device.
1417
1418   @type disk: L{objects.Disk}
1419   @param disk: the disk object we need to find
1420
1421   @return: None if the device can't be found,
1422       otherwise the device instance
1423
1424   """
1425   children = []
1426   if disk.children:
1427     for chdisk in disk.children:
1428       children.append(_RecursiveFindBD(chdisk))
1429
1430   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1431
1432
1433 def BlockdevFind(disk):
1434   """Check if a device is activated.
1435
1436   If it is, return information about the real device.
1437
1438   @type disk: L{objects.Disk}
1439   @param disk: the disk to find
1440   @rtype: None or objects.BlockDevStatus
1441   @return: None if the disk cannot be found, otherwise a the current
1442            information
1443
1444   """
1445   try:
1446     rbd = _RecursiveFindBD(disk)
1447   except errors.BlockDeviceError, err:
1448     _Fail("Failed to find device: %s", err, exc=True)
1449
1450   if rbd is None:
1451     return None
1452
1453   return rbd.GetSyncStatus()
1454
1455
1456 def BlockdevGetsize(disks):
1457   """Computes the size of the given disks.
1458
1459   If a disk is not found, returns None instead.
1460
1461   @type disks: list of L{objects.Disk}
1462   @param disks: the list of disk to compute the size for
1463   @rtype: list
1464   @return: list with elements None if the disk cannot be found,
1465       otherwise the size
1466
1467   """
1468   result = []
1469   for cf in disks:
1470     try:
1471       rbd = _RecursiveFindBD(cf)
1472     except errors.BlockDeviceError, err:
1473       result.append(None)
1474       continue
1475     if rbd is None:
1476       result.append(None)
1477     else:
1478       result.append(rbd.GetActualSize())
1479   return result
1480
1481
1482 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1483   """Export a block device to a remote node.
1484
1485   @type disk: L{objects.Disk}
1486   @param disk: the description of the disk to export
1487   @type dest_node: str
1488   @param dest_node: the destination node to export to
1489   @type dest_path: str
1490   @param dest_path: the destination path on the target node
1491   @type cluster_name: str
1492   @param cluster_name: the cluster name, needed for SSH hostalias
1493   @rtype: None
1494
1495   """
1496   real_disk = _RecursiveFindBD(disk)
1497   if real_disk is None:
1498     _Fail("Block device '%s' is not set up", disk)
1499
1500   real_disk.Open()
1501
1502   # the block size on the read dd is 1MiB to match our units
1503   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1504                                "dd if=%s bs=1048576 count=%s",
1505                                real_disk.dev_path, str(disk.size))
1506
1507   # we set here a smaller block size as, due to ssh buffering, more
1508   # than 64-128k will mostly ignored; we use nocreat to fail if the
1509   # device is not already there or we pass a wrong path; we use
1510   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1511   # to not buffer too much memory; this means that at best, we flush
1512   # every 64k, which will not be very fast
1513   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1514                                 " oflag=dsync", dest_path)
1515
1516   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1517                                                    constants.GANETI_RUNAS,
1518                                                    destcmd)
1519
1520   # all commands have been checked, so we're safe to combine them
1521   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1522
1523   result = utils.RunCmd(["bash", "-c", command])
1524
1525   if result.failed:
1526     _Fail("Disk copy command '%s' returned error: %s"
1527           " output: %s", command, result.fail_reason, result.output)
1528
1529
1530 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1531   """Write a file to the filesystem.
1532
1533   This allows the master to overwrite(!) a file. It will only perform
1534   the operation if the file belongs to a list of configuration files.
1535
1536   @type file_name: str
1537   @param file_name: the target file name
1538   @type data: str
1539   @param data: the new contents of the file
1540   @type mode: int
1541   @param mode: the mode to give the file (can be None)
1542   @type uid: int
1543   @param uid: the owner of the file (can be -1 for default)
1544   @type gid: int
1545   @param gid: the group of the file (can be -1 for default)
1546   @type atime: float
1547   @param atime: the atime to set on the file (can be None)
1548   @type mtime: float
1549   @param mtime: the mtime to set on the file (can be None)
1550   @rtype: None
1551
1552   """
1553   if not os.path.isabs(file_name):
1554     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1555
1556   if file_name not in _ALLOWED_UPLOAD_FILES:
1557     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1558           file_name)
1559
1560   raw_data = _Decompress(data)
1561
1562   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1563                   atime=atime, mtime=mtime)
1564
1565
1566 def WriteSsconfFiles(values):
1567   """Update all ssconf files.
1568
1569   Wrapper around the SimpleStore.WriteFiles.
1570
1571   """
1572   ssconf.SimpleStore().WriteFiles(values)
1573
1574
1575 def _ErrnoOrStr(err):
1576   """Format an EnvironmentError exception.
1577
1578   If the L{err} argument has an errno attribute, it will be looked up
1579   and converted into a textual C{E...} description. Otherwise the
1580   string representation of the error will be returned.
1581
1582   @type err: L{EnvironmentError}
1583   @param err: the exception to format
1584
1585   """
1586   if hasattr(err, 'errno'):
1587     detail = errno.errorcode[err.errno]
1588   else:
1589     detail = str(err)
1590   return detail
1591
1592
1593 def _OSOndiskAPIVersion(name, os_dir):
1594   """Compute and return the API version of a given OS.
1595
1596   This function will try to read the API version of the OS given by
1597   the 'name' parameter and residing in the 'os_dir' directory.
1598
1599   @type name: str
1600   @param name: the OS name we should look for
1601   @type os_dir: str
1602   @param os_dir: the directory inwhich we should look for the OS
1603   @rtype: tuple
1604   @return: tuple (status, data) with status denoting the validity and
1605       data holding either the vaid versions or an error message
1606
1607   """
1608   api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1609
1610   try:
1611     st = os.stat(api_file)
1612   except EnvironmentError, err:
1613     return False, ("Required file '%s' not found under path %s: %s" %
1614                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1615
1616   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1617     return False, ("File '%s' in %s is not a regular file" %
1618                    (constants.OS_API_FILE, os_dir))
1619
1620   try:
1621     api_versions = utils.ReadFile(api_file).splitlines()
1622   except EnvironmentError, err:
1623     return False, ("Error while reading the API version file at %s: %s" %
1624                    (api_file, _ErrnoOrStr(err)))
1625
1626   try:
1627     api_versions = [int(version.strip()) for version in api_versions]
1628   except (TypeError, ValueError), err:
1629     return False, ("API version(s) can't be converted to integer: %s" %
1630                    str(err))
1631
1632   return True, api_versions
1633
1634
1635 def DiagnoseOS(top_dirs=None):
1636   """Compute the validity for all OSes.
1637
1638   @type top_dirs: list
1639   @param top_dirs: the list of directories in which to
1640       search (if not given defaults to
1641       L{constants.OS_SEARCH_PATH})
1642   @rtype: list of L{objects.OS}
1643   @return: a list of tuples (name, path, status, diagnose, variants)
1644       for all (potential) OSes under all search paths, where:
1645           - name is the (potential) OS name
1646           - path is the full path to the OS
1647           - status True/False is the validity of the OS
1648           - diagnose is the error message for an invalid OS, otherwise empty
1649           - variants is a list of supported OS variants, if any
1650
1651   """
1652   if top_dirs is None:
1653     top_dirs = constants.OS_SEARCH_PATH
1654
1655   result = []
1656   for dir_name in top_dirs:
1657     if os.path.isdir(dir_name):
1658       try:
1659         f_names = utils.ListVisibleFiles(dir_name)
1660       except EnvironmentError, err:
1661         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1662         break
1663       for name in f_names:
1664         os_path = os.path.sep.join([dir_name, name])
1665         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1666         if status:
1667           diagnose = ""
1668           variants = os_inst.supported_variants
1669         else:
1670           diagnose = os_inst
1671           variants = []
1672         result.append((name, os_path, status, diagnose, variants))
1673
1674   return result
1675
1676
1677 def _TryOSFromDisk(name, base_dir=None):
1678   """Create an OS instance from disk.
1679
1680   This function will return an OS instance if the given name is a
1681   valid OS name.
1682
1683   @type base_dir: string
1684   @keyword base_dir: Base directory containing OS installations.
1685                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1686   @rtype: tuple
1687   @return: success and either the OS instance if we find a valid one,
1688       or error message
1689
1690   """
1691   if base_dir is None:
1692     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1693     if os_dir is None:
1694       return False, "Directory for OS %s not found in search path" % name
1695   else:
1696     os_dir = os.path.sep.join([base_dir, name])
1697
1698   status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1699   if not status:
1700     # push the error up
1701     return status, api_versions
1702
1703   if not constants.OS_API_VERSIONS.intersection(api_versions):
1704     return False, ("API version mismatch for path '%s': found %s, want %s." %
1705                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1706
1707   # OS Files dictionary, we will populate it with the absolute path names
1708   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1709
1710   if max(api_versions) >= constants.OS_API_V15:
1711     os_files[constants.OS_VARIANTS_FILE] = ''
1712
1713   for name in os_files:
1714     os_files[name] = os.path.sep.join([os_dir, name])
1715
1716     try:
1717       st = os.stat(os_files[name])
1718     except EnvironmentError, err:
1719       return False, ("File '%s' under path '%s' is missing (%s)" %
1720                      (name, os_dir, _ErrnoOrStr(err)))
1721
1722     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1723       return False, ("File '%s' under path '%s' is not a regular file" %
1724                      (name, os_dir))
1725
1726     if name in constants.OS_SCRIPTS:
1727       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1728         return False, ("File '%s' under path '%s' is not executable" %
1729                        (name, os_dir))
1730
1731   variants = None
1732   if constants.OS_VARIANTS_FILE in os_files:
1733     variants_file = os_files[constants.OS_VARIANTS_FILE]
1734     try:
1735       variants = utils.ReadFile(variants_file).splitlines()
1736     except EnvironmentError, err:
1737       return False, ("Error while reading the OS variants file at %s: %s" %
1738                      (variants_file, _ErrnoOrStr(err)))
1739     if not variants:
1740       return False, ("No supported os variant found")
1741
1742   os_obj = objects.OS(name=name, path=os_dir,
1743                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1744                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1745                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1746                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1747                       supported_variants=variants,
1748                       api_versions=api_versions)
1749   return True, os_obj
1750
1751
1752 def OSFromDisk(name, base_dir=None):
1753   """Create an OS instance from disk.
1754
1755   This function will return an OS instance if the given name is a
1756   valid OS name. Otherwise, it will raise an appropriate
1757   L{RPCFail} exception, detailing why this is not a valid OS.
1758
1759   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1760   an exception but returns true/false status data.
1761
1762   @type base_dir: string
1763   @keyword base_dir: Base directory containing OS installations.
1764                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1765   @rtype: L{objects.OS}
1766   @return: the OS instance if we find a valid one
1767   @raise RPCFail: if we don't find a valid OS
1768
1769   """
1770   name_only = name.split('+',1)[0]
1771   status, payload = _TryOSFromDisk(name_only, base_dir)
1772
1773   if not status:
1774     _Fail(payload)
1775
1776   return payload
1777
1778
1779 def OSEnvironment(instance, os, debug=0):
1780   """Calculate the environment for an os script.
1781
1782   @type instance: L{objects.Instance}
1783   @param instance: target instance for the os script run
1784   @type os: L{objects.OS}
1785   @param os: operating system for which the environment is being built
1786   @type debug: integer
1787   @param debug: debug level (0 or 1, for OS Api 10)
1788   @rtype: dict
1789   @return: dict of environment variables
1790   @raise errors.BlockDeviceError: if the block device
1791       cannot be found
1792
1793   """
1794   result = {}
1795   api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1796   result['OS_API_VERSION'] = '%d' % api_version
1797   result['INSTANCE_NAME'] = instance.name
1798   result['INSTANCE_OS'] = instance.os
1799   result['HYPERVISOR'] = instance.hypervisor
1800   result['DISK_COUNT'] = '%d' % len(instance.disks)
1801   result['NIC_COUNT'] = '%d' % len(instance.nics)
1802   result['DEBUG_LEVEL'] = '%d' % debug
1803   if api_version >= constants.OS_API_V15:
1804     try:
1805       variant = instance.os.split('+', 1)[1]
1806     except IndexError:
1807       variant = os.supported_variants[0]
1808     result['OS_VARIANT'] = variant
1809   for idx, disk in enumerate(instance.disks):
1810     real_disk = _RecursiveFindBD(disk)
1811     if real_disk is None:
1812       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1813                                     str(disk))
1814     real_disk.Open()
1815     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1816     result['DISK_%d_ACCESS' % idx] = disk.mode
1817     if constants.HV_DISK_TYPE in instance.hvparams:
1818       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1819         instance.hvparams[constants.HV_DISK_TYPE]
1820     if disk.dev_type in constants.LDS_BLOCK:
1821       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1822     elif disk.dev_type == constants.LD_FILE:
1823       result['DISK_%d_BACKEND_TYPE' % idx] = \
1824         'file:%s' % disk.physical_id[0]
1825   for idx, nic in enumerate(instance.nics):
1826     result['NIC_%d_MAC' % idx] = nic.mac
1827     if nic.ip:
1828       result['NIC_%d_IP' % idx] = nic.ip
1829     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1830     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1831       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1832     if nic.nicparams[constants.NIC_LINK]:
1833       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1834     if constants.HV_NIC_TYPE in instance.hvparams:
1835       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1836         instance.hvparams[constants.HV_NIC_TYPE]
1837
1838   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1839     for key, value in source.items():
1840       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1841
1842   return result
1843
1844 def BlockdevGrow(disk, amount):
1845   """Grow a stack of block devices.
1846
1847   This function is called recursively, with the childrens being the
1848   first ones to resize.
1849
1850   @type disk: L{objects.Disk}
1851   @param disk: the disk to be grown
1852   @rtype: (status, result)
1853   @return: a tuple with the status of the operation
1854       (True/False), and the errors message if status
1855       is False
1856
1857   """
1858   r_dev = _RecursiveFindBD(disk)
1859   if r_dev is None:
1860     _Fail("Cannot find block device %s", disk)
1861
1862   try:
1863     r_dev.Grow(amount)
1864   except errors.BlockDeviceError, err:
1865     _Fail("Failed to grow block device: %s", err, exc=True)
1866
1867
1868 def BlockdevSnapshot(disk):
1869   """Create a snapshot copy of a block device.
1870
1871   This function is called recursively, and the snapshot is actually created
1872   just for the leaf lvm backend device.
1873
1874   @type disk: L{objects.Disk}
1875   @param disk: the disk to be snapshotted
1876   @rtype: string
1877   @return: snapshot disk path
1878
1879   """
1880   if disk.children:
1881     if len(disk.children) == 1:
1882       # only one child, let's recurse on it
1883       return BlockdevSnapshot(disk.children[0])
1884     else:
1885       # more than one child, choose one that matches
1886       for child in disk.children:
1887         if child.size == disk.size:
1888           # return implies breaking the loop
1889           return BlockdevSnapshot(child)
1890   elif disk.dev_type == constants.LD_LV:
1891     r_dev = _RecursiveFindBD(disk)
1892     if r_dev is not None:
1893       # let's stay on the safe side and ask for the full size, for now
1894       return r_dev.Snapshot(disk.size)
1895     else:
1896       _Fail("Cannot find block device %s", disk)
1897   else:
1898     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1899           disk.unique_id, disk.dev_type)
1900
1901
1902 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1903   """Export a block device snapshot to a remote node.
1904
1905   @type disk: L{objects.Disk}
1906   @param disk: the description of the disk to export
1907   @type dest_node: str
1908   @param dest_node: the destination node to export to
1909   @type instance: L{objects.Instance}
1910   @param instance: the instance object to whom the disk belongs
1911   @type cluster_name: str
1912   @param cluster_name: the cluster name, needed for SSH hostalias
1913   @type idx: int
1914   @param idx: the index of the disk in the instance's disk list,
1915       used to export to the OS scripts environment
1916   @rtype: None
1917
1918   """
1919   inst_os = OSFromDisk(instance.os)
1920   export_env = OSEnvironment(instance, inst_os)
1921
1922   export_script = inst_os.export_script
1923
1924   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1925                                      instance.name, int(time.time()))
1926   if not os.path.exists(constants.LOG_OS_DIR):
1927     os.mkdir(constants.LOG_OS_DIR, 0750)
1928   real_disk = _RecursiveFindBD(disk)
1929   if real_disk is None:
1930     _Fail("Block device '%s' is not set up", disk)
1931
1932   real_disk.Open()
1933
1934   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1935   export_env['EXPORT_INDEX'] = str(idx)
1936
1937   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1938   destfile = disk.physical_id[1]
1939
1940   # the target command is built out of three individual commands,
1941   # which are joined by pipes; we check each individual command for
1942   # valid parameters
1943   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1944                                inst_os.path, export_script, logfile)
1945
1946   comprcmd = "gzip"
1947
1948   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1949                                 destdir, destdir, destfile)
1950   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1951                                                    constants.GANETI_RUNAS,
1952                                                    destcmd)
1953
1954   # all commands have been checked, so we're safe to combine them
1955   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1956
1957   result = utils.RunCmd(["bash", "-c", command], env=export_env)
1958
1959   if result.failed:
1960     _Fail("OS snapshot export command '%s' returned error: %s"
1961           " output: %s", command, result.fail_reason, result.output)
1962
1963
1964 def FinalizeExport(instance, snap_disks):
1965   """Write out the export configuration information.
1966
1967   @type instance: L{objects.Instance}
1968   @param instance: the instance which we export, used for
1969       saving configuration
1970   @type snap_disks: list of L{objects.Disk}
1971   @param snap_disks: list of snapshot block devices, which
1972       will be used to get the actual name of the dump file
1973
1974   @rtype: None
1975
1976   """
1977   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1978   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1979
1980   config = objects.SerializableConfigParser()
1981
1982   config.add_section(constants.INISECT_EXP)
1983   config.set(constants.INISECT_EXP, 'version', '0')
1984   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1985   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1986   config.set(constants.INISECT_EXP, 'os', instance.os)
1987   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1988
1989   config.add_section(constants.INISECT_INS)
1990   config.set(constants.INISECT_INS, 'name', instance.name)
1991   config.set(constants.INISECT_INS, 'memory', '%d' %
1992              instance.beparams[constants.BE_MEMORY])
1993   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1994              instance.beparams[constants.BE_VCPUS])
1995   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1996
1997   nic_total = 0
1998   for nic_count, nic in enumerate(instance.nics):
1999     nic_total += 1
2000     config.set(constants.INISECT_INS, 'nic%d_mac' %
2001                nic_count, '%s' % nic.mac)
2002     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2003     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
2004                '%s' % nic.bridge)
2005   # TODO: redundant: on load can read nics until it doesn't exist
2006   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2007
2008   disk_total = 0
2009   for disk_count, disk in enumerate(snap_disks):
2010     if disk:
2011       disk_total += 1
2012       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2013                  ('%s' % disk.iv_name))
2014       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2015                  ('%s' % disk.physical_id[1]))
2016       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2017                  ('%d' % disk.size))
2018
2019   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2020
2021   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
2022                   data=config.Dumps())
2023   shutil.rmtree(finaldestdir, True)
2024   shutil.move(destdir, finaldestdir)
2025
2026
2027 def ExportInfo(dest):
2028   """Get export configuration information.
2029
2030   @type dest: str
2031   @param dest: directory containing the export
2032
2033   @rtype: L{objects.SerializableConfigParser}
2034   @return: a serializable config file containing the
2035       export info
2036
2037   """
2038   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
2039
2040   config = objects.SerializableConfigParser()
2041   config.read(cff)
2042
2043   if (not config.has_section(constants.INISECT_EXP) or
2044       not config.has_section(constants.INISECT_INS)):
2045     _Fail("Export info file doesn't have the required fields")
2046
2047   return config.Dumps()
2048
2049
2050 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2051   """Import an os image into an instance.
2052
2053   @type instance: L{objects.Instance}
2054   @param instance: instance to import the disks into
2055   @type src_node: string
2056   @param src_node: source node for the disk images
2057   @type src_images: list of string
2058   @param src_images: absolute paths of the disk images
2059   @rtype: list of boolean
2060   @return: each boolean represent the success of importing the n-th disk
2061
2062   """
2063   inst_os = OSFromDisk(instance.os)
2064   import_env = OSEnvironment(instance, inst_os)
2065   import_script = inst_os.import_script
2066
2067   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2068                                         instance.name, int(time.time()))
2069   if not os.path.exists(constants.LOG_OS_DIR):
2070     os.mkdir(constants.LOG_OS_DIR, 0750)
2071
2072   comprcmd = "gunzip"
2073   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2074                                import_script, logfile)
2075
2076   final_result = []
2077   for idx, image in enumerate(src_images):
2078     if image:
2079       destcmd = utils.BuildShellCmd('cat %s', image)
2080       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2081                                                        constants.GANETI_RUNAS,
2082                                                        destcmd)
2083       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2084       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2085       import_env['IMPORT_INDEX'] = str(idx)
2086       result = utils.RunCmd(command, env=import_env)
2087       if result.failed:
2088         logging.error("Disk import command '%s' returned error: %s"
2089                       " output: %s", command, result.fail_reason,
2090                       result.output)
2091         final_result.append("error importing disk %d: %s, %s" %
2092                             (idx, result.fail_reason, result.output[-100]))
2093
2094   if final_result:
2095     _Fail("; ".join(final_result), log=False)
2096
2097
2098 def ListExports():
2099   """Return a list of exports currently available on this machine.
2100
2101   @rtype: list
2102   @return: list of the exports
2103
2104   """
2105   if os.path.isdir(constants.EXPORT_DIR):
2106     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2107   else:
2108     _Fail("No exports directory")
2109
2110
2111 def RemoveExport(export):
2112   """Remove an existing export from the node.
2113
2114   @type export: str
2115   @param export: the name of the export to remove
2116   @rtype: None
2117
2118   """
2119   target = os.path.join(constants.EXPORT_DIR, export)
2120
2121   try:
2122     shutil.rmtree(target)
2123   except EnvironmentError, err:
2124     _Fail("Error while removing the export: %s", err, exc=True)
2125
2126
2127 def BlockdevRename(devlist):
2128   """Rename a list of block devices.
2129
2130   @type devlist: list of tuples
2131   @param devlist: list of tuples of the form  (disk,
2132       new_logical_id, new_physical_id); disk is an
2133       L{objects.Disk} object describing the current disk,
2134       and new logical_id/physical_id is the name we
2135       rename it to
2136   @rtype: boolean
2137   @return: True if all renames succeeded, False otherwise
2138
2139   """
2140   msgs = []
2141   result = True
2142   for disk, unique_id in devlist:
2143     dev = _RecursiveFindBD(disk)
2144     if dev is None:
2145       msgs.append("Can't find device %s in rename" % str(disk))
2146       result = False
2147       continue
2148     try:
2149       old_rpath = dev.dev_path
2150       dev.Rename(unique_id)
2151       new_rpath = dev.dev_path
2152       if old_rpath != new_rpath:
2153         DevCacheManager.RemoveCache(old_rpath)
2154         # FIXME: we should add the new cache information here, like:
2155         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2156         # but we don't have the owner here - maybe parse from existing
2157         # cache? for now, we only lose lvm data when we rename, which
2158         # is less critical than DRBD or MD
2159     except errors.BlockDeviceError, err:
2160       msgs.append("Can't rename device '%s' to '%s': %s" %
2161                   (dev, unique_id, err))
2162       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2163       result = False
2164   if not result:
2165     _Fail("; ".join(msgs))
2166
2167
2168 def _TransformFileStorageDir(file_storage_dir):
2169   """Checks whether given file_storage_dir is valid.
2170
2171   Checks wheter the given file_storage_dir is within the cluster-wide
2172   default file_storage_dir stored in SimpleStore. Only paths under that
2173   directory are allowed.
2174
2175   @type file_storage_dir: str
2176   @param file_storage_dir: the path to check
2177
2178   @return: the normalized path if valid, None otherwise
2179
2180   """
2181   cfg = _GetConfig()
2182   file_storage_dir = os.path.normpath(file_storage_dir)
2183   base_file_storage_dir = cfg.GetFileStorageDir()
2184   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2185       base_file_storage_dir):
2186     _Fail("File storage directory '%s' is not under base file"
2187           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2188   return file_storage_dir
2189
2190
2191 def CreateFileStorageDir(file_storage_dir):
2192   """Create file storage directory.
2193
2194   @type file_storage_dir: str
2195   @param file_storage_dir: directory to create
2196
2197   @rtype: tuple
2198   @return: tuple with first element a boolean indicating wheter dir
2199       creation was successful or not
2200
2201   """
2202   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2203   if os.path.exists(file_storage_dir):
2204     if not os.path.isdir(file_storage_dir):
2205       _Fail("Specified storage dir '%s' is not a directory",
2206             file_storage_dir)
2207   else:
2208     try:
2209       os.makedirs(file_storage_dir, 0750)
2210     except OSError, err:
2211       _Fail("Cannot create file storage directory '%s': %s",
2212             file_storage_dir, err, exc=True)
2213
2214
2215 def RemoveFileStorageDir(file_storage_dir):
2216   """Remove file storage directory.
2217
2218   Remove it only if it's empty. If not log an error and return.
2219
2220   @type file_storage_dir: str
2221   @param file_storage_dir: the directory we should cleanup
2222   @rtype: tuple (success,)
2223   @return: tuple of one element, C{success}, denoting
2224       whether the operation was successful
2225
2226   """
2227   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2228   if os.path.exists(file_storage_dir):
2229     if not os.path.isdir(file_storage_dir):
2230       _Fail("Specified Storage directory '%s' is not a directory",
2231             file_storage_dir)
2232     # deletes dir only if empty, otherwise we want to fail the rpc call
2233     try:
2234       os.rmdir(file_storage_dir)
2235     except OSError, err:
2236       _Fail("Cannot remove file storage directory '%s': %s",
2237             file_storage_dir, err)
2238
2239
2240 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2241   """Rename the file storage directory.
2242
2243   @type old_file_storage_dir: str
2244   @param old_file_storage_dir: the current path
2245   @type new_file_storage_dir: str
2246   @param new_file_storage_dir: the name we should rename to
2247   @rtype: tuple (success,)
2248   @return: tuple of one element, C{success}, denoting
2249       whether the operation was successful
2250
2251   """
2252   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2253   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2254   if not os.path.exists(new_file_storage_dir):
2255     if os.path.isdir(old_file_storage_dir):
2256       try:
2257         os.rename(old_file_storage_dir, new_file_storage_dir)
2258       except OSError, err:
2259         _Fail("Cannot rename '%s' to '%s': %s",
2260               old_file_storage_dir, new_file_storage_dir, err)
2261     else:
2262       _Fail("Specified storage dir '%s' is not a directory",
2263             old_file_storage_dir)
2264   else:
2265     if os.path.exists(old_file_storage_dir):
2266       _Fail("Cannot rename '%s' to '%s': both locations exist",
2267             old_file_storage_dir, new_file_storage_dir)
2268
2269
2270 def _EnsureJobQueueFile(file_name):
2271   """Checks whether the given filename is in the queue directory.
2272
2273   @type file_name: str
2274   @param file_name: the file name we should check
2275   @rtype: None
2276   @raises RPCFail: if the file is not valid
2277
2278   """
2279   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2280   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2281
2282   if not result:
2283     _Fail("Passed job queue file '%s' does not belong to"
2284           " the queue directory '%s'", file_name, queue_dir)
2285
2286
2287 def JobQueueUpdate(file_name, content):
2288   """Updates a file in the queue directory.
2289
2290   This is just a wrapper over L{utils.WriteFile}, with proper
2291   checking.
2292
2293   @type file_name: str
2294   @param file_name: the job file name
2295   @type content: str
2296   @param content: the new job contents
2297   @rtype: boolean
2298   @return: the success of the operation
2299
2300   """
2301   _EnsureJobQueueFile(file_name)
2302
2303   # Write and replace the file atomically
2304   utils.WriteFile(file_name, data=_Decompress(content))
2305
2306
2307 def JobQueueRename(old, new):
2308   """Renames a job queue file.
2309
2310   This is just a wrapper over os.rename with proper checking.
2311
2312   @type old: str
2313   @param old: the old (actual) file name
2314   @type new: str
2315   @param new: the desired file name
2316   @rtype: tuple
2317   @return: the success of the operation and payload
2318
2319   """
2320   _EnsureJobQueueFile(old)
2321   _EnsureJobQueueFile(new)
2322
2323   utils.RenameFile(old, new, mkdir=True)
2324
2325
2326 def JobQueueSetDrainFlag(drain_flag):
2327   """Set the drain flag for the queue.
2328
2329   This will set or unset the queue drain flag.
2330
2331   @type drain_flag: boolean
2332   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2333   @rtype: truple
2334   @return: always True, None
2335   @warning: the function always returns True
2336
2337   """
2338   if drain_flag:
2339     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2340   else:
2341     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2342
2343
2344 def BlockdevClose(instance_name, disks):
2345   """Closes the given block devices.
2346
2347   This means they will be switched to secondary mode (in case of
2348   DRBD).
2349
2350   @param instance_name: if the argument is not empty, the symlinks
2351       of this instance will be removed
2352   @type disks: list of L{objects.Disk}
2353   @param disks: the list of disks to be closed
2354   @rtype: tuple (success, message)
2355   @return: a tuple of success and message, where success
2356       indicates the succes of the operation, and message
2357       which will contain the error details in case we
2358       failed
2359
2360   """
2361   bdevs = []
2362   for cf in disks:
2363     rd = _RecursiveFindBD(cf)
2364     if rd is None:
2365       _Fail("Can't find device %s", cf)
2366     bdevs.append(rd)
2367
2368   msg = []
2369   for rd in bdevs:
2370     try:
2371       rd.Close()
2372     except errors.BlockDeviceError, err:
2373       msg.append(str(err))
2374   if msg:
2375     _Fail("Can't make devices secondary: %s", ",".join(msg))
2376   else:
2377     if instance_name:
2378       _RemoveBlockDevLinks(instance_name, disks)
2379
2380
2381 def ValidateHVParams(hvname, hvparams):
2382   """Validates the given hypervisor parameters.
2383
2384   @type hvname: string
2385   @param hvname: the hypervisor name
2386   @type hvparams: dict
2387   @param hvparams: the hypervisor parameters to be validated
2388   @rtype: None
2389
2390   """
2391   try:
2392     hv_type = hypervisor.GetHypervisor(hvname)
2393     hv_type.ValidateParameters(hvparams)
2394   except errors.HypervisorError, err:
2395     _Fail(str(err), log=False)
2396
2397
2398 def DemoteFromMC():
2399   """Demotes the current node from master candidate role.
2400
2401   """
2402   # try to ensure we're not the master by mistake
2403   master, myself = ssconf.GetMasterAndMyself()
2404   if master == myself:
2405     _Fail("ssconf status shows I'm the master node, will not demote")
2406   pid_file = utils.DaemonPidFileName(constants.MASTERD)
2407   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2408     _Fail("The master daemon is running, will not demote")
2409   try:
2410     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2411       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2412   except EnvironmentError, err:
2413     if err.errno != errno.ENOENT:
2414       _Fail("Error while backing up cluster file: %s", err, exc=True)
2415   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2416
2417
2418 def _FindDisks(nodes_ip, disks):
2419   """Sets the physical ID on disks and returns the block devices.
2420
2421   """
2422   # set the correct physical ID
2423   my_name = utils.HostInfo().name
2424   for cf in disks:
2425     cf.SetPhysicalID(my_name, nodes_ip)
2426
2427   bdevs = []
2428
2429   for cf in disks:
2430     rd = _RecursiveFindBD(cf)
2431     if rd is None:
2432       _Fail("Can't find device %s", cf)
2433     bdevs.append(rd)
2434   return bdevs
2435
2436
2437 def DrbdDisconnectNet(nodes_ip, disks):
2438   """Disconnects the network on a list of drbd devices.
2439
2440   """
2441   bdevs = _FindDisks(nodes_ip, disks)
2442
2443   # disconnect disks
2444   for rd in bdevs:
2445     try:
2446       rd.DisconnectNet()
2447     except errors.BlockDeviceError, err:
2448       _Fail("Can't change network configuration to standalone mode: %s",
2449             err, exc=True)
2450
2451
2452 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2453   """Attaches the network on a list of drbd devices.
2454
2455   """
2456   bdevs = _FindDisks(nodes_ip, disks)
2457
2458   if multimaster:
2459     for idx, rd in enumerate(bdevs):
2460       try:
2461         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2462       except EnvironmentError, err:
2463         _Fail("Can't create symlink: %s", err)
2464   # reconnect disks, switch to new master configuration and if
2465   # needed primary mode
2466   for rd in bdevs:
2467     try:
2468       rd.AttachNet(multimaster)
2469     except errors.BlockDeviceError, err:
2470       _Fail("Can't change network configuration: %s", err)
2471   # wait until the disks are connected; we need to retry the re-attach
2472   # if the device becomes standalone, as this might happen if the one
2473   # node disconnects and reconnects in a different mode before the
2474   # other node reconnects; in this case, one or both of the nodes will
2475   # decide it has wrong configuration and switch to standalone
2476   RECONNECT_TIMEOUT = 2 * 60
2477   sleep_time = 0.100 # start with 100 miliseconds
2478   timeout_limit = time.time() + RECONNECT_TIMEOUT
2479   while time.time() < timeout_limit:
2480     all_connected = True
2481     for rd in bdevs:
2482       stats = rd.GetProcStatus()
2483       if not (stats.is_connected or stats.is_in_resync):
2484         all_connected = False
2485       if stats.is_standalone:
2486         # peer had different config info and this node became
2487         # standalone, even though this should not happen with the
2488         # new staged way of changing disk configs
2489         try:
2490           rd.AttachNet(multimaster)
2491         except errors.BlockDeviceError, err:
2492           _Fail("Can't change network configuration: %s", err)
2493     if all_connected:
2494       break
2495     time.sleep(sleep_time)
2496     sleep_time = min(5, sleep_time * 1.5)
2497   if not all_connected:
2498     _Fail("Timeout in disk reconnecting")
2499   if multimaster:
2500     # change to primary mode
2501     for rd in bdevs:
2502       try:
2503         rd.Open()
2504       except errors.BlockDeviceError, err:
2505         _Fail("Can't change to primary mode: %s", err)
2506
2507
2508 def DrbdWaitSync(nodes_ip, disks):
2509   """Wait until DRBDs have synchronized.
2510
2511   """
2512   bdevs = _FindDisks(nodes_ip, disks)
2513
2514   min_resync = 100
2515   alldone = True
2516   for rd in bdevs:
2517     stats = rd.GetProcStatus()
2518     if not (stats.is_connected or stats.is_in_resync):
2519       _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2520     alldone = alldone and (not stats.is_in_resync)
2521     if stats.sync_percent is not None:
2522       min_resync = min(min_resync, stats.sync_percent)
2523
2524   return (alldone, min_resync)
2525
2526
2527 def PowercycleNode(hypervisor_type):
2528   """Hard-powercycle the node.
2529
2530   Because we need to return first, and schedule the powercycle in the
2531   background, we won't be able to report failures nicely.
2532
2533   """
2534   hyper = hypervisor.GetHypervisor(hypervisor_type)
2535   try:
2536     pid = os.fork()
2537   except OSError:
2538     # if we can't fork, we'll pretend that we're in the child process
2539     pid = 0
2540   if pid > 0:
2541     return "Reboot scheduled in 5 seconds"
2542   time.sleep(5)
2543   hyper.PowercycleNode()
2544
2545
2546 class HooksRunner(object):
2547   """Hook runner.
2548
2549   This class is instantiated on the node side (ganeti-noded) and not
2550   on the master side.
2551
2552   """
2553   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2554
2555   def __init__(self, hooks_base_dir=None):
2556     """Constructor for hooks runner.
2557
2558     @type hooks_base_dir: str or None
2559     @param hooks_base_dir: if not None, this overrides the
2560         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2561
2562     """
2563     if hooks_base_dir is None:
2564       hooks_base_dir = constants.HOOKS_BASE_DIR
2565     self._BASE_DIR = hooks_base_dir
2566
2567   @staticmethod
2568   def ExecHook(script, env):
2569     """Exec one hook script.
2570
2571     @type script: str
2572     @param script: the full path to the script
2573     @type env: dict
2574     @param env: the environment with which to exec the script
2575     @rtype: tuple (success, message)
2576     @return: a tuple of success and message, where success
2577         indicates the succes of the operation, and message
2578         which will contain the error details in case we
2579         failed
2580
2581     """
2582     # exec the process using subprocess and log the output
2583     fdstdin = None
2584     try:
2585       fdstdin = open("/dev/null", "r")
2586       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2587                                stderr=subprocess.STDOUT, close_fds=True,
2588                                shell=False, cwd="/", env=env)
2589       output = ""
2590       try:
2591         output = child.stdout.read(4096)
2592         child.stdout.close()
2593       except EnvironmentError, err:
2594         output += "Hook script error: %s" % str(err)
2595
2596       while True:
2597         try:
2598           result = child.wait()
2599           break
2600         except EnvironmentError, err:
2601           if err.errno == errno.EINTR:
2602             continue
2603           raise
2604     finally:
2605       # try not to leak fds
2606       for fd in (fdstdin, ):
2607         if fd is not None:
2608           try:
2609             fd.close()
2610           except EnvironmentError, err:
2611             # just log the error
2612             #logging.exception("Error while closing fd %s", fd)
2613             pass
2614
2615     return result == 0, utils.SafeEncode(output.strip())
2616
2617   def RunHooks(self, hpath, phase, env):
2618     """Run the scripts in the hooks directory.
2619
2620     @type hpath: str
2621     @param hpath: the path to the hooks directory which
2622         holds the scripts
2623     @type phase: str
2624     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2625         L{constants.HOOKS_PHASE_POST}
2626     @type env: dict
2627     @param env: dictionary with the environment for the hook
2628     @rtype: list
2629     @return: list of 3-element tuples:
2630       - script path
2631       - script result, either L{constants.HKR_SUCCESS} or
2632         L{constants.HKR_FAIL}
2633       - output of the script
2634
2635     @raise errors.ProgrammerError: for invalid input
2636         parameters
2637
2638     """
2639     if phase == constants.HOOKS_PHASE_PRE:
2640       suffix = "pre"
2641     elif phase == constants.HOOKS_PHASE_POST:
2642       suffix = "post"
2643     else:
2644       _Fail("Unknown hooks phase '%s'", phase)
2645
2646     rr = []
2647
2648     subdir = "%s-%s.d" % (hpath, suffix)
2649     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2650     try:
2651       dir_contents = utils.ListVisibleFiles(dir_name)
2652     except OSError:
2653       # FIXME: must log output in case of failures
2654       return rr
2655
2656     # we use the standard python sort order,
2657     # so 00name is the recommended naming scheme
2658     dir_contents.sort()
2659     for relname in dir_contents:
2660       fname = os.path.join(dir_name, relname)
2661       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2662           self.RE_MASK.match(relname) is not None):
2663         rrval = constants.HKR_SKIP
2664         output = ""
2665       else:
2666         result, output = self.ExecHook(fname, env)
2667         if not result:
2668           rrval = constants.HKR_FAIL
2669         else:
2670           rrval = constants.HKR_SUCCESS
2671       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2672
2673     return rr
2674
2675
2676 class IAllocatorRunner(object):
2677   """IAllocator runner.
2678
2679   This class is instantiated on the node side (ganeti-noded) and not on
2680   the master side.
2681
2682   """
2683   def Run(self, name, idata):
2684     """Run an iallocator script.
2685
2686     @type name: str
2687     @param name: the iallocator script name
2688     @type idata: str
2689     @param idata: the allocator input data
2690
2691     @rtype: tuple
2692     @return: two element tuple of:
2693        - status
2694        - either error message or stdout of allocator (for success)
2695
2696     """
2697     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2698                                   os.path.isfile)
2699     if alloc_script is None:
2700       _Fail("iallocator module '%s' not found in the search path", name)
2701
2702     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2703     try:
2704       os.write(fd, idata)
2705       os.close(fd)
2706       result = utils.RunCmd([alloc_script, fin_name])
2707       if result.failed:
2708         _Fail("iallocator module '%s' failed: %s, output '%s'",
2709               name, result.fail_reason, result.output)
2710     finally:
2711       os.unlink(fin_name)
2712
2713     return result.stdout
2714
2715
2716 class DevCacheManager(object):
2717   """Simple class for managing a cache of block device information.
2718
2719   """
2720   _DEV_PREFIX = "/dev/"
2721   _ROOT_DIR = constants.BDEV_CACHE_DIR
2722
2723   @classmethod
2724   def _ConvertPath(cls, dev_path):
2725     """Converts a /dev/name path to the cache file name.
2726
2727     This replaces slashes with underscores and strips the /dev
2728     prefix. It then returns the full path to the cache file.
2729
2730     @type dev_path: str
2731     @param dev_path: the C{/dev/} path name
2732     @rtype: str
2733     @return: the converted path name
2734
2735     """
2736     if dev_path.startswith(cls._DEV_PREFIX):
2737       dev_path = dev_path[len(cls._DEV_PREFIX):]
2738     dev_path = dev_path.replace("/", "_")
2739     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2740     return fpath
2741
2742   @classmethod
2743   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2744     """Updates the cache information for a given device.
2745
2746     @type dev_path: str
2747     @param dev_path: the pathname of the device
2748     @type owner: str
2749     @param owner: the owner (instance name) of the device
2750     @type on_primary: bool
2751     @param on_primary: whether this is the primary
2752         node nor not
2753     @type iv_name: str
2754     @param iv_name: the instance-visible name of the
2755         device, as in objects.Disk.iv_name
2756
2757     @rtype: None
2758
2759     """
2760     if dev_path is None:
2761       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2762       return
2763     fpath = cls._ConvertPath(dev_path)
2764     if on_primary:
2765       state = "primary"
2766     else:
2767       state = "secondary"
2768     if iv_name is None:
2769       iv_name = "not_visible"
2770     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2771     try:
2772       utils.WriteFile(fpath, data=fdata)
2773     except EnvironmentError, err:
2774       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2775
2776   @classmethod
2777   def RemoveCache(cls, dev_path):
2778     """Remove data for a dev_path.
2779
2780     This is just a wrapper over L{utils.RemoveFile} with a converted
2781     path name and logging.
2782
2783     @type dev_path: str
2784     @param dev_path: the pathname of the device
2785
2786     @rtype: None
2787
2788     """
2789     if dev_path is None:
2790       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2791       return
2792     fpath = cls._ConvertPath(dev_path)
2793     try:
2794       utils.RemoveFile(fpath)
2795     except EnvironmentError, err:
2796       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)