Remove secrets and kill confd on cluster leave
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26
27 """
28
29
30 import os
31 import os.path
32 import shutil
33 import time
34 import stat
35 import errno
36 import re
37 import subprocess
38 import random
39 import logging
40 import tempfile
41 import zlib
42 import base64
43
44 from ganeti import errors
45 from ganeti import utils
46 from ganeti import ssh
47 from ganeti import hypervisor
48 from ganeti import constants
49 from ganeti import bdev
50 from ganeti import objects
51 from ganeti import ssconf
52
53
54 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55
56
57 class RPCFail(Exception):
58   """Class denoting RPC failure.
59
60   Its argument is the error message.
61
62   """
63
64
65 def _Fail(msg, *args, **kwargs):
66   """Log an error and the raise an RPCFail exception.
67
68   This exception is then handled specially in the ganeti daemon and
69   turned into a 'failed' return type. As such, this function is a
70   useful shortcut for logging the error and returning it to the master
71   daemon.
72
73   @type msg: string
74   @param msg: the text of the exception
75   @raise RPCFail
76
77   """
78   if args:
79     msg = msg % args
80   if "log" not in kwargs or kwargs["log"]: # if we should log this error
81     if "exc" in kwargs and kwargs["exc"]:
82       logging.exception(msg)
83     else:
84       logging.error(msg)
85   raise RPCFail(msg)
86
87
88 def _GetConfig():
89   """Simple wrapper to return a SimpleStore.
90
91   @rtype: L{ssconf.SimpleStore}
92   @return: a SimpleStore instance
93
94   """
95   return ssconf.SimpleStore()
96
97
98 def _GetSshRunner(cluster_name):
99   """Simple wrapper to return an SshRunner.
100
101   @type cluster_name: str
102   @param cluster_name: the cluster name, which is needed
103       by the SshRunner constructor
104   @rtype: L{ssh.SshRunner}
105   @return: an SshRunner instance
106
107   """
108   return ssh.SshRunner(cluster_name)
109
110
111 def _Decompress(data):
112   """Unpacks data compressed by the RPC client.
113
114   @type data: list or tuple
115   @param data: Data sent by RPC client
116   @rtype: str
117   @return: Decompressed data
118
119   """
120   assert isinstance(data, (list, tuple))
121   assert len(data) == 2
122   (encoding, content) = data
123   if encoding == constants.RPC_ENCODING_NONE:
124     return content
125   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126     return zlib.decompress(base64.b64decode(content))
127   else:
128     raise AssertionError("Unknown data encoding")
129
130
131 def _CleanDirectory(path, exclude=None):
132   """Removes all regular files in a directory.
133
134   @type path: str
135   @param path: the directory to clean
136   @type exclude: list
137   @param exclude: list of files to be excluded, defaults
138       to the empty list
139
140   """
141   if not os.path.isdir(path):
142     return
143   if exclude is None:
144     exclude = []
145   else:
146     # Normalize excluded paths
147     exclude = [os.path.normpath(i) for i in exclude]
148
149   for rel_name in utils.ListVisibleFiles(path):
150     full_name = os.path.normpath(os.path.join(path, rel_name))
151     if full_name in exclude:
152       continue
153     if os.path.isfile(full_name) and not os.path.islink(full_name):
154       utils.RemoveFile(full_name)
155
156
157 def _BuildUploadFileList():
158   """Build the list of allowed upload files.
159
160   This is abstracted so that it's built only once at module import time.
161
162   """
163   allowed_files = set([
164     constants.CLUSTER_CONF_FILE,
165     constants.ETC_HOSTS,
166     constants.SSH_KNOWN_HOSTS_FILE,
167     constants.VNC_PASSWORD_FILE,
168     constants.RAPI_CERT_FILE,
169     constants.RAPI_USERS_FILE,
170     constants.HMAC_CLUSTER_KEY,
171     ])
172
173   for hv_name in constants.HYPER_TYPES:
174     hv_class = hypervisor.GetHypervisorClass(hv_name)
175     allowed_files.update(hv_class.GetAncillaryFiles())
176
177   return frozenset(allowed_files)
178
179
180 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181
182
183 def JobQueuePurge():
184   """Removes job queue files and archived jobs.
185
186   @rtype: tuple
187   @return: True, None
188
189   """
190   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192
193
194 def GetMasterInfo():
195   """Returns master information.
196
197   This is an utility function to compute master information, either
198   for consumption here or from the node daemon.
199
200   @rtype: tuple
201   @return: master_netdev, master_ip, master_name
202   @raise RPCFail: in case of errors
203
204   """
205   try:
206     cfg = _GetConfig()
207     master_netdev = cfg.GetMasterNetdev()
208     master_ip = cfg.GetMasterIP()
209     master_node = cfg.GetMasterNode()
210   except errors.ConfigurationError, err:
211     _Fail("Cluster configuration incomplete: %s", err, exc=True)
212   return (master_netdev, master_ip, master_node)
213
214
215 def StartMaster(start_daemons, no_voting):
216   """Activate local node as master node.
217
218   The function will always try activate the IP address of the master
219   (unless someone else has it). It will also start the master daemons,
220   based on the start_daemons parameter.
221
222   @type start_daemons: boolean
223   @param start_daemons: whether to also start the master
224       daemons (ganeti-masterd and ganeti-rapi)
225   @type no_voting: boolean
226   @param no_voting: whether to start ganeti-masterd without a node vote
227       (if start_daemons is True), but still non-interactively
228   @rtype: None
229
230   """
231   # GetMasterInfo will raise an exception if not able to return data
232   master_netdev, master_ip, _ = GetMasterInfo()
233
234   err_msgs = []
235   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236     if utils.OwnIpAddress(master_ip):
237       # we already have the ip:
238       logging.debug("Master IP already configured, doing nothing")
239     else:
240       msg = "Someone else has the master ip, not activating"
241       logging.error(msg)
242       err_msgs.append(msg)
243   else:
244     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245                            "dev", master_netdev, "label",
246                            "%s:0" % master_netdev])
247     if result.failed:
248       msg = "Can't activate master IP: %s" % result.output
249       logging.error(msg)
250       err_msgs.append(msg)
251
252     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253                            "-s", master_ip, master_ip])
254     # we'll ignore the exit code of arping
255
256   # and now start the master and rapi daemons
257   if start_daemons:
258     daemons_params = {
259         'ganeti-masterd': [],
260         'ganeti-rapi': [],
261         }
262     if no_voting:
263       daemons_params['ganeti-masterd'].append('--no-voting')
264       daemons_params['ganeti-masterd'].append('--yes-do-it')
265     for daemon in daemons_params:
266       cmd = [daemon]
267       cmd.extend(daemons_params[daemon])
268       result = utils.RunCmd(cmd)
269       if result.failed:
270         msg = "Can't start daemon %s: %s" % (daemon, result.output)
271         logging.error(msg)
272         err_msgs.append(msg)
273
274   if err_msgs:
275     _Fail("; ".join(err_msgs))
276
277
278 def StopMaster(stop_daemons):
279   """Deactivate this node as master.
280
281   The function will always try to deactivate the IP address of the
282   master. It will also stop the master daemons depending on the
283   stop_daemons parameter.
284
285   @type stop_daemons: boolean
286   @param stop_daemons: whether to also stop the master daemons
287       (ganeti-masterd and ganeti-rapi)
288   @rtype: None
289
290   """
291   # TODO: log and report back to the caller the error failures; we
292   # need to decide in which case we fail the RPC for this
293
294   # GetMasterInfo will raise an exception if not able to return data
295   master_netdev, master_ip, _ = GetMasterInfo()
296
297   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298                          "dev", master_netdev])
299   if result.failed:
300     logging.error("Can't remove the master IP, error: %s", result.output)
301     # but otherwise ignore the failure
302
303   if stop_daemons:
304     # stop/kill the rapi and the master daemon
305     for daemon in constants.RAPI, constants.MASTERD:
306       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307
308
309 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310   """Joins this node to the cluster.
311
312   This does the following:
313       - updates the hostkeys of the machine (rsa and dsa)
314       - adds the ssh private key to the user
315       - adds the ssh public key to the users' authorized_keys file
316
317   @type dsa: str
318   @param dsa: the DSA private key to write
319   @type dsapub: str
320   @param dsapub: the DSA public key to write
321   @type rsa: str
322   @param rsa: the RSA private key to write
323   @type rsapub: str
324   @param rsapub: the RSA public key to write
325   @type sshkey: str
326   @param sshkey: the SSH private key to write
327   @type sshpub: str
328   @param sshpub: the SSH public key to write
329   @rtype: boolean
330   @return: the success of the operation
331
332   """
333   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337   for name, content, mode in sshd_keys:
338     utils.WriteFile(name, data=content, mode=mode)
339
340   try:
341     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342                                                     mkdir=True)
343   except errors.OpExecError, err:
344     _Fail("Error while processing user ssh files: %s", err, exc=True)
345
346   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347     utils.WriteFile(name, data=content, mode=0600)
348
349   utils.AddAuthorizedKey(auth_keys, sshpub)
350
351   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352
353
354 def LeaveCluster():
355   """Cleans up and remove the current node.
356
357   This function cleans up and prepares the current node to be removed
358   from the cluster.
359
360   If processing is successful, then it raises an
361   L{errors.QuitGanetiException} which is used as a special case to
362   shutdown the node daemon.
363
364   """
365   _CleanDirectory(constants.DATA_DIR)
366   JobQueuePurge()
367
368   try:
369     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
370
371     utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
372
373     utils.RemoveFile(priv_key)
374     utils.RemoveFile(pub_key)
375   except errors.OpExecError:
376     logging.exception("Error while processing ssh files")
377
378   try:
379     utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
380     utils.RemoveFile(constants.RAPI_CERT_FILE)
381     utils.RemoveFile(constants.SSL_CERT_FILE)
382   except:
383     logging.exception("Error while removing cluster secrets")
384
385   confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
386
387   if confd_pid:
388     utils.KillProcess(confd_pid, timeout=2)
389
390   # Raise a custom exception (handled in ganeti-noded)
391   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
392
393
394 def GetNodeInfo(vgname, hypervisor_type):
395   """Gives back a hash with different information about the node.
396
397   @type vgname: C{string}
398   @param vgname: the name of the volume group to ask for disk space information
399   @type hypervisor_type: C{str}
400   @param hypervisor_type: the name of the hypervisor to ask for
401       memory information
402   @rtype: C{dict}
403   @return: dictionary with the following keys:
404       - vg_size is the size of the configured volume group in MiB
405       - vg_free is the free size of the volume group in MiB
406       - memory_dom0 is the memory allocated for domain0 in MiB
407       - memory_free is the currently available (free) ram in MiB
408       - memory_total is the total number of ram in MiB
409
410   """
411   outputarray = {}
412   vginfo = _GetVGInfo(vgname)
413   outputarray['vg_size'] = vginfo['vg_size']
414   outputarray['vg_free'] = vginfo['vg_free']
415
416   hyper = hypervisor.GetHypervisor(hypervisor_type)
417   hyp_info = hyper.GetNodeInfo()
418   if hyp_info is not None:
419     outputarray.update(hyp_info)
420
421   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
422
423   return outputarray
424
425
426 def VerifyNode(what, cluster_name):
427   """Verify the status of the local node.
428
429   Based on the input L{what} parameter, various checks are done on the
430   local node.
431
432   If the I{filelist} key is present, this list of
433   files is checksummed and the file/checksum pairs are returned.
434
435   If the I{nodelist} key is present, we check that we have
436   connectivity via ssh with the target nodes (and check the hostname
437   report).
438
439   If the I{node-net-test} key is present, we check that we have
440   connectivity to the given nodes via both primary IP and, if
441   applicable, secondary IPs.
442
443   @type what: C{dict}
444   @param what: a dictionary of things to check:
445       - filelist: list of files for which to compute checksums
446       - nodelist: list of nodes we should check ssh communication with
447       - node-net-test: list of nodes we should check node daemon port
448         connectivity with
449       - hypervisor: list with hypervisors to run the verify for
450   @rtype: dict
451   @return: a dictionary with the same keys as the input dict, and
452       values representing the result of the checks
453
454   """
455   result = {}
456
457   if constants.NV_HYPERVISOR in what:
458     result[constants.NV_HYPERVISOR] = tmp = {}
459     for hv_name in what[constants.NV_HYPERVISOR]:
460       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
461
462   if constants.NV_FILELIST in what:
463     result[constants.NV_FILELIST] = utils.FingerprintFiles(
464       what[constants.NV_FILELIST])
465
466   if constants.NV_NODELIST in what:
467     result[constants.NV_NODELIST] = tmp = {}
468     random.shuffle(what[constants.NV_NODELIST])
469     for node in what[constants.NV_NODELIST]:
470       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
471       if not success:
472         tmp[node] = message
473
474   if constants.NV_NODENETTEST in what:
475     result[constants.NV_NODENETTEST] = tmp = {}
476     my_name = utils.HostInfo().name
477     my_pip = my_sip = None
478     for name, pip, sip in what[constants.NV_NODENETTEST]:
479       if name == my_name:
480         my_pip = pip
481         my_sip = sip
482         break
483     if not my_pip:
484       tmp[my_name] = ("Can't find my own primary/secondary IP"
485                       " in the node list")
486     else:
487       port = utils.GetDaemonPort(constants.NODED)
488       for name, pip, sip in what[constants.NV_NODENETTEST]:
489         fail = []
490         if not utils.TcpPing(pip, port, source=my_pip):
491           fail.append("primary")
492         if sip != pip:
493           if not utils.TcpPing(sip, port, source=my_sip):
494             fail.append("secondary")
495         if fail:
496           tmp[name] = ("failure using the %s interface(s)" %
497                        " and ".join(fail))
498
499   if constants.NV_LVLIST in what:
500     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
501
502   if constants.NV_INSTANCELIST in what:
503     result[constants.NV_INSTANCELIST] = GetInstanceList(
504       what[constants.NV_INSTANCELIST])
505
506   if constants.NV_VGLIST in what:
507     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
508
509   if constants.NV_VERSION in what:
510     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
511                                     constants.RELEASE_VERSION)
512
513   if constants.NV_HVINFO in what:
514     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
515     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
516
517   if constants.NV_DRBDLIST in what:
518     try:
519       used_minors = bdev.DRBD8.GetUsedDevs().keys()
520     except errors.BlockDeviceError, err:
521       logging.warning("Can't get used minors list", exc_info=True)
522       used_minors = str(err)
523     result[constants.NV_DRBDLIST] = used_minors
524
525   return result
526
527
528 def GetVolumeList(vg_name):
529   """Compute list of logical volumes and their size.
530
531   @type vg_name: str
532   @param vg_name: the volume group whose LVs we should list
533   @rtype: dict
534   @return:
535       dictionary of all partions (key) with value being a tuple of
536       their size (in MiB), inactive and online status::
537
538         {'test1': ('20.06', True, True)}
539
540       in case of errors, a string is returned with the error
541       details.
542
543   """
544   lvs = {}
545   sep = '|'
546   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547                          "--separator=%s" % sep,
548                          "-olv_name,lv_size,lv_attr", vg_name])
549   if result.failed:
550     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
551
552   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
553   for line in result.stdout.splitlines():
554     line = line.strip()
555     match = valid_line_re.match(line)
556     if not match:
557       logging.error("Invalid line returned from lvs output: '%s'", line)
558       continue
559     name, size, attr = match.groups()
560     inactive = attr[4] == '-'
561     online = attr[5] == 'o'
562     virtual = attr[0] == 'v'
563     if virtual:
564       # we don't want to report such volumes as existing, since they
565       # don't really hold data
566       continue
567     lvs[name] = (size, inactive, online)
568
569   return lvs
570
571
572 def ListVolumeGroups():
573   """List the volume groups and their size.
574
575   @rtype: dict
576   @return: dictionary with keys volume name and values the
577       size of the volume
578
579   """
580   return utils.ListVolumeGroups()
581
582
583 def NodeVolumes():
584   """List all volumes on this node.
585
586   @rtype: list
587   @return:
588     A list of dictionaries, each having four keys:
589       - name: the logical volume name,
590       - size: the size of the logical volume
591       - dev: the physical device on which the LV lives
592       - vg: the volume group to which it belongs
593
594     In case of errors, we return an empty list and log the
595     error.
596
597     Note that since a logical volume can live on multiple physical
598     volumes, the resulting list might include a logical volume
599     multiple times.
600
601   """
602   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
603                          "--separator=|",
604                          "--options=lv_name,lv_size,devices,vg_name"])
605   if result.failed:
606     _Fail("Failed to list logical volumes, lvs output: %s",
607           result.output)
608
609   def parse_dev(dev):
610     if '(' in dev:
611       return dev.split('(')[0]
612     else:
613       return dev
614
615   def map_line(line):
616     return {
617       'name': line[0].strip(),
618       'size': line[1].strip(),
619       'dev': parse_dev(line[2].strip()),
620       'vg': line[3].strip(),
621     }
622
623   return [map_line(line.split('|')) for line in result.stdout.splitlines()
624           if line.count('|') >= 3]
625
626
627 def BridgesExist(bridges_list):
628   """Check if a list of bridges exist on the current node.
629
630   @rtype: boolean
631   @return: C{True} if all of them exist, C{False} otherwise
632
633   """
634   missing = []
635   for bridge in bridges_list:
636     if not utils.BridgeExists(bridge):
637       missing.append(bridge)
638
639   if missing:
640     _Fail("Missing bridges %s", ", ".join(missing))
641
642
643 def GetInstanceList(hypervisor_list):
644   """Provides a list of instances.
645
646   @type hypervisor_list: list
647   @param hypervisor_list: the list of hypervisors to query information
648
649   @rtype: list
650   @return: a list of all running instances on the current node
651     - instance1.example.com
652     - instance2.example.com
653
654   """
655   results = []
656   for hname in hypervisor_list:
657     try:
658       names = hypervisor.GetHypervisor(hname).ListInstances()
659       results.extend(names)
660     except errors.HypervisorError, err:
661       _Fail("Error enumerating instances (hypervisor %s): %s",
662             hname, err, exc=True)
663
664   return results
665
666
667 def GetInstanceInfo(instance, hname):
668   """Gives back the information about an instance as a dictionary.
669
670   @type instance: string
671   @param instance: the instance name
672   @type hname: string
673   @param hname: the hypervisor type of the instance
674
675   @rtype: dict
676   @return: dictionary with the following keys:
677       - memory: memory size of instance (int)
678       - state: xen state of instance (string)
679       - time: cpu time of instance (float)
680
681   """
682   output = {}
683
684   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
685   if iinfo is not None:
686     output['memory'] = iinfo[2]
687     output['state'] = iinfo[4]
688     output['time'] = iinfo[5]
689
690   return output
691
692
693 def GetInstanceMigratable(instance):
694   """Gives whether an instance can be migrated.
695
696   @type instance: L{objects.Instance}
697   @param instance: object representing the instance to be checked.
698
699   @rtype: tuple
700   @return: tuple of (result, description) where:
701       - result: whether the instance can be migrated or not
702       - description: a description of the issue, if relevant
703
704   """
705   hyper = hypervisor.GetHypervisor(instance.hypervisor)
706   iname = instance.name
707   if iname not in hyper.ListInstances():
708     _Fail("Instance %s is not running", iname)
709
710   for idx in range(len(instance.disks)):
711     link_name = _GetBlockDevSymlinkPath(iname, idx)
712     if not os.path.islink(link_name):
713       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
714
715
716 def GetAllInstancesInfo(hypervisor_list):
717   """Gather data about all instances.
718
719   This is the equivalent of L{GetInstanceInfo}, except that it
720   computes data for all instances at once, thus being faster if one
721   needs data about more than one instance.
722
723   @type hypervisor_list: list
724   @param hypervisor_list: list of hypervisors to query for instance data
725
726   @rtype: dict
727   @return: dictionary of instance: data, with data having the following keys:
728       - memory: memory size of instance (int)
729       - state: xen state of instance (string)
730       - time: cpu time of instance (float)
731       - vcpus: the number of vcpus
732
733   """
734   output = {}
735
736   for hname in hypervisor_list:
737     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
738     if iinfo:
739       for name, _, memory, vcpus, state, times in iinfo:
740         value = {
741           'memory': memory,
742           'vcpus': vcpus,
743           'state': state,
744           'time': times,
745           }
746         if name in output:
747           # we only check static parameters, like memory and vcpus,
748           # and not state and time which can change between the
749           # invocations of the different hypervisors
750           for key in 'memory', 'vcpus':
751             if value[key] != output[name][key]:
752               _Fail("Instance %s is running twice"
753                     " with different parameters", name)
754         output[name] = value
755
756   return output
757
758
759 def InstanceOsAdd(instance, reinstall):
760   """Add an OS to an instance.
761
762   @type instance: L{objects.Instance}
763   @param instance: Instance whose OS is to be installed
764   @type reinstall: boolean
765   @param reinstall: whether this is an instance reinstall
766   @rtype: None
767
768   """
769   inst_os = OSFromDisk(instance.os)
770
771   create_env = OSEnvironment(instance, inst_os)
772   if reinstall:
773     create_env['INSTANCE_REINSTALL'] = "1"
774
775   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
776                                      instance.name, int(time.time()))
777
778   result = utils.RunCmd([inst_os.create_script], env=create_env,
779                         cwd=inst_os.path, output=logfile,)
780   if result.failed:
781     logging.error("os create command '%s' returned error: %s, logfile: %s,"
782                   " output: %s", result.cmd, result.fail_reason, logfile,
783                   result.output)
784     lines = [utils.SafeEncode(val)
785              for val in utils.TailFile(logfile, lines=20)]
786     _Fail("OS create script failed (%s), last lines in the"
787           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
788
789
790 def RunRenameInstance(instance, old_name):
791   """Run the OS rename script for an instance.
792
793   @type instance: L{objects.Instance}
794   @param instance: Instance whose OS is to be installed
795   @type old_name: string
796   @param old_name: previous instance name
797   @rtype: boolean
798   @return: the success of the operation
799
800   """
801   inst_os = OSFromDisk(instance.os)
802
803   rename_env = OSEnvironment(instance, inst_os)
804   rename_env['OLD_INSTANCE_NAME'] = old_name
805
806   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
807                                            old_name,
808                                            instance.name, int(time.time()))
809
810   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
811                         cwd=inst_os.path, output=logfile)
812
813   if result.failed:
814     logging.error("os create command '%s' returned error: %s output: %s",
815                   result.cmd, result.fail_reason, result.output)
816     lines = [utils.SafeEncode(val)
817              for val in utils.TailFile(logfile, lines=20)]
818     _Fail("OS rename script failed (%s), last lines in the"
819           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
820
821
822 def _GetVGInfo(vg_name):
823   """Get information about the volume group.
824
825   @type vg_name: str
826   @param vg_name: the volume group which we query
827   @rtype: dict
828   @return:
829     A dictionary with the following keys:
830       - C{vg_size} is the total size of the volume group in MiB
831       - C{vg_free} is the free size of the volume group in MiB
832       - C{pv_count} are the number of physical disks in that VG
833
834     If an error occurs during gathering of data, we return the same dict
835     with keys all set to None.
836
837   """
838   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
839
840   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
841                          "--nosuffix", "--units=m", "--separator=:", vg_name])
842
843   if retval.failed:
844     logging.error("volume group %s not present", vg_name)
845     return retdic
846   valarr = retval.stdout.strip().rstrip(':').split(':')
847   if len(valarr) == 3:
848     try:
849       retdic = {
850         "vg_size": int(round(float(valarr[0]), 0)),
851         "vg_free": int(round(float(valarr[1]), 0)),
852         "pv_count": int(valarr[2]),
853         }
854     except ValueError, err:
855       logging.exception("Fail to parse vgs output: %s", err)
856   else:
857     logging.error("vgs output has the wrong number of fields (expected"
858                   " three): %s", str(valarr))
859   return retdic
860
861
862 def _GetBlockDevSymlinkPath(instance_name, idx):
863   return os.path.join(constants.DISK_LINKS_DIR,
864                       "%s:%d" % (instance_name, idx))
865
866
867 def _SymlinkBlockDev(instance_name, device_path, idx):
868   """Set up symlinks to a instance's block device.
869
870   This is an auxiliary function run when an instance is start (on the primary
871   node) or when an instance is migrated (on the target node).
872
873
874   @param instance_name: the name of the target instance
875   @param device_path: path of the physical block device, on the node
876   @param idx: the disk index
877   @return: absolute path to the disk's symlink
878
879   """
880   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
881   try:
882     os.symlink(device_path, link_name)
883   except OSError, err:
884     if err.errno == errno.EEXIST:
885       if (not os.path.islink(link_name) or
886           os.readlink(link_name) != device_path):
887         os.remove(link_name)
888         os.symlink(device_path, link_name)
889     else:
890       raise
891
892   return link_name
893
894
895 def _RemoveBlockDevLinks(instance_name, disks):
896   """Remove the block device symlinks belonging to the given instance.
897
898   """
899   for idx, _ in enumerate(disks):
900     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
901     if os.path.islink(link_name):
902       try:
903         os.remove(link_name)
904       except OSError:
905         logging.exception("Can't remove symlink '%s'", link_name)
906
907
908 def _GatherAndLinkBlockDevs(instance):
909   """Set up an instance's block device(s).
910
911   This is run on the primary node at instance startup. The block
912   devices must be already assembled.
913
914   @type instance: L{objects.Instance}
915   @param instance: the instance whose disks we shoul assemble
916   @rtype: list
917   @return: list of (disk_object, device_path)
918
919   """
920   block_devices = []
921   for idx, disk in enumerate(instance.disks):
922     device = _RecursiveFindBD(disk)
923     if device is None:
924       raise errors.BlockDeviceError("Block device '%s' is not set up." %
925                                     str(disk))
926     device.Open()
927     try:
928       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
929     except OSError, e:
930       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
931                                     e.strerror)
932
933     block_devices.append((disk, link_name))
934
935   return block_devices
936
937
938 def StartInstance(instance):
939   """Start an instance.
940
941   @type instance: L{objects.Instance}
942   @param instance: the instance object
943   @rtype: None
944
945   """
946   running_instances = GetInstanceList([instance.hypervisor])
947
948   if instance.name in running_instances:
949     logging.info("Instance %s already running, not starting", instance.name)
950     return
951
952   try:
953     block_devices = _GatherAndLinkBlockDevs(instance)
954     hyper = hypervisor.GetHypervisor(instance.hypervisor)
955     hyper.StartInstance(instance, block_devices)
956   except errors.BlockDeviceError, err:
957     _Fail("Block device error: %s", err, exc=True)
958   except errors.HypervisorError, err:
959     _RemoveBlockDevLinks(instance.name, instance.disks)
960     _Fail("Hypervisor error: %s", err, exc=True)
961
962
963 def InstanceShutdown(instance):
964   """Shut an instance down.
965
966   @note: this functions uses polling with a hardcoded timeout.
967
968   @type instance: L{objects.Instance}
969   @param instance: the instance object
970   @rtype: None
971
972   """
973   hv_name = instance.hypervisor
974   running_instances = GetInstanceList([hv_name])
975   iname = instance.name
976
977   if iname not in running_instances:
978     logging.info("Instance %s not running, doing nothing", iname)
979     return
980
981   hyper = hypervisor.GetHypervisor(hv_name)
982   try:
983     hyper.StopInstance(instance)
984   except errors.HypervisorError, err:
985     _Fail("Failed to stop instance %s: %s", iname, err)
986
987   # test every 10secs for 2min
988
989   time.sleep(1)
990   for _ in range(11):
991     if instance.name not in GetInstanceList([hv_name]):
992       break
993     time.sleep(10)
994   else:
995     # the shutdown did not succeed
996     logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
997
998     try:
999       hyper.StopInstance(instance, force=True)
1000     except errors.HypervisorError, err:
1001       _Fail("Failed to force stop instance %s: %s", iname, err)
1002
1003     time.sleep(1)
1004     if instance.name in GetInstanceList([hv_name]):
1005       _Fail("Could not shutdown instance %s even by destroy", iname)
1006
1007   _RemoveBlockDevLinks(iname, instance.disks)
1008
1009
1010 def InstanceReboot(instance, reboot_type):
1011   """Reboot an instance.
1012
1013   @type instance: L{objects.Instance}
1014   @param instance: the instance object to reboot
1015   @type reboot_type: str
1016   @param reboot_type: the type of reboot, one the following
1017     constants:
1018       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1019         instance OS, do not recreate the VM
1020       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1021         restart the VM (at the hypervisor level)
1022       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1023         not accepted here, since that mode is handled differently, in
1024         cmdlib, and translates into full stop and start of the
1025         instance (instead of a call_instance_reboot RPC)
1026   @rtype: None
1027
1028   """
1029   running_instances = GetInstanceList([instance.hypervisor])
1030
1031   if instance.name not in running_instances:
1032     _Fail("Cannot reboot instance %s that is not running", instance.name)
1033
1034   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1035   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1036     try:
1037       hyper.RebootInstance(instance)
1038     except errors.HypervisorError, err:
1039       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1040   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1041     try:
1042       InstanceShutdown(instance)
1043       return StartInstance(instance)
1044     except errors.HypervisorError, err:
1045       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1046   else:
1047     _Fail("Invalid reboot_type received: %s", reboot_type)
1048
1049
1050 def MigrationInfo(instance):
1051   """Gather information about an instance to be migrated.
1052
1053   @type instance: L{objects.Instance}
1054   @param instance: the instance definition
1055
1056   """
1057   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1058   try:
1059     info = hyper.MigrationInfo(instance)
1060   except errors.HypervisorError, err:
1061     _Fail("Failed to fetch migration information: %s", err, exc=True)
1062   return info
1063
1064
1065 def AcceptInstance(instance, info, target):
1066   """Prepare the node to accept an instance.
1067
1068   @type instance: L{objects.Instance}
1069   @param instance: the instance definition
1070   @type info: string/data (opaque)
1071   @param info: migration information, from the source node
1072   @type target: string
1073   @param target: target host (usually ip), on this node
1074
1075   """
1076   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1077   try:
1078     hyper.AcceptInstance(instance, info, target)
1079   except errors.HypervisorError, err:
1080     _Fail("Failed to accept instance: %s", err, exc=True)
1081
1082
1083 def FinalizeMigration(instance, info, success):
1084   """Finalize any preparation to accept an instance.
1085
1086   @type instance: L{objects.Instance}
1087   @param instance: the instance definition
1088   @type info: string/data (opaque)
1089   @param info: migration information, from the source node
1090   @type success: boolean
1091   @param success: whether the migration was a success or a failure
1092
1093   """
1094   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1095   try:
1096     hyper.FinalizeMigration(instance, info, success)
1097   except errors.HypervisorError, err:
1098     _Fail("Failed to finalize migration: %s", err, exc=True)
1099
1100
1101 def MigrateInstance(instance, target, live):
1102   """Migrates an instance to another node.
1103
1104   @type instance: L{objects.Instance}
1105   @param instance: the instance definition
1106   @type target: string
1107   @param target: the target node name
1108   @type live: boolean
1109   @param live: whether the migration should be done live or not (the
1110       interpretation of this parameter is left to the hypervisor)
1111   @rtype: tuple
1112   @return: a tuple of (success, msg) where:
1113       - succes is a boolean denoting the success/failure of the operation
1114       - msg is a string with details in case of failure
1115
1116   """
1117   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1118
1119   try:
1120     hyper.MigrateInstance(instance.name, target, live)
1121   except errors.HypervisorError, err:
1122     _Fail("Failed to migrate instance: %s", err, exc=True)
1123
1124
1125 def BlockdevCreate(disk, size, owner, on_primary, info):
1126   """Creates a block device for an instance.
1127
1128   @type disk: L{objects.Disk}
1129   @param disk: the object describing the disk we should create
1130   @type size: int
1131   @param size: the size of the physical underlying device, in MiB
1132   @type owner: str
1133   @param owner: the name of the instance for which disk is created,
1134       used for device cache data
1135   @type on_primary: boolean
1136   @param on_primary:  indicates if it is the primary node or not
1137   @type info: string
1138   @param info: string that will be sent to the physical device
1139       creation, used for example to set (LVM) tags on LVs
1140
1141   @return: the new unique_id of the device (this can sometime be
1142       computed only after creation), or None. On secondary nodes,
1143       it's not required to return anything.
1144
1145   """
1146   clist = []
1147   if disk.children:
1148     for child in disk.children:
1149       try:
1150         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1151       except errors.BlockDeviceError, err:
1152         _Fail("Can't assemble device %s: %s", child, err)
1153       if on_primary or disk.AssembleOnSecondary():
1154         # we need the children open in case the device itself has to
1155         # be assembled
1156         try:
1157           crdev.Open()
1158         except errors.BlockDeviceError, err:
1159           _Fail("Can't make child '%s' read-write: %s", child, err)
1160       clist.append(crdev)
1161
1162   try:
1163     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1164   except errors.BlockDeviceError, err:
1165     _Fail("Can't create block device: %s", err)
1166
1167   if on_primary or disk.AssembleOnSecondary():
1168     try:
1169       device.Assemble()
1170     except errors.BlockDeviceError, err:
1171       _Fail("Can't assemble device after creation, unusual event: %s", err)
1172     device.SetSyncSpeed(constants.SYNC_SPEED)
1173     if on_primary or disk.OpenOnSecondary():
1174       try:
1175         device.Open(force=True)
1176       except errors.BlockDeviceError, err:
1177         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1178     DevCacheManager.UpdateCache(device.dev_path, owner,
1179                                 on_primary, disk.iv_name)
1180
1181   device.SetInfo(info)
1182
1183   return device.unique_id
1184
1185
1186 def BlockdevRemove(disk):
1187   """Remove a block device.
1188
1189   @note: This is intended to be called recursively.
1190
1191   @type disk: L{objects.Disk}
1192   @param disk: the disk object we should remove
1193   @rtype: boolean
1194   @return: the success of the operation
1195
1196   """
1197   msgs = []
1198   try:
1199     rdev = _RecursiveFindBD(disk)
1200   except errors.BlockDeviceError, err:
1201     # probably can't attach
1202     logging.info("Can't attach to device %s in remove", disk)
1203     rdev = None
1204   if rdev is not None:
1205     r_path = rdev.dev_path
1206     try:
1207       rdev.Remove()
1208     except errors.BlockDeviceError, err:
1209       msgs.append(str(err))
1210     if not msgs:
1211       DevCacheManager.RemoveCache(r_path)
1212
1213   if disk.children:
1214     for child in disk.children:
1215       try:
1216         BlockdevRemove(child)
1217       except RPCFail, err:
1218         msgs.append(str(err))
1219
1220   if msgs:
1221     _Fail("; ".join(msgs))
1222
1223
1224 def _RecursiveAssembleBD(disk, owner, as_primary):
1225   """Activate a block device for an instance.
1226
1227   This is run on the primary and secondary nodes for an instance.
1228
1229   @note: this function is called recursively.
1230
1231   @type disk: L{objects.Disk}
1232   @param disk: the disk we try to assemble
1233   @type owner: str
1234   @param owner: the name of the instance which owns the disk
1235   @type as_primary: boolean
1236   @param as_primary: if we should make the block device
1237       read/write
1238
1239   @return: the assembled device or None (in case no device
1240       was assembled)
1241   @raise errors.BlockDeviceError: in case there is an error
1242       during the activation of the children or the device
1243       itself
1244
1245   """
1246   children = []
1247   if disk.children:
1248     mcn = disk.ChildrenNeeded()
1249     if mcn == -1:
1250       mcn = 0 # max number of Nones allowed
1251     else:
1252       mcn = len(disk.children) - mcn # max number of Nones
1253     for chld_disk in disk.children:
1254       try:
1255         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1256       except errors.BlockDeviceError, err:
1257         if children.count(None) >= mcn:
1258           raise
1259         cdev = None
1260         logging.error("Error in child activation (but continuing): %s",
1261                       str(err))
1262       children.append(cdev)
1263
1264   if as_primary or disk.AssembleOnSecondary():
1265     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1266     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1267     result = r_dev
1268     if as_primary or disk.OpenOnSecondary():
1269       r_dev.Open()
1270     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1271                                 as_primary, disk.iv_name)
1272
1273   else:
1274     result = True
1275   return result
1276
1277
1278 def BlockdevAssemble(disk, owner, as_primary):
1279   """Activate a block device for an instance.
1280
1281   This is a wrapper over _RecursiveAssembleBD.
1282
1283   @rtype: str or boolean
1284   @return: a C{/dev/...} path for primary nodes, and
1285       C{True} for secondary nodes
1286
1287   """
1288   try:
1289     result = _RecursiveAssembleBD(disk, owner, as_primary)
1290     if isinstance(result, bdev.BlockDev):
1291       result = result.dev_path
1292   except errors.BlockDeviceError, err:
1293     _Fail("Error while assembling disk: %s", err, exc=True)
1294
1295   return result
1296
1297
1298 def BlockdevShutdown(disk):
1299   """Shut down a block device.
1300
1301   First, if the device is assembled (Attach() is successful), then
1302   the device is shutdown. Then the children of the device are
1303   shutdown.
1304
1305   This function is called recursively. Note that we don't cache the
1306   children or such, as oppossed to assemble, shutdown of different
1307   devices doesn't require that the upper device was active.
1308
1309   @type disk: L{objects.Disk}
1310   @param disk: the description of the disk we should
1311       shutdown
1312   @rtype: None
1313
1314   """
1315   msgs = []
1316   r_dev = _RecursiveFindBD(disk)
1317   if r_dev is not None:
1318     r_path = r_dev.dev_path
1319     try:
1320       r_dev.Shutdown()
1321       DevCacheManager.RemoveCache(r_path)
1322     except errors.BlockDeviceError, err:
1323       msgs.append(str(err))
1324
1325   if disk.children:
1326     for child in disk.children:
1327       try:
1328         BlockdevShutdown(child)
1329       except RPCFail, err:
1330         msgs.append(str(err))
1331
1332   if msgs:
1333     _Fail("; ".join(msgs))
1334
1335
1336 def BlockdevAddchildren(parent_cdev, new_cdevs):
1337   """Extend a mirrored block device.
1338
1339   @type parent_cdev: L{objects.Disk}
1340   @param parent_cdev: the disk to which we should add children
1341   @type new_cdevs: list of L{objects.Disk}
1342   @param new_cdevs: the list of children which we should add
1343   @rtype: None
1344
1345   """
1346   parent_bdev = _RecursiveFindBD(parent_cdev)
1347   if parent_bdev is None:
1348     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1349   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1350   if new_bdevs.count(None) > 0:
1351     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1352   parent_bdev.AddChildren(new_bdevs)
1353
1354
1355 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1356   """Shrink a mirrored block device.
1357
1358   @type parent_cdev: L{objects.Disk}
1359   @param parent_cdev: the disk from which we should remove children
1360   @type new_cdevs: list of L{objects.Disk}
1361   @param new_cdevs: the list of children which we should remove
1362   @rtype: None
1363
1364   """
1365   parent_bdev = _RecursiveFindBD(parent_cdev)
1366   if parent_bdev is None:
1367     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1368   devs = []
1369   for disk in new_cdevs:
1370     rpath = disk.StaticDevPath()
1371     if rpath is None:
1372       bd = _RecursiveFindBD(disk)
1373       if bd is None:
1374         _Fail("Can't find device %s while removing children", disk)
1375       else:
1376         devs.append(bd.dev_path)
1377     else:
1378       devs.append(rpath)
1379   parent_bdev.RemoveChildren(devs)
1380
1381
1382 def BlockdevGetmirrorstatus(disks):
1383   """Get the mirroring status of a list of devices.
1384
1385   @type disks: list of L{objects.Disk}
1386   @param disks: the list of disks which we should query
1387   @rtype: disk
1388   @return:
1389       a list of (mirror_done, estimated_time) tuples, which
1390       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1391   @raise errors.BlockDeviceError: if any of the disks cannot be
1392       found
1393
1394   """
1395   stats = []
1396   for dsk in disks:
1397     rbd = _RecursiveFindBD(dsk)
1398     if rbd is None:
1399       _Fail("Can't find device %s", dsk)
1400
1401     stats.append(rbd.CombinedSyncStatus())
1402
1403   return stats
1404
1405
1406 def _RecursiveFindBD(disk):
1407   """Check if a device is activated.
1408
1409   If so, return information about the real device.
1410
1411   @type disk: L{objects.Disk}
1412   @param disk: the disk object we need to find
1413
1414   @return: None if the device can't be found,
1415       otherwise the device instance
1416
1417   """
1418   children = []
1419   if disk.children:
1420     for chdisk in disk.children:
1421       children.append(_RecursiveFindBD(chdisk))
1422
1423   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1424
1425
1426 def BlockdevFind(disk):
1427   """Check if a device is activated.
1428
1429   If it is, return information about the real device.
1430
1431   @type disk: L{objects.Disk}
1432   @param disk: the disk to find
1433   @rtype: None or objects.BlockDevStatus
1434   @return: None if the disk cannot be found, otherwise a the current
1435            information
1436
1437   """
1438   try:
1439     rbd = _RecursiveFindBD(disk)
1440   except errors.BlockDeviceError, err:
1441     _Fail("Failed to find device: %s", err, exc=True)
1442
1443   if rbd is None:
1444     return None
1445
1446   return rbd.GetSyncStatus()
1447
1448
1449 def BlockdevGetsize(disks):
1450   """Computes the size of the given disks.
1451
1452   If a disk is not found, returns None instead.
1453
1454   @type disks: list of L{objects.Disk}
1455   @param disks: the list of disk to compute the size for
1456   @rtype: list
1457   @return: list with elements None if the disk cannot be found,
1458       otherwise the size
1459
1460   """
1461   result = []
1462   for cf in disks:
1463     try:
1464       rbd = _RecursiveFindBD(cf)
1465     except errors.BlockDeviceError, err:
1466       result.append(None)
1467       continue
1468     if rbd is None:
1469       result.append(None)
1470     else:
1471       result.append(rbd.GetActualSize())
1472   return result
1473
1474
1475 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1476   """Export a block device to a remote node.
1477
1478   @type disk: L{objects.Disk}
1479   @param disk: the description of the disk to export
1480   @type dest_node: str
1481   @param dest_node: the destination node to export to
1482   @type dest_path: str
1483   @param dest_path: the destination path on the target node
1484   @type cluster_name: str
1485   @param cluster_name: the cluster name, needed for SSH hostalias
1486   @rtype: None
1487
1488   """
1489   real_disk = _RecursiveFindBD(disk)
1490   if real_disk is None:
1491     _Fail("Block device '%s' is not set up", disk)
1492
1493   real_disk.Open()
1494
1495   # the block size on the read dd is 1MiB to match our units
1496   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1497                                "dd if=%s bs=1048576 count=%s",
1498                                real_disk.dev_path, str(disk.size))
1499
1500   # we set here a smaller block size as, due to ssh buffering, more
1501   # than 64-128k will mostly ignored; we use nocreat to fail if the
1502   # device is not already there or we pass a wrong path; we use
1503   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1504   # to not buffer too much memory; this means that at best, we flush
1505   # every 64k, which will not be very fast
1506   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1507                                 " oflag=dsync", dest_path)
1508
1509   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1510                                                    constants.GANETI_RUNAS,
1511                                                    destcmd)
1512
1513   # all commands have been checked, so we're safe to combine them
1514   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1515
1516   result = utils.RunCmd(["bash", "-c", command])
1517
1518   if result.failed:
1519     _Fail("Disk copy command '%s' returned error: %s"
1520           " output: %s", command, result.fail_reason, result.output)
1521
1522
1523 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1524   """Write a file to the filesystem.
1525
1526   This allows the master to overwrite(!) a file. It will only perform
1527   the operation if the file belongs to a list of configuration files.
1528
1529   @type file_name: str
1530   @param file_name: the target file name
1531   @type data: str
1532   @param data: the new contents of the file
1533   @type mode: int
1534   @param mode: the mode to give the file (can be None)
1535   @type uid: int
1536   @param uid: the owner of the file (can be -1 for default)
1537   @type gid: int
1538   @param gid: the group of the file (can be -1 for default)
1539   @type atime: float
1540   @param atime: the atime to set on the file (can be None)
1541   @type mtime: float
1542   @param mtime: the mtime to set on the file (can be None)
1543   @rtype: None
1544
1545   """
1546   if not os.path.isabs(file_name):
1547     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1548
1549   if file_name not in _ALLOWED_UPLOAD_FILES:
1550     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1551           file_name)
1552
1553   raw_data = _Decompress(data)
1554
1555   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1556                   atime=atime, mtime=mtime)
1557
1558
1559 def WriteSsconfFiles(values):
1560   """Update all ssconf files.
1561
1562   Wrapper around the SimpleStore.WriteFiles.
1563
1564   """
1565   ssconf.SimpleStore().WriteFiles(values)
1566
1567
1568 def _ErrnoOrStr(err):
1569   """Format an EnvironmentError exception.
1570
1571   If the L{err} argument has an errno attribute, it will be looked up
1572   and converted into a textual C{E...} description. Otherwise the
1573   string representation of the error will be returned.
1574
1575   @type err: L{EnvironmentError}
1576   @param err: the exception to format
1577
1578   """
1579   if hasattr(err, 'errno'):
1580     detail = errno.errorcode[err.errno]
1581   else:
1582     detail = str(err)
1583   return detail
1584
1585
1586 def _OSOndiskAPIVersion(name, os_dir):
1587   """Compute and return the API version of a given OS.
1588
1589   This function will try to read the API version of the OS given by
1590   the 'name' parameter and residing in the 'os_dir' directory.
1591
1592   @type name: str
1593   @param name: the OS name we should look for
1594   @type os_dir: str
1595   @param os_dir: the directory inwhich we should look for the OS
1596   @rtype: tuple
1597   @return: tuple (status, data) with status denoting the validity and
1598       data holding either the vaid versions or an error message
1599
1600   """
1601   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1602
1603   try:
1604     st = os.stat(api_file)
1605   except EnvironmentError, err:
1606     return False, ("Required file 'ganeti_api_version' file not"
1607                    " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
1608
1609   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1610     return False, ("File 'ganeti_api_version' file at %s is not"
1611                    " a regular file" % os_dir)
1612
1613   try:
1614     api_versions = utils.ReadFile(api_file).splitlines()
1615   except EnvironmentError, err:
1616     return False, ("Error while reading the API version file at %s: %s" %
1617                    (api_file, _ErrnoOrStr(err)))
1618
1619   try:
1620     api_versions = [int(version.strip()) for version in api_versions]
1621   except (TypeError, ValueError), err:
1622     return False, ("API version(s) can't be converted to integer: %s" %
1623                    str(err))
1624
1625   return True, api_versions
1626
1627
1628 def DiagnoseOS(top_dirs=None):
1629   """Compute the validity for all OSes.
1630
1631   @type top_dirs: list
1632   @param top_dirs: the list of directories in which to
1633       search (if not given defaults to
1634       L{constants.OS_SEARCH_PATH})
1635   @rtype: list of L{objects.OS}
1636   @return: a list of tuples (name, path, status, diagnose)
1637       for all (potential) OSes under all search paths, where:
1638           - name is the (potential) OS name
1639           - path is the full path to the OS
1640           - status True/False is the validity of the OS
1641           - diagnose is the error message for an invalid OS, otherwise empty
1642
1643   """
1644   if top_dirs is None:
1645     top_dirs = constants.OS_SEARCH_PATH
1646
1647   result = []
1648   for dir_name in top_dirs:
1649     if os.path.isdir(dir_name):
1650       try:
1651         f_names = utils.ListVisibleFiles(dir_name)
1652       except EnvironmentError, err:
1653         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1654         break
1655       for name in f_names:
1656         os_path = os.path.sep.join([dir_name, name])
1657         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1658         if status:
1659           diagnose = ""
1660         else:
1661           diagnose = os_inst
1662         result.append((name, os_path, status, diagnose))
1663
1664   return result
1665
1666
1667 def _TryOSFromDisk(name, base_dir=None):
1668   """Create an OS instance from disk.
1669
1670   This function will return an OS instance if the given name is a
1671   valid OS name.
1672
1673   @type base_dir: string
1674   @keyword base_dir: Base directory containing OS installations.
1675                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1676   @rtype: tuple
1677   @return: success and either the OS instance if we find a valid one,
1678       or error message
1679
1680   """
1681   if base_dir is None:
1682     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1683     if os_dir is None:
1684       return False, "Directory for OS %s not found in search path" % name
1685   else:
1686     os_dir = os.path.sep.join([base_dir, name])
1687
1688   status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1689   if not status:
1690     # push the error up
1691     return status, api_versions
1692
1693   if not constants.OS_API_VERSIONS.intersection(api_versions):
1694     return False, ("API version mismatch for path '%s': found %s, want %s." %
1695                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1696
1697   # OS Scripts dictionary, we will populate it with the actual script names
1698   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1699
1700   for script in os_scripts:
1701     os_scripts[script] = os.path.sep.join([os_dir, script])
1702
1703     try:
1704       st = os.stat(os_scripts[script])
1705     except EnvironmentError, err:
1706       return False, ("Script '%s' under path '%s' is missing (%s)" %
1707                      (script, os_dir, _ErrnoOrStr(err)))
1708
1709     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1710       return False, ("Script '%s' under path '%s' is not executable" %
1711                      (script, os_dir))
1712
1713     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1714       return False, ("Script '%s' under path '%s' is not a regular file" %
1715                      (script, os_dir))
1716
1717   os_obj = objects.OS(name=name, path=os_dir,
1718                       create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1719                       export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1720                       import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1721                       rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1722                       api_versions=api_versions)
1723   return True, os_obj
1724
1725
1726 def OSFromDisk(name, base_dir=None):
1727   """Create an OS instance from disk.
1728
1729   This function will return an OS instance if the given name is a
1730   valid OS name. Otherwise, it will raise an appropriate
1731   L{RPCFail} exception, detailing why this is not a valid OS.
1732
1733   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1734   an exception but returns true/false status data.
1735
1736   @type base_dir: string
1737   @keyword base_dir: Base directory containing OS installations.
1738                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1739   @rtype: L{objects.OS}
1740   @return: the OS instance if we find a valid one
1741   @raise RPCFail: if we don't find a valid OS
1742
1743   """
1744   status, payload = _TryOSFromDisk(name, base_dir)
1745
1746   if not status:
1747     _Fail(payload)
1748
1749   return payload
1750
1751
1752 def OSEnvironment(instance, os, debug=0):
1753   """Calculate the environment for an os script.
1754
1755   @type instance: L{objects.Instance}
1756   @param instance: target instance for the os script run
1757   @type os: L{objects.OS}
1758   @param os: operating system for which the environment is being built
1759   @type debug: integer
1760   @param debug: debug level (0 or 1, for OS Api 10)
1761   @rtype: dict
1762   @return: dict of environment variables
1763   @raise errors.BlockDeviceError: if the block device
1764       cannot be found
1765
1766   """
1767   result = {}
1768   api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1769   result['OS_API_VERSION'] = '%d' % api_version
1770   result['INSTANCE_NAME'] = instance.name
1771   result['INSTANCE_OS'] = instance.os
1772   result['HYPERVISOR'] = instance.hypervisor
1773   result['DISK_COUNT'] = '%d' % len(instance.disks)
1774   result['NIC_COUNT'] = '%d' % len(instance.nics)
1775   result['DEBUG_LEVEL'] = '%d' % debug
1776   for idx, disk in enumerate(instance.disks):
1777     real_disk = _RecursiveFindBD(disk)
1778     if real_disk is None:
1779       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1780                                     str(disk))
1781     real_disk.Open()
1782     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1783     result['DISK_%d_ACCESS' % idx] = disk.mode
1784     if constants.HV_DISK_TYPE in instance.hvparams:
1785       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1786         instance.hvparams[constants.HV_DISK_TYPE]
1787     if disk.dev_type in constants.LDS_BLOCK:
1788       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1789     elif disk.dev_type == constants.LD_FILE:
1790       result['DISK_%d_BACKEND_TYPE' % idx] = \
1791         'file:%s' % disk.physical_id[0]
1792   for idx, nic in enumerate(instance.nics):
1793     result['NIC_%d_MAC' % idx] = nic.mac
1794     if nic.ip:
1795       result['NIC_%d_IP' % idx] = nic.ip
1796     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1797     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1798       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1799     if nic.nicparams[constants.NIC_LINK]:
1800       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1801     if constants.HV_NIC_TYPE in instance.hvparams:
1802       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1803         instance.hvparams[constants.HV_NIC_TYPE]
1804
1805   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1806     for key, value in source.items():
1807       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1808
1809   return result
1810
1811 def BlockdevGrow(disk, amount):
1812   """Grow a stack of block devices.
1813
1814   This function is called recursively, with the childrens being the
1815   first ones to resize.
1816
1817   @type disk: L{objects.Disk}
1818   @param disk: the disk to be grown
1819   @rtype: (status, result)
1820   @return: a tuple with the status of the operation
1821       (True/False), and the errors message if status
1822       is False
1823
1824   """
1825   r_dev = _RecursiveFindBD(disk)
1826   if r_dev is None:
1827     _Fail("Cannot find block device %s", disk)
1828
1829   try:
1830     r_dev.Grow(amount)
1831   except errors.BlockDeviceError, err:
1832     _Fail("Failed to grow block device: %s", err, exc=True)
1833
1834
1835 def BlockdevSnapshot(disk):
1836   """Create a snapshot copy of a block device.
1837
1838   This function is called recursively, and the snapshot is actually created
1839   just for the leaf lvm backend device.
1840
1841   @type disk: L{objects.Disk}
1842   @param disk: the disk to be snapshotted
1843   @rtype: string
1844   @return: snapshot disk path
1845
1846   """
1847   if disk.children:
1848     if len(disk.children) == 1:
1849       # only one child, let's recurse on it
1850       return BlockdevSnapshot(disk.children[0])
1851     else:
1852       # more than one child, choose one that matches
1853       for child in disk.children:
1854         if child.size == disk.size:
1855           # return implies breaking the loop
1856           return BlockdevSnapshot(child)
1857   elif disk.dev_type == constants.LD_LV:
1858     r_dev = _RecursiveFindBD(disk)
1859     if r_dev is not None:
1860       # let's stay on the safe side and ask for the full size, for now
1861       return r_dev.Snapshot(disk.size)
1862     else:
1863       _Fail("Cannot find block device %s", disk)
1864   else:
1865     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1866           disk.unique_id, disk.dev_type)
1867
1868
1869 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1870   """Export a block device snapshot to a remote node.
1871
1872   @type disk: L{objects.Disk}
1873   @param disk: the description of the disk to export
1874   @type dest_node: str
1875   @param dest_node: the destination node to export to
1876   @type instance: L{objects.Instance}
1877   @param instance: the instance object to whom the disk belongs
1878   @type cluster_name: str
1879   @param cluster_name: the cluster name, needed for SSH hostalias
1880   @type idx: int
1881   @param idx: the index of the disk in the instance's disk list,
1882       used to export to the OS scripts environment
1883   @rtype: None
1884
1885   """
1886   inst_os = OSFromDisk(instance.os)
1887   export_env = OSEnvironment(instance, inst_os)
1888
1889   export_script = inst_os.export_script
1890
1891   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1892                                      instance.name, int(time.time()))
1893   if not os.path.exists(constants.LOG_OS_DIR):
1894     os.mkdir(constants.LOG_OS_DIR, 0750)
1895   real_disk = _RecursiveFindBD(disk)
1896   if real_disk is None:
1897     _Fail("Block device '%s' is not set up", disk)
1898
1899   real_disk.Open()
1900
1901   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1902   export_env['EXPORT_INDEX'] = str(idx)
1903
1904   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1905   destfile = disk.physical_id[1]
1906
1907   # the target command is built out of three individual commands,
1908   # which are joined by pipes; we check each individual command for
1909   # valid parameters
1910   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1911                                inst_os.path, export_script, logfile)
1912
1913   comprcmd = "gzip"
1914
1915   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1916                                 destdir, destdir, destfile)
1917   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1918                                                    constants.GANETI_RUNAS,
1919                                                    destcmd)
1920
1921   # all commands have been checked, so we're safe to combine them
1922   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1923
1924   result = utils.RunCmd(["bash", "-c", command], env=export_env)
1925
1926   if result.failed:
1927     _Fail("OS snapshot export command '%s' returned error: %s"
1928           " output: %s", command, result.fail_reason, result.output)
1929
1930
1931 def FinalizeExport(instance, snap_disks):
1932   """Write out the export configuration information.
1933
1934   @type instance: L{objects.Instance}
1935   @param instance: the instance which we export, used for
1936       saving configuration
1937   @type snap_disks: list of L{objects.Disk}
1938   @param snap_disks: list of snapshot block devices, which
1939       will be used to get the actual name of the dump file
1940
1941   @rtype: None
1942
1943   """
1944   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1945   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1946
1947   config = objects.SerializableConfigParser()
1948
1949   config.add_section(constants.INISECT_EXP)
1950   config.set(constants.INISECT_EXP, 'version', '0')
1951   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1952   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1953   config.set(constants.INISECT_EXP, 'os', instance.os)
1954   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1955
1956   config.add_section(constants.INISECT_INS)
1957   config.set(constants.INISECT_INS, 'name', instance.name)
1958   config.set(constants.INISECT_INS, 'memory', '%d' %
1959              instance.beparams[constants.BE_MEMORY])
1960   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1961              instance.beparams[constants.BE_VCPUS])
1962   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1963
1964   nic_total = 0
1965   for nic_count, nic in enumerate(instance.nics):
1966     nic_total += 1
1967     config.set(constants.INISECT_INS, 'nic%d_mac' %
1968                nic_count, '%s' % nic.mac)
1969     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1970     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1971                '%s' % nic.bridge)
1972   # TODO: redundant: on load can read nics until it doesn't exist
1973   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1974
1975   disk_total = 0
1976   for disk_count, disk in enumerate(snap_disks):
1977     if disk:
1978       disk_total += 1
1979       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1980                  ('%s' % disk.iv_name))
1981       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1982                  ('%s' % disk.physical_id[1]))
1983       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1984                  ('%d' % disk.size))
1985
1986   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1987
1988   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1989                   data=config.Dumps())
1990   shutil.rmtree(finaldestdir, True)
1991   shutil.move(destdir, finaldestdir)
1992
1993
1994 def ExportInfo(dest):
1995   """Get export configuration information.
1996
1997   @type dest: str
1998   @param dest: directory containing the export
1999
2000   @rtype: L{objects.SerializableConfigParser}
2001   @return: a serializable config file containing the
2002       export info
2003
2004   """
2005   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
2006
2007   config = objects.SerializableConfigParser()
2008   config.read(cff)
2009
2010   if (not config.has_section(constants.INISECT_EXP) or
2011       not config.has_section(constants.INISECT_INS)):
2012     _Fail("Export info file doesn't have the required fields")
2013
2014   return config.Dumps()
2015
2016
2017 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2018   """Import an os image into an instance.
2019
2020   @type instance: L{objects.Instance}
2021   @param instance: instance to import the disks into
2022   @type src_node: string
2023   @param src_node: source node for the disk images
2024   @type src_images: list of string
2025   @param src_images: absolute paths of the disk images
2026   @rtype: list of boolean
2027   @return: each boolean represent the success of importing the n-th disk
2028
2029   """
2030   inst_os = OSFromDisk(instance.os)
2031   import_env = OSEnvironment(instance, inst_os)
2032   import_script = inst_os.import_script
2033
2034   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2035                                         instance.name, int(time.time()))
2036   if not os.path.exists(constants.LOG_OS_DIR):
2037     os.mkdir(constants.LOG_OS_DIR, 0750)
2038
2039   comprcmd = "gunzip"
2040   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2041                                import_script, logfile)
2042
2043   final_result = []
2044   for idx, image in enumerate(src_images):
2045     if image:
2046       destcmd = utils.BuildShellCmd('cat %s', image)
2047       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2048                                                        constants.GANETI_RUNAS,
2049                                                        destcmd)
2050       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2051       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2052       import_env['IMPORT_INDEX'] = str(idx)
2053       result = utils.RunCmd(command, env=import_env)
2054       if result.failed:
2055         logging.error("Disk import command '%s' returned error: %s"
2056                       " output: %s", command, result.fail_reason,
2057                       result.output)
2058         final_result.append("error importing disk %d: %s, %s" %
2059                             (idx, result.fail_reason, result.output[-100]))
2060
2061   if final_result:
2062     _Fail("; ".join(final_result), log=False)
2063
2064
2065 def ListExports():
2066   """Return a list of exports currently available on this machine.
2067
2068   @rtype: list
2069   @return: list of the exports
2070
2071   """
2072   if os.path.isdir(constants.EXPORT_DIR):
2073     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2074   else:
2075     _Fail("No exports directory")
2076
2077
2078 def RemoveExport(export):
2079   """Remove an existing export from the node.
2080
2081   @type export: str
2082   @param export: the name of the export to remove
2083   @rtype: None
2084
2085   """
2086   target = os.path.join(constants.EXPORT_DIR, export)
2087
2088   try:
2089     shutil.rmtree(target)
2090   except EnvironmentError, err:
2091     _Fail("Error while removing the export: %s", err, exc=True)
2092
2093
2094 def BlockdevRename(devlist):
2095   """Rename a list of block devices.
2096
2097   @type devlist: list of tuples
2098   @param devlist: list of tuples of the form  (disk,
2099       new_logical_id, new_physical_id); disk is an
2100       L{objects.Disk} object describing the current disk,
2101       and new logical_id/physical_id is the name we
2102       rename it to
2103   @rtype: boolean
2104   @return: True if all renames succeeded, False otherwise
2105
2106   """
2107   msgs = []
2108   result = True
2109   for disk, unique_id in devlist:
2110     dev = _RecursiveFindBD(disk)
2111     if dev is None:
2112       msgs.append("Can't find device %s in rename" % str(disk))
2113       result = False
2114       continue
2115     try:
2116       old_rpath = dev.dev_path
2117       dev.Rename(unique_id)
2118       new_rpath = dev.dev_path
2119       if old_rpath != new_rpath:
2120         DevCacheManager.RemoveCache(old_rpath)
2121         # FIXME: we should add the new cache information here, like:
2122         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2123         # but we don't have the owner here - maybe parse from existing
2124         # cache? for now, we only lose lvm data when we rename, which
2125         # is less critical than DRBD or MD
2126     except errors.BlockDeviceError, err:
2127       msgs.append("Can't rename device '%s' to '%s': %s" %
2128                   (dev, unique_id, err))
2129       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2130       result = False
2131   if not result:
2132     _Fail("; ".join(msgs))
2133
2134
2135 def _TransformFileStorageDir(file_storage_dir):
2136   """Checks whether given file_storage_dir is valid.
2137
2138   Checks wheter the given file_storage_dir is within the cluster-wide
2139   default file_storage_dir stored in SimpleStore. Only paths under that
2140   directory are allowed.
2141
2142   @type file_storage_dir: str
2143   @param file_storage_dir: the path to check
2144
2145   @return: the normalized path if valid, None otherwise
2146
2147   """
2148   cfg = _GetConfig()
2149   file_storage_dir = os.path.normpath(file_storage_dir)
2150   base_file_storage_dir = cfg.GetFileStorageDir()
2151   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2152       base_file_storage_dir):
2153     _Fail("File storage directory '%s' is not under base file"
2154           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2155   return file_storage_dir
2156
2157
2158 def CreateFileStorageDir(file_storage_dir):
2159   """Create file storage directory.
2160
2161   @type file_storage_dir: str
2162   @param file_storage_dir: directory to create
2163
2164   @rtype: tuple
2165   @return: tuple with first element a boolean indicating wheter dir
2166       creation was successful or not
2167
2168   """
2169   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2170   if os.path.exists(file_storage_dir):
2171     if not os.path.isdir(file_storage_dir):
2172       _Fail("Specified storage dir '%s' is not a directory",
2173             file_storage_dir)
2174   else:
2175     try:
2176       os.makedirs(file_storage_dir, 0750)
2177     except OSError, err:
2178       _Fail("Cannot create file storage directory '%s': %s",
2179             file_storage_dir, err, exc=True)
2180
2181
2182 def RemoveFileStorageDir(file_storage_dir):
2183   """Remove file storage directory.
2184
2185   Remove it only if it's empty. If not log an error and return.
2186
2187   @type file_storage_dir: str
2188   @param file_storage_dir: the directory we should cleanup
2189   @rtype: tuple (success,)
2190   @return: tuple of one element, C{success}, denoting
2191       whether the operation was successful
2192
2193   """
2194   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2195   if os.path.exists(file_storage_dir):
2196     if not os.path.isdir(file_storage_dir):
2197       _Fail("Specified Storage directory '%s' is not a directory",
2198             file_storage_dir)
2199     # deletes dir only if empty, otherwise we want to fail the rpc call
2200     try:
2201       os.rmdir(file_storage_dir)
2202     except OSError, err:
2203       _Fail("Cannot remove file storage directory '%s': %s",
2204             file_storage_dir, err)
2205
2206
2207 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2208   """Rename the file storage directory.
2209
2210   @type old_file_storage_dir: str
2211   @param old_file_storage_dir: the current path
2212   @type new_file_storage_dir: str
2213   @param new_file_storage_dir: the name we should rename to
2214   @rtype: tuple (success,)
2215   @return: tuple of one element, C{success}, denoting
2216       whether the operation was successful
2217
2218   """
2219   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2220   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2221   if not os.path.exists(new_file_storage_dir):
2222     if os.path.isdir(old_file_storage_dir):
2223       try:
2224         os.rename(old_file_storage_dir, new_file_storage_dir)
2225       except OSError, err:
2226         _Fail("Cannot rename '%s' to '%s': %s",
2227               old_file_storage_dir, new_file_storage_dir, err)
2228     else:
2229       _Fail("Specified storage dir '%s' is not a directory",
2230             old_file_storage_dir)
2231   else:
2232     if os.path.exists(old_file_storage_dir):
2233       _Fail("Cannot rename '%s' to '%s': both locations exist",
2234             old_file_storage_dir, new_file_storage_dir)
2235
2236
2237 def _EnsureJobQueueFile(file_name):
2238   """Checks whether the given filename is in the queue directory.
2239
2240   @type file_name: str
2241   @param file_name: the file name we should check
2242   @rtype: None
2243   @raises RPCFail: if the file is not valid
2244
2245   """
2246   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2247   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2248
2249   if not result:
2250     _Fail("Passed job queue file '%s' does not belong to"
2251           " the queue directory '%s'", file_name, queue_dir)
2252
2253
2254 def JobQueueUpdate(file_name, content):
2255   """Updates a file in the queue directory.
2256
2257   This is just a wrapper over L{utils.WriteFile}, with proper
2258   checking.
2259
2260   @type file_name: str
2261   @param file_name: the job file name
2262   @type content: str
2263   @param content: the new job contents
2264   @rtype: boolean
2265   @return: the success of the operation
2266
2267   """
2268   _EnsureJobQueueFile(file_name)
2269
2270   # Write and replace the file atomically
2271   utils.WriteFile(file_name, data=_Decompress(content))
2272
2273
2274 def JobQueueRename(old, new):
2275   """Renames a job queue file.
2276
2277   This is just a wrapper over os.rename with proper checking.
2278
2279   @type old: str
2280   @param old: the old (actual) file name
2281   @type new: str
2282   @param new: the desired file name
2283   @rtype: tuple
2284   @return: the success of the operation and payload
2285
2286   """
2287   _EnsureJobQueueFile(old)
2288   _EnsureJobQueueFile(new)
2289
2290   utils.RenameFile(old, new, mkdir=True)
2291
2292
2293 def JobQueueSetDrainFlag(drain_flag):
2294   """Set the drain flag for the queue.
2295
2296   This will set or unset the queue drain flag.
2297
2298   @type drain_flag: boolean
2299   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2300   @rtype: truple
2301   @return: always True, None
2302   @warning: the function always returns True
2303
2304   """
2305   if drain_flag:
2306     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2307   else:
2308     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2309
2310
2311 def BlockdevClose(instance_name, disks):
2312   """Closes the given block devices.
2313
2314   This means they will be switched to secondary mode (in case of
2315   DRBD).
2316
2317   @param instance_name: if the argument is not empty, the symlinks
2318       of this instance will be removed
2319   @type disks: list of L{objects.Disk}
2320   @param disks: the list of disks to be closed
2321   @rtype: tuple (success, message)
2322   @return: a tuple of success and message, where success
2323       indicates the succes of the operation, and message
2324       which will contain the error details in case we
2325       failed
2326
2327   """
2328   bdevs = []
2329   for cf in disks:
2330     rd = _RecursiveFindBD(cf)
2331     if rd is None:
2332       _Fail("Can't find device %s", cf)
2333     bdevs.append(rd)
2334
2335   msg = []
2336   for rd in bdevs:
2337     try:
2338       rd.Close()
2339     except errors.BlockDeviceError, err:
2340       msg.append(str(err))
2341   if msg:
2342     _Fail("Can't make devices secondary: %s", ",".join(msg))
2343   else:
2344     if instance_name:
2345       _RemoveBlockDevLinks(instance_name, disks)
2346
2347
2348 def ValidateHVParams(hvname, hvparams):
2349   """Validates the given hypervisor parameters.
2350
2351   @type hvname: string
2352   @param hvname: the hypervisor name
2353   @type hvparams: dict
2354   @param hvparams: the hypervisor parameters to be validated
2355   @rtype: None
2356
2357   """
2358   try:
2359     hv_type = hypervisor.GetHypervisor(hvname)
2360     hv_type.ValidateParameters(hvparams)
2361   except errors.HypervisorError, err:
2362     _Fail(str(err), log=False)
2363
2364
2365 def DemoteFromMC():
2366   """Demotes the current node from master candidate role.
2367
2368   """
2369   # try to ensure we're not the master by mistake
2370   master, myself = ssconf.GetMasterAndMyself()
2371   if master == myself:
2372     _Fail("ssconf status shows I'm the master node, will not demote")
2373   pid_file = utils.DaemonPidFileName(constants.MASTERD)
2374   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2375     _Fail("The master daemon is running, will not demote")
2376   try:
2377     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2378       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2379   except EnvironmentError, err:
2380     if err.errno != errno.ENOENT:
2381       _Fail("Error while backing up cluster file: %s", err, exc=True)
2382   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2383
2384
2385 def _FindDisks(nodes_ip, disks):
2386   """Sets the physical ID on disks and returns the block devices.
2387
2388   """
2389   # set the correct physical ID
2390   my_name = utils.HostInfo().name
2391   for cf in disks:
2392     cf.SetPhysicalID(my_name, nodes_ip)
2393
2394   bdevs = []
2395
2396   for cf in disks:
2397     rd = _RecursiveFindBD(cf)
2398     if rd is None:
2399       _Fail("Can't find device %s", cf)
2400     bdevs.append(rd)
2401   return bdevs
2402
2403
2404 def DrbdDisconnectNet(nodes_ip, disks):
2405   """Disconnects the network on a list of drbd devices.
2406
2407   """
2408   bdevs = _FindDisks(nodes_ip, disks)
2409
2410   # disconnect disks
2411   for rd in bdevs:
2412     try:
2413       rd.DisconnectNet()
2414     except errors.BlockDeviceError, err:
2415       _Fail("Can't change network configuration to standalone mode: %s",
2416             err, exc=True)
2417
2418
2419 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2420   """Attaches the network on a list of drbd devices.
2421
2422   """
2423   bdevs = _FindDisks(nodes_ip, disks)
2424
2425   if multimaster:
2426     for idx, rd in enumerate(bdevs):
2427       try:
2428         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2429       except EnvironmentError, err:
2430         _Fail("Can't create symlink: %s", err)
2431   # reconnect disks, switch to new master configuration and if
2432   # needed primary mode
2433   for rd in bdevs:
2434     try:
2435       rd.AttachNet(multimaster)
2436     except errors.BlockDeviceError, err:
2437       _Fail("Can't change network configuration: %s", err)
2438   # wait until the disks are connected; we need to retry the re-attach
2439   # if the device becomes standalone, as this might happen if the one
2440   # node disconnects and reconnects in a different mode before the
2441   # other node reconnects; in this case, one or both of the nodes will
2442   # decide it has wrong configuration and switch to standalone
2443   RECONNECT_TIMEOUT = 2 * 60
2444   sleep_time = 0.100 # start with 100 miliseconds
2445   timeout_limit = time.time() + RECONNECT_TIMEOUT
2446   while time.time() < timeout_limit:
2447     all_connected = True
2448     for rd in bdevs:
2449       stats = rd.GetProcStatus()
2450       if not (stats.is_connected or stats.is_in_resync):
2451         all_connected = False
2452       if stats.is_standalone:
2453         # peer had different config info and this node became
2454         # standalone, even though this should not happen with the
2455         # new staged way of changing disk configs
2456         try:
2457           rd.AttachNet(multimaster)
2458         except errors.BlockDeviceError, err:
2459           _Fail("Can't change network configuration: %s", err)
2460     if all_connected:
2461       break
2462     time.sleep(sleep_time)
2463     sleep_time = min(5, sleep_time * 1.5)
2464   if not all_connected:
2465     _Fail("Timeout in disk reconnecting")
2466   if multimaster:
2467     # change to primary mode
2468     for rd in bdevs:
2469       try:
2470         rd.Open()
2471       except errors.BlockDeviceError, err:
2472         _Fail("Can't change to primary mode: %s", err)
2473
2474
2475 def DrbdWaitSync(nodes_ip, disks):
2476   """Wait until DRBDs have synchronized.
2477
2478   """
2479   bdevs = _FindDisks(nodes_ip, disks)
2480
2481   min_resync = 100
2482   alldone = True
2483   for rd in bdevs:
2484     stats = rd.GetProcStatus()
2485     if not (stats.is_connected or stats.is_in_resync):
2486       _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2487     alldone = alldone and (not stats.is_in_resync)
2488     if stats.sync_percent is not None:
2489       min_resync = min(min_resync, stats.sync_percent)
2490
2491   return (alldone, min_resync)
2492
2493
2494 def PowercycleNode(hypervisor_type):
2495   """Hard-powercycle the node.
2496
2497   Because we need to return first, and schedule the powercycle in the
2498   background, we won't be able to report failures nicely.
2499
2500   """
2501   hyper = hypervisor.GetHypervisor(hypervisor_type)
2502   try:
2503     pid = os.fork()
2504   except OSError:
2505     # if we can't fork, we'll pretend that we're in the child process
2506     pid = 0
2507   if pid > 0:
2508     return "Reboot scheduled in 5 seconds"
2509   time.sleep(5)
2510   hyper.PowercycleNode()
2511
2512
2513 class HooksRunner(object):
2514   """Hook runner.
2515
2516   This class is instantiated on the node side (ganeti-noded) and not
2517   on the master side.
2518
2519   """
2520   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2521
2522   def __init__(self, hooks_base_dir=None):
2523     """Constructor for hooks runner.
2524
2525     @type hooks_base_dir: str or None
2526     @param hooks_base_dir: if not None, this overrides the
2527         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2528
2529     """
2530     if hooks_base_dir is None:
2531       hooks_base_dir = constants.HOOKS_BASE_DIR
2532     self._BASE_DIR = hooks_base_dir
2533
2534   @staticmethod
2535   def ExecHook(script, env):
2536     """Exec one hook script.
2537
2538     @type script: str
2539     @param script: the full path to the script
2540     @type env: dict
2541     @param env: the environment with which to exec the script
2542     @rtype: tuple (success, message)
2543     @return: a tuple of success and message, where success
2544         indicates the succes of the operation, and message
2545         which will contain the error details in case we
2546         failed
2547
2548     """
2549     # exec the process using subprocess and log the output
2550     fdstdin = None
2551     try:
2552       fdstdin = open("/dev/null", "r")
2553       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2554                                stderr=subprocess.STDOUT, close_fds=True,
2555                                shell=False, cwd="/", env=env)
2556       output = ""
2557       try:
2558         output = child.stdout.read(4096)
2559         child.stdout.close()
2560       except EnvironmentError, err:
2561         output += "Hook script error: %s" % str(err)
2562
2563       while True:
2564         try:
2565           result = child.wait()
2566           break
2567         except EnvironmentError, err:
2568           if err.errno == errno.EINTR:
2569             continue
2570           raise
2571     finally:
2572       # try not to leak fds
2573       for fd in (fdstdin, ):
2574         if fd is not None:
2575           try:
2576             fd.close()
2577           except EnvironmentError, err:
2578             # just log the error
2579             #logging.exception("Error while closing fd %s", fd)
2580             pass
2581
2582     return result == 0, utils.SafeEncode(output.strip())
2583
2584   def RunHooks(self, hpath, phase, env):
2585     """Run the scripts in the hooks directory.
2586
2587     @type hpath: str
2588     @param hpath: the path to the hooks directory which
2589         holds the scripts
2590     @type phase: str
2591     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2592         L{constants.HOOKS_PHASE_POST}
2593     @type env: dict
2594     @param env: dictionary with the environment for the hook
2595     @rtype: list
2596     @return: list of 3-element tuples:
2597       - script path
2598       - script result, either L{constants.HKR_SUCCESS} or
2599         L{constants.HKR_FAIL}
2600       - output of the script
2601
2602     @raise errors.ProgrammerError: for invalid input
2603         parameters
2604
2605     """
2606     if phase == constants.HOOKS_PHASE_PRE:
2607       suffix = "pre"
2608     elif phase == constants.HOOKS_PHASE_POST:
2609       suffix = "post"
2610     else:
2611       _Fail("Unknown hooks phase '%s'", phase)
2612
2613     rr = []
2614
2615     subdir = "%s-%s.d" % (hpath, suffix)
2616     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2617     try:
2618       dir_contents = utils.ListVisibleFiles(dir_name)
2619     except OSError:
2620       # FIXME: must log output in case of failures
2621       return rr
2622
2623     # we use the standard python sort order,
2624     # so 00name is the recommended naming scheme
2625     dir_contents.sort()
2626     for relname in dir_contents:
2627       fname = os.path.join(dir_name, relname)
2628       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2629           self.RE_MASK.match(relname) is not None):
2630         rrval = constants.HKR_SKIP
2631         output = ""
2632       else:
2633         result, output = self.ExecHook(fname, env)
2634         if not result:
2635           rrval = constants.HKR_FAIL
2636         else:
2637           rrval = constants.HKR_SUCCESS
2638       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2639
2640     return rr
2641
2642
2643 class IAllocatorRunner(object):
2644   """IAllocator runner.
2645
2646   This class is instantiated on the node side (ganeti-noded) and not on
2647   the master side.
2648
2649   """
2650   def Run(self, name, idata):
2651     """Run an iallocator script.
2652
2653     @type name: str
2654     @param name: the iallocator script name
2655     @type idata: str
2656     @param idata: the allocator input data
2657
2658     @rtype: tuple
2659     @return: two element tuple of:
2660        - status
2661        - either error message or stdout of allocator (for success)
2662
2663     """
2664     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2665                                   os.path.isfile)
2666     if alloc_script is None:
2667       _Fail("iallocator module '%s' not found in the search path", name)
2668
2669     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2670     try:
2671       os.write(fd, idata)
2672       os.close(fd)
2673       result = utils.RunCmd([alloc_script, fin_name])
2674       if result.failed:
2675         _Fail("iallocator module '%s' failed: %s, output '%s'",
2676               name, result.fail_reason, result.output)
2677     finally:
2678       os.unlink(fin_name)
2679
2680     return result.stdout
2681
2682
2683 class DevCacheManager(object):
2684   """Simple class for managing a cache of block device information.
2685
2686   """
2687   _DEV_PREFIX = "/dev/"
2688   _ROOT_DIR = constants.BDEV_CACHE_DIR
2689
2690   @classmethod
2691   def _ConvertPath(cls, dev_path):
2692     """Converts a /dev/name path to the cache file name.
2693
2694     This replaces slashes with underscores and strips the /dev
2695     prefix. It then returns the full path to the cache file.
2696
2697     @type dev_path: str
2698     @param dev_path: the C{/dev/} path name
2699     @rtype: str
2700     @return: the converted path name
2701
2702     """
2703     if dev_path.startswith(cls._DEV_PREFIX):
2704       dev_path = dev_path[len(cls._DEV_PREFIX):]
2705     dev_path = dev_path.replace("/", "_")
2706     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2707     return fpath
2708
2709   @classmethod
2710   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2711     """Updates the cache information for a given device.
2712
2713     @type dev_path: str
2714     @param dev_path: the pathname of the device
2715     @type owner: str
2716     @param owner: the owner (instance name) of the device
2717     @type on_primary: bool
2718     @param on_primary: whether this is the primary
2719         node nor not
2720     @type iv_name: str
2721     @param iv_name: the instance-visible name of the
2722         device, as in objects.Disk.iv_name
2723
2724     @rtype: None
2725
2726     """
2727     if dev_path is None:
2728       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2729       return
2730     fpath = cls._ConvertPath(dev_path)
2731     if on_primary:
2732       state = "primary"
2733     else:
2734       state = "secondary"
2735     if iv_name is None:
2736       iv_name = "not_visible"
2737     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2738     try:
2739       utils.WriteFile(fpath, data=fdata)
2740     except EnvironmentError, err:
2741       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2742
2743   @classmethod
2744   def RemoveCache(cls, dev_path):
2745     """Remove data for a dev_path.
2746
2747     This is just a wrapper over L{utils.RemoveFile} with a converted
2748     path name and logging.
2749
2750     @type dev_path: str
2751     @param dev_path: the pathname of the device
2752
2753     @rtype: None
2754
2755     """
2756     if dev_path is None:
2757       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2758       return
2759     fpath = cls._ConvertPath(dev_path)
2760     try:
2761       utils.RemoveFile(fpath)
2762     except EnvironmentError, err:
2763       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)