Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 40684c3a

History | View | Annotate | Download (98.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable-msg=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50
import signal
51

    
52
from ganeti import errors
53
from ganeti import utils
54
from ganeti import ssh
55
from ganeti import hypervisor
56
from ganeti import constants
57
from ganeti import bdev
58
from ganeti import objects
59
from ganeti import ssconf
60
from ganeti import serializer
61

    
62

    
63
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
64
_ALLOWED_CLEAN_DIRS = frozenset([
65
  constants.DATA_DIR,
66
  constants.JOB_QUEUE_ARCHIVE_DIR,
67
  constants.QUEUE_DIR,
68
  constants.CRYPTO_KEYS_DIR,
69
  ])
70
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
71
_X509_KEY_FILE = "key"
72
_X509_CERT_FILE = "cert"
73
_IES_STATUS_FILE = "status"
74
_IES_PID_FILE = "pid"
75
_IES_CA_FILE = "ca"
76

    
77

    
78
class RPCFail(Exception):
79
  """Class denoting RPC failure.
80

81
  Its argument is the error message.
82

83
  """
84

    
85

    
86
def _Fail(msg, *args, **kwargs):
87
  """Log an error and the raise an RPCFail exception.
88

89
  This exception is then handled specially in the ganeti daemon and
90
  turned into a 'failed' return type. As such, this function is a
91
  useful shortcut for logging the error and returning it to the master
92
  daemon.
93

94
  @type msg: string
95
  @param msg: the text of the exception
96
  @raise RPCFail
97

98
  """
99
  if args:
100
    msg = msg % args
101
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
102
    if "exc" in kwargs and kwargs["exc"]:
103
      logging.exception(msg)
104
    else:
105
      logging.error(msg)
106
  raise RPCFail(msg)
107

    
108

    
109
def _GetConfig():
110
  """Simple wrapper to return a SimpleStore.
111

112
  @rtype: L{ssconf.SimpleStore}
113
  @return: a SimpleStore instance
114

115
  """
116
  return ssconf.SimpleStore()
117

    
118

    
119
def _GetSshRunner(cluster_name):
120
  """Simple wrapper to return an SshRunner.
121

122
  @type cluster_name: str
123
  @param cluster_name: the cluster name, which is needed
124
      by the SshRunner constructor
125
  @rtype: L{ssh.SshRunner}
126
  @return: an SshRunner instance
127

128
  """
129
  return ssh.SshRunner(cluster_name)
130

    
131

    
132
def _Decompress(data):
133
  """Unpacks data compressed by the RPC client.
134

135
  @type data: list or tuple
136
  @param data: Data sent by RPC client
137
  @rtype: str
138
  @return: Decompressed data
139

140
  """
141
  assert isinstance(data, (list, tuple))
142
  assert len(data) == 2
143
  (encoding, content) = data
144
  if encoding == constants.RPC_ENCODING_NONE:
145
    return content
146
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
147
    return zlib.decompress(base64.b64decode(content))
148
  else:
149
    raise AssertionError("Unknown data encoding")
150

    
151

    
152
def _CleanDirectory(path, exclude=None):
153
  """Removes all regular files in a directory.
154

155
  @type path: str
156
  @param path: the directory to clean
157
  @type exclude: list
158
  @param exclude: list of files to be excluded, defaults
159
      to the empty list
160

161
  """
162
  if path not in _ALLOWED_CLEAN_DIRS:
163
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
164
          path)
165

    
166
  if not os.path.isdir(path):
167
    return
168
  if exclude is None:
169
    exclude = []
170
  else:
171
    # Normalize excluded paths
172
    exclude = [os.path.normpath(i) for i in exclude]
173

    
174
  for rel_name in utils.ListVisibleFiles(path):
175
    full_name = utils.PathJoin(path, rel_name)
176
    if full_name in exclude:
177
      continue
178
    if os.path.isfile(full_name) and not os.path.islink(full_name):
179
      utils.RemoveFile(full_name)
180

    
181

    
182
def _BuildUploadFileList():
183
  """Build the list of allowed upload files.
184

185
  This is abstracted so that it's built only once at module import time.
186

187
  """
188
  allowed_files = set([
189
    constants.CLUSTER_CONF_FILE,
190
    constants.ETC_HOSTS,
191
    constants.SSH_KNOWN_HOSTS_FILE,
192
    constants.VNC_PASSWORD_FILE,
193
    constants.RAPI_CERT_FILE,
194
    constants.RAPI_USERS_FILE,
195
    constants.CONFD_HMAC_KEY,
196
    constants.CLUSTER_DOMAIN_SECRET_FILE,
197
    ])
198

    
199
  for hv_name in constants.HYPER_TYPES:
200
    hv_class = hypervisor.GetHypervisorClass(hv_name)
201
    allowed_files.update(hv_class.GetAncillaryFiles())
202

    
203
  return frozenset(allowed_files)
204

    
205

    
206
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
207

    
208

    
209
def JobQueuePurge():
210
  """Removes job queue files and archived jobs.
211

212
  @rtype: tuple
213
  @return: True, None
214

215
  """
216
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
217
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
218

    
219

    
220
def GetMasterInfo():
221
  """Returns master information.
222

223
  This is an utility function to compute master information, either
224
  for consumption here or from the node daemon.
225

226
  @rtype: tuple
227
  @return: master_netdev, master_ip, master_name
228
  @raise RPCFail: in case of errors
229

230
  """
231
  try:
232
    cfg = _GetConfig()
233
    master_netdev = cfg.GetMasterNetdev()
234
    master_ip = cfg.GetMasterIP()
235
    master_node = cfg.GetMasterNode()
236
  except errors.ConfigurationError, err:
237
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
238
  return (master_netdev, master_ip, master_node)
239

    
240

    
241
def StartMaster(start_daemons, no_voting):
242
  """Activate local node as master node.
243

244
  The function will always try activate the IP address of the master
245
  (unless someone else has it). It will also start the master daemons,
246
  based on the start_daemons parameter.
247

248
  @type start_daemons: boolean
249
  @param start_daemons: whether to also start the master
250
      daemons (ganeti-masterd and ganeti-rapi)
251
  @type no_voting: boolean
252
  @param no_voting: whether to start ganeti-masterd without a node vote
253
      (if start_daemons is True), but still non-interactively
254
  @rtype: None
255

256
  """
257
  # GetMasterInfo will raise an exception if not able to return data
258
  master_netdev, master_ip, _ = GetMasterInfo()
259

    
260
  err_msgs = []
261
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
262
    if utils.OwnIpAddress(master_ip):
263
      # we already have the ip:
264
      logging.debug("Master IP already configured, doing nothing")
265
    else:
266
      msg = "Someone else has the master ip, not activating"
267
      logging.error(msg)
268
      err_msgs.append(msg)
269
  else:
270
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
271
                           "dev", master_netdev, "label",
272
                           "%s:0" % master_netdev])
273
    if result.failed:
274
      msg = "Can't activate master IP: %s" % result.output
275
      logging.error(msg)
276
      err_msgs.append(msg)
277

    
278
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
279
                           "-s", master_ip, master_ip])
280
    # we'll ignore the exit code of arping
281

    
282
  # and now start the master and rapi daemons
283
  if start_daemons:
284
    if no_voting:
285
      masterd_args = "--no-voting --yes-do-it"
286
    else:
287
      masterd_args = ""
288

    
289
    env = {
290
      "EXTRA_MASTERD_ARGS": masterd_args,
291
      }
292

    
293
    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
294
    if result.failed:
295
      msg = "Can't start Ganeti master: %s" % result.output
296
      logging.error(msg)
297
      err_msgs.append(msg)
298

    
299
  if err_msgs:
300
    _Fail("; ".join(err_msgs))
301

    
302

    
303
def StopMaster(stop_daemons):
304
  """Deactivate this node as master.
305

306
  The function will always try to deactivate the IP address of the
307
  master. It will also stop the master daemons depending on the
308
  stop_daemons parameter.
309

310
  @type stop_daemons: boolean
311
  @param stop_daemons: whether to also stop the master daemons
312
      (ganeti-masterd and ganeti-rapi)
313
  @rtype: None
314

315
  """
316
  # TODO: log and report back to the caller the error failures; we
317
  # need to decide in which case we fail the RPC for this
318

    
319
  # GetMasterInfo will raise an exception if not able to return data
320
  master_netdev, master_ip, _ = GetMasterInfo()
321

    
322
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
323
                         "dev", master_netdev])
324
  if result.failed:
325
    logging.error("Can't remove the master IP, error: %s", result.output)
326
    # but otherwise ignore the failure
327

    
328
  if stop_daemons:
329
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
330
    if result.failed:
331
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
332
                    " and error %s",
333
                    result.cmd, result.exit_code, result.output)
334

    
335

    
336
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
337
  """Joins this node to the cluster.
338

339
  This does the following:
340
      - updates the hostkeys of the machine (rsa and dsa)
341
      - adds the ssh private key to the user
342
      - adds the ssh public key to the users' authorized_keys file
343

344
  @type dsa: str
345
  @param dsa: the DSA private key to write
346
  @type dsapub: str
347
  @param dsapub: the DSA public key to write
348
  @type rsa: str
349
  @param rsa: the RSA private key to write
350
  @type rsapub: str
351
  @param rsapub: the RSA public key to write
352
  @type sshkey: str
353
  @param sshkey: the SSH private key to write
354
  @type sshpub: str
355
  @param sshpub: the SSH public key to write
356
  @rtype: boolean
357
  @return: the success of the operation
358

359
  """
360
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
361
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
362
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
363
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
364
  for name, content, mode in sshd_keys:
365
    utils.WriteFile(name, data=content, mode=mode)
366

    
367
  try:
368
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
369
                                                    mkdir=True)
370
  except errors.OpExecError, err:
371
    _Fail("Error while processing user ssh files: %s", err, exc=True)
372

    
373
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
374
    utils.WriteFile(name, data=content, mode=0600)
375

    
376
  utils.AddAuthorizedKey(auth_keys, sshpub)
377

    
378
  result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
379
  if result.failed:
380
    _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
381
          result.cmd, result.exit_code, result.output)
382

    
383

    
384
def LeaveCluster(modify_ssh_setup):
385
  """Cleans up and remove the current node.
386

387
  This function cleans up and prepares the current node to be removed
388
  from the cluster.
389

390
  If processing is successful, then it raises an
391
  L{errors.QuitGanetiException} which is used as a special case to
392
  shutdown the node daemon.
393

394
  @param modify_ssh_setup: boolean
395

396
  """
397
  _CleanDirectory(constants.DATA_DIR)
398
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
399
  JobQueuePurge()
400

    
401
  if modify_ssh_setup:
402
    try:
403
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
404

    
405
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
406

    
407
      utils.RemoveFile(priv_key)
408
      utils.RemoveFile(pub_key)
409
    except errors.OpExecError:
410
      logging.exception("Error while processing ssh files")
411

    
412
  try:
413
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
414
    utils.RemoveFile(constants.RAPI_CERT_FILE)
415
    utils.RemoveFile(constants.NODED_CERT_FILE)
416
  except: # pylint: disable-msg=W0702
417
    logging.exception("Error while removing cluster secrets")
418

    
419
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
420
  if result.failed:
421
    logging.error("Command %s failed with exitcode %s and error %s",
422
                  result.cmd, result.exit_code, result.output)
423

    
424
  # Raise a custom exception (handled in ganeti-noded)
425
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
426

    
427

    
428
def GetNodeInfo(vgname, hypervisor_type):
429
  """Gives back a hash with different information about the node.
430

431
  @type vgname: C{string}
432
  @param vgname: the name of the volume group to ask for disk space information
433
  @type hypervisor_type: C{str}
434
  @param hypervisor_type: the name of the hypervisor to ask for
435
      memory information
436
  @rtype: C{dict}
437
  @return: dictionary with the following keys:
438
      - vg_size is the size of the configured volume group in MiB
439
      - vg_free is the free size of the volume group in MiB
440
      - memory_dom0 is the memory allocated for domain0 in MiB
441
      - memory_free is the currently available (free) ram in MiB
442
      - memory_total is the total number of ram in MiB
443

444
  """
445
  outputarray = {}
446
  vginfo = _GetVGInfo(vgname)
447
  outputarray['vg_size'] = vginfo['vg_size']
448
  outputarray['vg_free'] = vginfo['vg_free']
449

    
450
  hyper = hypervisor.GetHypervisor(hypervisor_type)
451
  hyp_info = hyper.GetNodeInfo()
452
  if hyp_info is not None:
453
    outputarray.update(hyp_info)
454

    
455
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
456

    
457
  return outputarray
458

    
459

    
460
def VerifyNode(what, cluster_name):
461
  """Verify the status of the local node.
462

463
  Based on the input L{what} parameter, various checks are done on the
464
  local node.
465

466
  If the I{filelist} key is present, this list of
467
  files is checksummed and the file/checksum pairs are returned.
468

469
  If the I{nodelist} key is present, we check that we have
470
  connectivity via ssh with the target nodes (and check the hostname
471
  report).
472

473
  If the I{node-net-test} key is present, we check that we have
474
  connectivity to the given nodes via both primary IP and, if
475
  applicable, secondary IPs.
476

477
  @type what: C{dict}
478
  @param what: a dictionary of things to check:
479
      - filelist: list of files for which to compute checksums
480
      - nodelist: list of nodes we should check ssh communication with
481
      - node-net-test: list of nodes we should check node daemon port
482
        connectivity with
483
      - hypervisor: list with hypervisors to run the verify for
484
  @rtype: dict
485
  @return: a dictionary with the same keys as the input dict, and
486
      values representing the result of the checks
487

488
  """
489
  result = {}
490
  my_name = utils.HostInfo().name
491
  port = utils.GetDaemonPort(constants.NODED)
492

    
493
  if constants.NV_HYPERVISOR in what:
494
    result[constants.NV_HYPERVISOR] = tmp = {}
495
    for hv_name in what[constants.NV_HYPERVISOR]:
496
      try:
497
        val = hypervisor.GetHypervisor(hv_name).Verify()
498
      except errors.HypervisorError, err:
499
        val = "Error while checking hypervisor: %s" % str(err)
500
      tmp[hv_name] = val
501

    
502
  if constants.NV_FILELIST in what:
503
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
504
      what[constants.NV_FILELIST])
505

    
506
  if constants.NV_NODELIST in what:
507
    result[constants.NV_NODELIST] = tmp = {}
508
    random.shuffle(what[constants.NV_NODELIST])
509
    for node in what[constants.NV_NODELIST]:
510
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
511
      if not success:
512
        tmp[node] = message
513

    
514
  if constants.NV_NODENETTEST in what:
515
    result[constants.NV_NODENETTEST] = tmp = {}
516
    my_pip = my_sip = None
517
    for name, pip, sip in what[constants.NV_NODENETTEST]:
518
      if name == my_name:
519
        my_pip = pip
520
        my_sip = sip
521
        break
522
    if not my_pip:
523
      tmp[my_name] = ("Can't find my own primary/secondary IP"
524
                      " in the node list")
525
    else:
526
      for name, pip, sip in what[constants.NV_NODENETTEST]:
527
        fail = []
528
        if not utils.TcpPing(pip, port, source=my_pip):
529
          fail.append("primary")
530
        if sip != pip:
531
          if not utils.TcpPing(sip, port, source=my_sip):
532
            fail.append("secondary")
533
        if fail:
534
          tmp[name] = ("failure using the %s interface(s)" %
535
                       " and ".join(fail))
536

    
537
  if constants.NV_MASTERIP in what:
538
    # FIXME: add checks on incoming data structures (here and in the
539
    # rest of the function)
540
    master_name, master_ip = what[constants.NV_MASTERIP]
541
    if master_name == my_name:
542
      source = constants.LOCALHOST_IP_ADDRESS
543
    else:
544
      source = None
545
    result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
546
                                                  source=source)
547

    
548
  if constants.NV_LVLIST in what:
549
    try:
550
      val = GetVolumeList(what[constants.NV_LVLIST])
551
    except RPCFail, err:
552
      val = str(err)
553
    result[constants.NV_LVLIST] = val
554

    
555
  if constants.NV_INSTANCELIST in what:
556
    # GetInstanceList can fail
557
    try:
558
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
559
    except RPCFail, err:
560
      val = str(err)
561
    result[constants.NV_INSTANCELIST] = val
562

    
563
  if constants.NV_VGLIST in what:
564
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
565

    
566
  if constants.NV_PVLIST in what:
567
    result[constants.NV_PVLIST] = \
568
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
569
                                   filter_allocatable=False)
570

    
571
  if constants.NV_VERSION in what:
572
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
573
                                    constants.RELEASE_VERSION)
574

    
575
  if constants.NV_HVINFO in what:
576
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
577
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
578

    
579
  if constants.NV_DRBDLIST in what:
580
    try:
581
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
582
    except errors.BlockDeviceError, err:
583
      logging.warning("Can't get used minors list", exc_info=True)
584
      used_minors = str(err)
585
    result[constants.NV_DRBDLIST] = used_minors
586

    
587
  if constants.NV_NODESETUP in what:
588
    result[constants.NV_NODESETUP] = tmpr = []
589
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
590
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
591
                  " under /sys, missing required directories /sys/block"
592
                  " and /sys/class/net")
593
    if (not os.path.isdir("/proc/sys") or
594
        not os.path.isfile("/proc/sysrq-trigger")):
595
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
596
                  " under /proc, missing required directory /proc/sys and"
597
                  " the file /proc/sysrq-trigger")
598

    
599
  if constants.NV_TIME in what:
600
    result[constants.NV_TIME] = utils.SplitTime(time.time())
601

    
602
  if constants.NV_OSLIST in what:
603
    result[constants.NV_OSLIST] = DiagnoseOS()
604

    
605
  return result
606

    
607

    
608
def GetVolumeList(vg_name):
609
  """Compute list of logical volumes and their size.
610

611
  @type vg_name: str
612
  @param vg_name: the volume group whose LVs we should list
613
  @rtype: dict
614
  @return:
615
      dictionary of all partions (key) with value being a tuple of
616
      their size (in MiB), inactive and online status::
617

618
        {'test1': ('20.06', True, True)}
619

620
      in case of errors, a string is returned with the error
621
      details.
622

623
  """
624
  lvs = {}
625
  sep = '|'
626
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
627
                         "--separator=%s" % sep,
628
                         "-olv_name,lv_size,lv_attr", vg_name])
629
  if result.failed:
630
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
631

    
632
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
633
  for line in result.stdout.splitlines():
634
    line = line.strip()
635
    match = valid_line_re.match(line)
636
    if not match:
637
      logging.error("Invalid line returned from lvs output: '%s'", line)
638
      continue
639
    name, size, attr = match.groups()
640
    inactive = attr[4] == '-'
641
    online = attr[5] == 'o'
642
    virtual = attr[0] == 'v'
643
    if virtual:
644
      # we don't want to report such volumes as existing, since they
645
      # don't really hold data
646
      continue
647
    lvs[name] = (size, inactive, online)
648

    
649
  return lvs
650

    
651

    
652
def ListVolumeGroups():
653
  """List the volume groups and their size.
654

655
  @rtype: dict
656
  @return: dictionary with keys volume name and values the
657
      size of the volume
658

659
  """
660
  return utils.ListVolumeGroups()
661

    
662

    
663
def NodeVolumes():
664
  """List all volumes on this node.
665

666
  @rtype: list
667
  @return:
668
    A list of dictionaries, each having four keys:
669
      - name: the logical volume name,
670
      - size: the size of the logical volume
671
      - dev: the physical device on which the LV lives
672
      - vg: the volume group to which it belongs
673

674
    In case of errors, we return an empty list and log the
675
    error.
676

677
    Note that since a logical volume can live on multiple physical
678
    volumes, the resulting list might include a logical volume
679
    multiple times.
680

681
  """
682
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
683
                         "--separator=|",
684
                         "--options=lv_name,lv_size,devices,vg_name"])
685
  if result.failed:
686
    _Fail("Failed to list logical volumes, lvs output: %s",
687
          result.output)
688

    
689
  def parse_dev(dev):
690
    return dev.split('(')[0]
691

    
692
  def handle_dev(dev):
693
    return [parse_dev(x) for x in dev.split(",")]
694

    
695
  def map_line(line):
696
    line = [v.strip() for v in line]
697
    return [{'name': line[0], 'size': line[1],
698
             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
699

    
700
  all_devs = []
701
  for line in result.stdout.splitlines():
702
    if line.count('|') >= 3:
703
      all_devs.extend(map_line(line.split('|')))
704
    else:
705
      logging.warning("Strange line in the output from lvs: '%s'", line)
706
  return all_devs
707

    
708

    
709
def BridgesExist(bridges_list):
710
  """Check if a list of bridges exist on the current node.
711

712
  @rtype: boolean
713
  @return: C{True} if all of them exist, C{False} otherwise
714

715
  """
716
  missing = []
717
  for bridge in bridges_list:
718
    if not utils.BridgeExists(bridge):
719
      missing.append(bridge)
720

    
721
  if missing:
722
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
723

    
724

    
725
def GetInstanceList(hypervisor_list):
726
  """Provides a list of instances.
727

728
  @type hypervisor_list: list
729
  @param hypervisor_list: the list of hypervisors to query information
730

731
  @rtype: list
732
  @return: a list of all running instances on the current node
733
    - instance1.example.com
734
    - instance2.example.com
735

736
  """
737
  results = []
738
  for hname in hypervisor_list:
739
    try:
740
      names = hypervisor.GetHypervisor(hname).ListInstances()
741
      results.extend(names)
742
    except errors.HypervisorError, err:
743
      _Fail("Error enumerating instances (hypervisor %s): %s",
744
            hname, err, exc=True)
745

    
746
  return results
747

    
748

    
749
def GetInstanceInfo(instance, hname):
750
  """Gives back the information about an instance as a dictionary.
751

752
  @type instance: string
753
  @param instance: the instance name
754
  @type hname: string
755
  @param hname: the hypervisor type of the instance
756

757
  @rtype: dict
758
  @return: dictionary with the following keys:
759
      - memory: memory size of instance (int)
760
      - state: xen state of instance (string)
761
      - time: cpu time of instance (float)
762

763
  """
764
  output = {}
765

    
766
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
767
  if iinfo is not None:
768
    output['memory'] = iinfo[2]
769
    output['state'] = iinfo[4]
770
    output['time'] = iinfo[5]
771

    
772
  return output
773

    
774

    
775
def GetInstanceMigratable(instance):
776
  """Gives whether an instance can be migrated.
777

778
  @type instance: L{objects.Instance}
779
  @param instance: object representing the instance to be checked.
780

781
  @rtype: tuple
782
  @return: tuple of (result, description) where:
783
      - result: whether the instance can be migrated or not
784
      - description: a description of the issue, if relevant
785

786
  """
787
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
788
  iname = instance.name
789
  if iname not in hyper.ListInstances():
790
    _Fail("Instance %s is not running", iname)
791

    
792
  for idx in range(len(instance.disks)):
793
    link_name = _GetBlockDevSymlinkPath(iname, idx)
794
    if not os.path.islink(link_name):
795
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
796

    
797

    
798
def GetAllInstancesInfo(hypervisor_list):
799
  """Gather data about all instances.
800

801
  This is the equivalent of L{GetInstanceInfo}, except that it
802
  computes data for all instances at once, thus being faster if one
803
  needs data about more than one instance.
804

805
  @type hypervisor_list: list
806
  @param hypervisor_list: list of hypervisors to query for instance data
807

808
  @rtype: dict
809
  @return: dictionary of instance: data, with data having the following keys:
810
      - memory: memory size of instance (int)
811
      - state: xen state of instance (string)
812
      - time: cpu time of instance (float)
813
      - vcpus: the number of vcpus
814

815
  """
816
  output = {}
817

    
818
  for hname in hypervisor_list:
819
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
820
    if iinfo:
821
      for name, _, memory, vcpus, state, times in iinfo:
822
        value = {
823
          'memory': memory,
824
          'vcpus': vcpus,
825
          'state': state,
826
          'time': times,
827
          }
828
        if name in output:
829
          # we only check static parameters, like memory and vcpus,
830
          # and not state and time which can change between the
831
          # invocations of the different hypervisors
832
          for key in 'memory', 'vcpus':
833
            if value[key] != output[name][key]:
834
              _Fail("Instance %s is running twice"
835
                    " with different parameters", name)
836
        output[name] = value
837

    
838
  return output
839

    
840

    
841
def _InstanceLogName(kind, os_name, instance):
842
  """Compute the OS log filename for a given instance and operation.
843

844
  The instance name and os name are passed in as strings since not all
845
  operations have these as part of an instance object.
846

847
  @type kind: string
848
  @param kind: the operation type (e.g. add, import, etc.)
849
  @type os_name: string
850
  @param os_name: the os name
851
  @type instance: string
852
  @param instance: the name of the instance being imported/added/etc.
853

854
  """
855
  # TODO: Use tempfile.mkstemp to create unique filename
856
  base = ("%s-%s-%s-%s.log" %
857
          (kind, os_name, instance, utils.TimestampForFilename()))
858
  return utils.PathJoin(constants.LOG_OS_DIR, base)
859

    
860

    
861
def InstanceOsAdd(instance, reinstall, debug):
862
  """Add an OS to an instance.
863

864
  @type instance: L{objects.Instance}
865
  @param instance: Instance whose OS is to be installed
866
  @type reinstall: boolean
867
  @param reinstall: whether this is an instance reinstall
868
  @type debug: integer
869
  @param debug: debug level, passed to the OS scripts
870
  @rtype: None
871

872
  """
873
  inst_os = OSFromDisk(instance.os)
874

    
875
  create_env = OSEnvironment(instance, inst_os, debug)
876
  if reinstall:
877
    create_env['INSTANCE_REINSTALL'] = "1"
878

    
879
  logfile = _InstanceLogName("add", instance.os, instance.name)
880

    
881
  result = utils.RunCmd([inst_os.create_script], env=create_env,
882
                        cwd=inst_os.path, output=logfile,)
883
  if result.failed:
884
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
885
                  " output: %s", result.cmd, result.fail_reason, logfile,
886
                  result.output)
887
    lines = [utils.SafeEncode(val)
888
             for val in utils.TailFile(logfile, lines=20)]
889
    _Fail("OS create script failed (%s), last lines in the"
890
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
891

    
892

    
893
def RunRenameInstance(instance, old_name, debug):
894
  """Run the OS rename script for an instance.
895

896
  @type instance: L{objects.Instance}
897
  @param instance: Instance whose OS is to be installed
898
  @type old_name: string
899
  @param old_name: previous instance name
900
  @type debug: integer
901
  @param debug: debug level, passed to the OS scripts
902
  @rtype: boolean
903
  @return: the success of the operation
904

905
  """
906
  inst_os = OSFromDisk(instance.os)
907

    
908
  rename_env = OSEnvironment(instance, inst_os, debug)
909
  rename_env['OLD_INSTANCE_NAME'] = old_name
910

    
911
  logfile = _InstanceLogName("rename", instance.os,
912
                             "%s-%s" % (old_name, instance.name))
913

    
914
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
915
                        cwd=inst_os.path, output=logfile)
916

    
917
  if result.failed:
918
    logging.error("os create command '%s' returned error: %s output: %s",
919
                  result.cmd, result.fail_reason, result.output)
920
    lines = [utils.SafeEncode(val)
921
             for val in utils.TailFile(logfile, lines=20)]
922
    _Fail("OS rename script failed (%s), last lines in the"
923
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
924

    
925

    
926
def _GetVGInfo(vg_name):
927
  """Get information about the volume group.
928

929
  @type vg_name: str
930
  @param vg_name: the volume group which we query
931
  @rtype: dict
932
  @return:
933
    A dictionary with the following keys:
934
      - C{vg_size} is the total size of the volume group in MiB
935
      - C{vg_free} is the free size of the volume group in MiB
936
      - C{pv_count} are the number of physical disks in that VG
937

938
    If an error occurs during gathering of data, we return the same dict
939
    with keys all set to None.
940

941
  """
942
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
943

    
944
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
945
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
946

    
947
  if retval.failed:
948
    logging.error("volume group %s not present", vg_name)
949
    return retdic
950
  valarr = retval.stdout.strip().rstrip(':').split(':')
951
  if len(valarr) == 3:
952
    try:
953
      retdic = {
954
        "vg_size": int(round(float(valarr[0]), 0)),
955
        "vg_free": int(round(float(valarr[1]), 0)),
956
        "pv_count": int(valarr[2]),
957
        }
958
    except (TypeError, ValueError), err:
959
      logging.exception("Fail to parse vgs output: %s", err)
960
  else:
961
    logging.error("vgs output has the wrong number of fields (expected"
962
                  " three): %s", str(valarr))
963
  return retdic
964

    
965

    
966
def _GetBlockDevSymlinkPath(instance_name, idx):
967
  return utils.PathJoin(constants.DISK_LINKS_DIR,
968
                        "%s:%d" % (instance_name, idx))
969

    
970

    
971
def _SymlinkBlockDev(instance_name, device_path, idx):
972
  """Set up symlinks to a instance's block device.
973

974
  This is an auxiliary function run when an instance is start (on the primary
975
  node) or when an instance is migrated (on the target node).
976

977

978
  @param instance_name: the name of the target instance
979
  @param device_path: path of the physical block device, on the node
980
  @param idx: the disk index
981
  @return: absolute path to the disk's symlink
982

983
  """
984
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
985
  try:
986
    os.symlink(device_path, link_name)
987
  except OSError, err:
988
    if err.errno == errno.EEXIST:
989
      if (not os.path.islink(link_name) or
990
          os.readlink(link_name) != device_path):
991
        os.remove(link_name)
992
        os.symlink(device_path, link_name)
993
    else:
994
      raise
995

    
996
  return link_name
997

    
998

    
999
def _RemoveBlockDevLinks(instance_name, disks):
1000
  """Remove the block device symlinks belonging to the given instance.
1001

1002
  """
1003
  for idx, _ in enumerate(disks):
1004
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1005
    if os.path.islink(link_name):
1006
      try:
1007
        os.remove(link_name)
1008
      except OSError:
1009
        logging.exception("Can't remove symlink '%s'", link_name)
1010

    
1011

    
1012
def _GatherAndLinkBlockDevs(instance):
1013
  """Set up an instance's block device(s).
1014

1015
  This is run on the primary node at instance startup. The block
1016
  devices must be already assembled.
1017

1018
  @type instance: L{objects.Instance}
1019
  @param instance: the instance whose disks we shoul assemble
1020
  @rtype: list
1021
  @return: list of (disk_object, device_path)
1022

1023
  """
1024
  block_devices = []
1025
  for idx, disk in enumerate(instance.disks):
1026
    device = _RecursiveFindBD(disk)
1027
    if device is None:
1028
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1029
                                    str(disk))
1030
    device.Open()
1031
    try:
1032
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1033
    except OSError, e:
1034
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1035
                                    e.strerror)
1036

    
1037
    block_devices.append((disk, link_name))
1038

    
1039
  return block_devices
1040

    
1041

    
1042
def StartInstance(instance):
1043
  """Start an instance.
1044

1045
  @type instance: L{objects.Instance}
1046
  @param instance: the instance object
1047
  @rtype: None
1048

1049
  """
1050
  running_instances = GetInstanceList([instance.hypervisor])
1051

    
1052
  if instance.name in running_instances:
1053
    logging.info("Instance %s already running, not starting", instance.name)
1054
    return
1055

    
1056
  try:
1057
    block_devices = _GatherAndLinkBlockDevs(instance)
1058
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1059
    hyper.StartInstance(instance, block_devices)
1060
  except errors.BlockDeviceError, err:
1061
    _Fail("Block device error: %s", err, exc=True)
1062
  except errors.HypervisorError, err:
1063
    _RemoveBlockDevLinks(instance.name, instance.disks)
1064
    _Fail("Hypervisor error: %s", err, exc=True)
1065

    
1066

    
1067
def InstanceShutdown(instance, timeout):
1068
  """Shut an instance down.
1069

1070
  @note: this functions uses polling with a hardcoded timeout.
1071

1072
  @type instance: L{objects.Instance}
1073
  @param instance: the instance object
1074
  @type timeout: integer
1075
  @param timeout: maximum timeout for soft shutdown
1076
  @rtype: None
1077

1078
  """
1079
  hv_name = instance.hypervisor
1080
  hyper = hypervisor.GetHypervisor(hv_name)
1081
  iname = instance.name
1082

    
1083
  if instance.name not in hyper.ListInstances():
1084
    logging.info("Instance %s not running, doing nothing", iname)
1085
    return
1086

    
1087
  class _TryShutdown:
1088
    def __init__(self):
1089
      self.tried_once = False
1090

    
1091
    def __call__(self):
1092
      if iname not in hyper.ListInstances():
1093
        return
1094

    
1095
      try:
1096
        hyper.StopInstance(instance, retry=self.tried_once)
1097
      except errors.HypervisorError, err:
1098
        if iname not in hyper.ListInstances():
1099
          # if the instance is no longer existing, consider this a
1100
          # success and go to cleanup
1101
          return
1102

    
1103
        _Fail("Failed to stop instance %s: %s", iname, err)
1104

    
1105
      self.tried_once = True
1106

    
1107
      raise utils.RetryAgain()
1108

    
1109
  try:
1110
    utils.Retry(_TryShutdown(), 5, timeout)
1111
  except utils.RetryTimeout:
1112
    # the shutdown did not succeed
1113
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1114

    
1115
    try:
1116
      hyper.StopInstance(instance, force=True)
1117
    except errors.HypervisorError, err:
1118
      if iname in hyper.ListInstances():
1119
        # only raise an error if the instance still exists, otherwise
1120
        # the error could simply be "instance ... unknown"!
1121
        _Fail("Failed to force stop instance %s: %s", iname, err)
1122

    
1123
    time.sleep(1)
1124

    
1125
    if iname in hyper.ListInstances():
1126
      _Fail("Could not shutdown instance %s even by destroy", iname)
1127

    
1128
  try:
1129
    hyper.CleanupInstance(instance.name)
1130
  except errors.HypervisorError, err:
1131
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1132

    
1133
  _RemoveBlockDevLinks(iname, instance.disks)
1134

    
1135

    
1136
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1137
  """Reboot an instance.
1138

1139
  @type instance: L{objects.Instance}
1140
  @param instance: the instance object to reboot
1141
  @type reboot_type: str
1142
  @param reboot_type: the type of reboot, one the following
1143
    constants:
1144
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1145
        instance OS, do not recreate the VM
1146
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1147
        restart the VM (at the hypervisor level)
1148
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1149
        not accepted here, since that mode is handled differently, in
1150
        cmdlib, and translates into full stop and start of the
1151
        instance (instead of a call_instance_reboot RPC)
1152
  @type shutdown_timeout: integer
1153
  @param shutdown_timeout: maximum timeout for soft shutdown
1154
  @rtype: None
1155

1156
  """
1157
  running_instances = GetInstanceList([instance.hypervisor])
1158

    
1159
  if instance.name not in running_instances:
1160
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1161

    
1162
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1163
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1164
    try:
1165
      hyper.RebootInstance(instance)
1166
    except errors.HypervisorError, err:
1167
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1168
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1169
    try:
1170
      InstanceShutdown(instance, shutdown_timeout)
1171
      return StartInstance(instance)
1172
    except errors.HypervisorError, err:
1173
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1174
  else:
1175
    _Fail("Invalid reboot_type received: %s", reboot_type)
1176

    
1177

    
1178
def MigrationInfo(instance):
1179
  """Gather information about an instance to be migrated.
1180

1181
  @type instance: L{objects.Instance}
1182
  @param instance: the instance definition
1183

1184
  """
1185
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1186
  try:
1187
    info = hyper.MigrationInfo(instance)
1188
  except errors.HypervisorError, err:
1189
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1190
  return info
1191

    
1192

    
1193
def AcceptInstance(instance, info, target):
1194
  """Prepare the node to accept an instance.
1195

1196
  @type instance: L{objects.Instance}
1197
  @param instance: the instance definition
1198
  @type info: string/data (opaque)
1199
  @param info: migration information, from the source node
1200
  @type target: string
1201
  @param target: target host (usually ip), on this node
1202

1203
  """
1204
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1205
  try:
1206
    hyper.AcceptInstance(instance, info, target)
1207
  except errors.HypervisorError, err:
1208
    _Fail("Failed to accept instance: %s", err, exc=True)
1209

    
1210

    
1211
def FinalizeMigration(instance, info, success):
1212
  """Finalize any preparation to accept an instance.
1213

1214
  @type instance: L{objects.Instance}
1215
  @param instance: the instance definition
1216
  @type info: string/data (opaque)
1217
  @param info: migration information, from the source node
1218
  @type success: boolean
1219
  @param success: whether the migration was a success or a failure
1220

1221
  """
1222
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1223
  try:
1224
    hyper.FinalizeMigration(instance, info, success)
1225
  except errors.HypervisorError, err:
1226
    _Fail("Failed to finalize migration: %s", err, exc=True)
1227

    
1228

    
1229
def MigrateInstance(instance, target, live):
1230
  """Migrates an instance to another node.
1231

1232
  @type instance: L{objects.Instance}
1233
  @param instance: the instance definition
1234
  @type target: string
1235
  @param target: the target node name
1236
  @type live: boolean
1237
  @param live: whether the migration should be done live or not (the
1238
      interpretation of this parameter is left to the hypervisor)
1239
  @rtype: tuple
1240
  @return: a tuple of (success, msg) where:
1241
      - succes is a boolean denoting the success/failure of the operation
1242
      - msg is a string with details in case of failure
1243

1244
  """
1245
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1246

    
1247
  try:
1248
    hyper.MigrateInstance(instance, target, live)
1249
  except errors.HypervisorError, err:
1250
    _Fail("Failed to migrate instance: %s", err, exc=True)
1251

    
1252

    
1253
def BlockdevCreate(disk, size, owner, on_primary, info):
1254
  """Creates a block device for an instance.
1255

1256
  @type disk: L{objects.Disk}
1257
  @param disk: the object describing the disk we should create
1258
  @type size: int
1259
  @param size: the size of the physical underlying device, in MiB
1260
  @type owner: str
1261
  @param owner: the name of the instance for which disk is created,
1262
      used for device cache data
1263
  @type on_primary: boolean
1264
  @param on_primary:  indicates if it is the primary node or not
1265
  @type info: string
1266
  @param info: string that will be sent to the physical device
1267
      creation, used for example to set (LVM) tags on LVs
1268

1269
  @return: the new unique_id of the device (this can sometime be
1270
      computed only after creation), or None. On secondary nodes,
1271
      it's not required to return anything.
1272

1273
  """
1274
  # TODO: remove the obsolete 'size' argument
1275
  # pylint: disable-msg=W0613
1276
  clist = []
1277
  if disk.children:
1278
    for child in disk.children:
1279
      try:
1280
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1281
      except errors.BlockDeviceError, err:
1282
        _Fail("Can't assemble device %s: %s", child, err)
1283
      if on_primary or disk.AssembleOnSecondary():
1284
        # we need the children open in case the device itself has to
1285
        # be assembled
1286
        try:
1287
          # pylint: disable-msg=E1103
1288
          crdev.Open()
1289
        except errors.BlockDeviceError, err:
1290
          _Fail("Can't make child '%s' read-write: %s", child, err)
1291
      clist.append(crdev)
1292

    
1293
  try:
1294
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1295
  except errors.BlockDeviceError, err:
1296
    _Fail("Can't create block device: %s", err)
1297

    
1298
  if on_primary or disk.AssembleOnSecondary():
1299
    try:
1300
      device.Assemble()
1301
    except errors.BlockDeviceError, err:
1302
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1303
    device.SetSyncSpeed(constants.SYNC_SPEED)
1304
    if on_primary or disk.OpenOnSecondary():
1305
      try:
1306
        device.Open(force=True)
1307
      except errors.BlockDeviceError, err:
1308
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1309
    DevCacheManager.UpdateCache(device.dev_path, owner,
1310
                                on_primary, disk.iv_name)
1311

    
1312
  device.SetInfo(info)
1313

    
1314
  return device.unique_id
1315

    
1316

    
1317
def BlockdevRemove(disk):
1318
  """Remove a block device.
1319

1320
  @note: This is intended to be called recursively.
1321

1322
  @type disk: L{objects.Disk}
1323
  @param disk: the disk object we should remove
1324
  @rtype: boolean
1325
  @return: the success of the operation
1326

1327
  """
1328
  msgs = []
1329
  try:
1330
    rdev = _RecursiveFindBD(disk)
1331
  except errors.BlockDeviceError, err:
1332
    # probably can't attach
1333
    logging.info("Can't attach to device %s in remove", disk)
1334
    rdev = None
1335
  if rdev is not None:
1336
    r_path = rdev.dev_path
1337
    try:
1338
      rdev.Remove()
1339
    except errors.BlockDeviceError, err:
1340
      msgs.append(str(err))
1341
    if not msgs:
1342
      DevCacheManager.RemoveCache(r_path)
1343

    
1344
  if disk.children:
1345
    for child in disk.children:
1346
      try:
1347
        BlockdevRemove(child)
1348
      except RPCFail, err:
1349
        msgs.append(str(err))
1350

    
1351
  if msgs:
1352
    _Fail("; ".join(msgs))
1353

    
1354

    
1355
def _RecursiveAssembleBD(disk, owner, as_primary):
1356
  """Activate a block device for an instance.
1357

1358
  This is run on the primary and secondary nodes for an instance.
1359

1360
  @note: this function is called recursively.
1361

1362
  @type disk: L{objects.Disk}
1363
  @param disk: the disk we try to assemble
1364
  @type owner: str
1365
  @param owner: the name of the instance which owns the disk
1366
  @type as_primary: boolean
1367
  @param as_primary: if we should make the block device
1368
      read/write
1369

1370
  @return: the assembled device or None (in case no device
1371
      was assembled)
1372
  @raise errors.BlockDeviceError: in case there is an error
1373
      during the activation of the children or the device
1374
      itself
1375

1376
  """
1377
  children = []
1378
  if disk.children:
1379
    mcn = disk.ChildrenNeeded()
1380
    if mcn == -1:
1381
      mcn = 0 # max number of Nones allowed
1382
    else:
1383
      mcn = len(disk.children) - mcn # max number of Nones
1384
    for chld_disk in disk.children:
1385
      try:
1386
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1387
      except errors.BlockDeviceError, err:
1388
        if children.count(None) >= mcn:
1389
          raise
1390
        cdev = None
1391
        logging.error("Error in child activation (but continuing): %s",
1392
                      str(err))
1393
      children.append(cdev)
1394

    
1395
  if as_primary or disk.AssembleOnSecondary():
1396
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1397
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1398
    result = r_dev
1399
    if as_primary or disk.OpenOnSecondary():
1400
      r_dev.Open()
1401
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1402
                                as_primary, disk.iv_name)
1403

    
1404
  else:
1405
    result = True
1406
  return result
1407

    
1408

    
1409
def BlockdevAssemble(disk, owner, as_primary):
1410
  """Activate a block device for an instance.
1411

1412
  This is a wrapper over _RecursiveAssembleBD.
1413

1414
  @rtype: str or boolean
1415
  @return: a C{/dev/...} path for primary nodes, and
1416
      C{True} for secondary nodes
1417

1418
  """
1419
  try:
1420
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1421
    if isinstance(result, bdev.BlockDev):
1422
      # pylint: disable-msg=E1103
1423
      result = result.dev_path
1424
  except errors.BlockDeviceError, err:
1425
    _Fail("Error while assembling disk: %s", err, exc=True)
1426

    
1427
  return result
1428

    
1429

    
1430
def BlockdevShutdown(disk):
1431
  """Shut down a block device.
1432

1433
  First, if the device is assembled (Attach() is successful), then
1434
  the device is shutdown. Then the children of the device are
1435
  shutdown.
1436

1437
  This function is called recursively. Note that we don't cache the
1438
  children or such, as oppossed to assemble, shutdown of different
1439
  devices doesn't require that the upper device was active.
1440

1441
  @type disk: L{objects.Disk}
1442
  @param disk: the description of the disk we should
1443
      shutdown
1444
  @rtype: None
1445

1446
  """
1447
  msgs = []
1448
  r_dev = _RecursiveFindBD(disk)
1449
  if r_dev is not None:
1450
    r_path = r_dev.dev_path
1451
    try:
1452
      r_dev.Shutdown()
1453
      DevCacheManager.RemoveCache(r_path)
1454
    except errors.BlockDeviceError, err:
1455
      msgs.append(str(err))
1456

    
1457
  if disk.children:
1458
    for child in disk.children:
1459
      try:
1460
        BlockdevShutdown(child)
1461
      except RPCFail, err:
1462
        msgs.append(str(err))
1463

    
1464
  if msgs:
1465
    _Fail("; ".join(msgs))
1466

    
1467

    
1468
def BlockdevAddchildren(parent_cdev, new_cdevs):
1469
  """Extend a mirrored block device.
1470

1471
  @type parent_cdev: L{objects.Disk}
1472
  @param parent_cdev: the disk to which we should add children
1473
  @type new_cdevs: list of L{objects.Disk}
1474
  @param new_cdevs: the list of children which we should add
1475
  @rtype: None
1476

1477
  """
1478
  parent_bdev = _RecursiveFindBD(parent_cdev)
1479
  if parent_bdev is None:
1480
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1481
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1482
  if new_bdevs.count(None) > 0:
1483
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1484
  parent_bdev.AddChildren(new_bdevs)
1485

    
1486

    
1487
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1488
  """Shrink a mirrored block device.
1489

1490
  @type parent_cdev: L{objects.Disk}
1491
  @param parent_cdev: the disk from which we should remove children
1492
  @type new_cdevs: list of L{objects.Disk}
1493
  @param new_cdevs: the list of children which we should remove
1494
  @rtype: None
1495

1496
  """
1497
  parent_bdev = _RecursiveFindBD(parent_cdev)
1498
  if parent_bdev is None:
1499
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1500
  devs = []
1501
  for disk in new_cdevs:
1502
    rpath = disk.StaticDevPath()
1503
    if rpath is None:
1504
      bd = _RecursiveFindBD(disk)
1505
      if bd is None:
1506
        _Fail("Can't find device %s while removing children", disk)
1507
      else:
1508
        devs.append(bd.dev_path)
1509
    else:
1510
      if not utils.IsNormAbsPath(rpath):
1511
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1512
      devs.append(rpath)
1513
  parent_bdev.RemoveChildren(devs)
1514

    
1515

    
1516
def BlockdevGetmirrorstatus(disks):
1517
  """Get the mirroring status of a list of devices.
1518

1519
  @type disks: list of L{objects.Disk}
1520
  @param disks: the list of disks which we should query
1521
  @rtype: disk
1522
  @return:
1523
      a list of (mirror_done, estimated_time) tuples, which
1524
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1525
  @raise errors.BlockDeviceError: if any of the disks cannot be
1526
      found
1527

1528
  """
1529
  stats = []
1530
  for dsk in disks:
1531
    rbd = _RecursiveFindBD(dsk)
1532
    if rbd is None:
1533
      _Fail("Can't find device %s", dsk)
1534

    
1535
    stats.append(rbd.CombinedSyncStatus())
1536

    
1537
  return stats
1538

    
1539

    
1540
def _RecursiveFindBD(disk):
1541
  """Check if a device is activated.
1542

1543
  If so, return information about the real device.
1544

1545
  @type disk: L{objects.Disk}
1546
  @param disk: the disk object we need to find
1547

1548
  @return: None if the device can't be found,
1549
      otherwise the device instance
1550

1551
  """
1552
  children = []
1553
  if disk.children:
1554
    for chdisk in disk.children:
1555
      children.append(_RecursiveFindBD(chdisk))
1556

    
1557
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1558

    
1559

    
1560
def _OpenRealBD(disk):
1561
  """Opens the underlying block device of a disk.
1562

1563
  @type disk: L{objects.Disk}
1564
  @param disk: the disk object we want to open
1565

1566
  """
1567
  real_disk = _RecursiveFindBD(disk)
1568
  if real_disk is None:
1569
    _Fail("Block device '%s' is not set up", disk)
1570

    
1571
  real_disk.Open()
1572

    
1573
  return real_disk
1574

    
1575

    
1576
def BlockdevFind(disk):
1577
  """Check if a device is activated.
1578

1579
  If it is, return information about the real device.
1580

1581
  @type disk: L{objects.Disk}
1582
  @param disk: the disk to find
1583
  @rtype: None or objects.BlockDevStatus
1584
  @return: None if the disk cannot be found, otherwise a the current
1585
           information
1586

1587
  """
1588
  try:
1589
    rbd = _RecursiveFindBD(disk)
1590
  except errors.BlockDeviceError, err:
1591
    _Fail("Failed to find device: %s", err, exc=True)
1592

    
1593
  if rbd is None:
1594
    return None
1595

    
1596
  return rbd.GetSyncStatus()
1597

    
1598

    
1599
def BlockdevGetsize(disks):
1600
  """Computes the size of the given disks.
1601

1602
  If a disk is not found, returns None instead.
1603

1604
  @type disks: list of L{objects.Disk}
1605
  @param disks: the list of disk to compute the size for
1606
  @rtype: list
1607
  @return: list with elements None if the disk cannot be found,
1608
      otherwise the size
1609

1610
  """
1611
  result = []
1612
  for cf in disks:
1613
    try:
1614
      rbd = _RecursiveFindBD(cf)
1615
    except errors.BlockDeviceError:
1616
      result.append(None)
1617
      continue
1618
    if rbd is None:
1619
      result.append(None)
1620
    else:
1621
      result.append(rbd.GetActualSize())
1622
  return result
1623

    
1624

    
1625
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1626
  """Export a block device to a remote node.
1627

1628
  @type disk: L{objects.Disk}
1629
  @param disk: the description of the disk to export
1630
  @type dest_node: str
1631
  @param dest_node: the destination node to export to
1632
  @type dest_path: str
1633
  @param dest_path: the destination path on the target node
1634
  @type cluster_name: str
1635
  @param cluster_name: the cluster name, needed for SSH hostalias
1636
  @rtype: None
1637

1638
  """
1639
  real_disk = _OpenRealBD(disk)
1640

    
1641
  # the block size on the read dd is 1MiB to match our units
1642
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1643
                               "dd if=%s bs=1048576 count=%s",
1644
                               real_disk.dev_path, str(disk.size))
1645

    
1646
  # we set here a smaller block size as, due to ssh buffering, more
1647
  # than 64-128k will mostly ignored; we use nocreat to fail if the
1648
  # device is not already there or we pass a wrong path; we use
1649
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1650
  # to not buffer too much memory; this means that at best, we flush
1651
  # every 64k, which will not be very fast
1652
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1653
                                " oflag=dsync", dest_path)
1654

    
1655
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1656
                                                   constants.GANETI_RUNAS,
1657
                                                   destcmd)
1658

    
1659
  # all commands have been checked, so we're safe to combine them
1660
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1661

    
1662
  result = utils.RunCmd(["bash", "-c", command])
1663

    
1664
  if result.failed:
1665
    _Fail("Disk copy command '%s' returned error: %s"
1666
          " output: %s", command, result.fail_reason, result.output)
1667

    
1668

    
1669
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1670
  """Write a file to the filesystem.
1671

1672
  This allows the master to overwrite(!) a file. It will only perform
1673
  the operation if the file belongs to a list of configuration files.
1674

1675
  @type file_name: str
1676
  @param file_name: the target file name
1677
  @type data: str
1678
  @param data: the new contents of the file
1679
  @type mode: int
1680
  @param mode: the mode to give the file (can be None)
1681
  @type uid: int
1682
  @param uid: the owner of the file (can be -1 for default)
1683
  @type gid: int
1684
  @param gid: the group of the file (can be -1 for default)
1685
  @type atime: float
1686
  @param atime: the atime to set on the file (can be None)
1687
  @type mtime: float
1688
  @param mtime: the mtime to set on the file (can be None)
1689
  @rtype: None
1690

1691
  """
1692
  if not os.path.isabs(file_name):
1693
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1694

    
1695
  if file_name not in _ALLOWED_UPLOAD_FILES:
1696
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1697
          file_name)
1698

    
1699
  raw_data = _Decompress(data)
1700

    
1701
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1702
                  atime=atime, mtime=mtime)
1703

    
1704

    
1705
def WriteSsconfFiles(values):
1706
  """Update all ssconf files.
1707

1708
  Wrapper around the SimpleStore.WriteFiles.
1709

1710
  """
1711
  ssconf.SimpleStore().WriteFiles(values)
1712

    
1713

    
1714
def _ErrnoOrStr(err):
1715
  """Format an EnvironmentError exception.
1716

1717
  If the L{err} argument has an errno attribute, it will be looked up
1718
  and converted into a textual C{E...} description. Otherwise the
1719
  string representation of the error will be returned.
1720

1721
  @type err: L{EnvironmentError}
1722
  @param err: the exception to format
1723

1724
  """
1725
  if hasattr(err, 'errno'):
1726
    detail = errno.errorcode[err.errno]
1727
  else:
1728
    detail = str(err)
1729
  return detail
1730

    
1731

    
1732
def _OSOndiskAPIVersion(os_dir):
1733
  """Compute and return the API version of a given OS.
1734

1735
  This function will try to read the API version of the OS residing in
1736
  the 'os_dir' directory.
1737

1738
  @type os_dir: str
1739
  @param os_dir: the directory in which we should look for the OS
1740
  @rtype: tuple
1741
  @return: tuple (status, data) with status denoting the validity and
1742
      data holding either the vaid versions or an error message
1743

1744
  """
1745
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1746

    
1747
  try:
1748
    st = os.stat(api_file)
1749
  except EnvironmentError, err:
1750
    return False, ("Required file '%s' not found under path %s: %s" %
1751
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1752

    
1753
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1754
    return False, ("File '%s' in %s is not a regular file" %
1755
                   (constants.OS_API_FILE, os_dir))
1756

    
1757
  try:
1758
    api_versions = utils.ReadFile(api_file).splitlines()
1759
  except EnvironmentError, err:
1760
    return False, ("Error while reading the API version file at %s: %s" %
1761
                   (api_file, _ErrnoOrStr(err)))
1762

    
1763
  try:
1764
    api_versions = [int(version.strip()) for version in api_versions]
1765
  except (TypeError, ValueError), err:
1766
    return False, ("API version(s) can't be converted to integer: %s" %
1767
                   str(err))
1768

    
1769
  return True, api_versions
1770

    
1771

    
1772
def DiagnoseOS(top_dirs=None):
1773
  """Compute the validity for all OSes.
1774

1775
  @type top_dirs: list
1776
  @param top_dirs: the list of directories in which to
1777
      search (if not given defaults to
1778
      L{constants.OS_SEARCH_PATH})
1779
  @rtype: list of L{objects.OS}
1780
  @return: a list of tuples (name, path, status, diagnose, variants,
1781
      parameters, api_version) for all (potential) OSes under all
1782
      search paths, where:
1783
          - name is the (potential) OS name
1784
          - path is the full path to the OS
1785
          - status True/False is the validity of the OS
1786
          - diagnose is the error message for an invalid OS, otherwise empty
1787
          - variants is a list of supported OS variants, if any
1788
          - parameters is a list of (name, help) parameters, if any
1789
          - api_version is a list of support OS API versions
1790

1791
  """
1792
  if top_dirs is None:
1793
    top_dirs = constants.OS_SEARCH_PATH
1794

    
1795
  result = []
1796
  for dir_name in top_dirs:
1797
    if os.path.isdir(dir_name):
1798
      try:
1799
        f_names = utils.ListVisibleFiles(dir_name)
1800
      except EnvironmentError, err:
1801
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1802
        break
1803
      for name in f_names:
1804
        os_path = utils.PathJoin(dir_name, name)
1805
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1806
        if status:
1807
          diagnose = ""
1808
          variants = os_inst.supported_variants
1809
          parameters = os_inst.supported_parameters
1810
          api_versions = os_inst.api_versions
1811
        else:
1812
          diagnose = os_inst
1813
          variants = parameters = api_versions = []
1814
        result.append((name, os_path, status, diagnose, variants,
1815
                       parameters, api_versions))
1816

    
1817
  return result
1818

    
1819

    
1820
def _TryOSFromDisk(name, base_dir=None):
1821
  """Create an OS instance from disk.
1822

1823
  This function will return an OS instance if the given name is a
1824
  valid OS name.
1825

1826
  @type base_dir: string
1827
  @keyword base_dir: Base directory containing OS installations.
1828
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1829
  @rtype: tuple
1830
  @return: success and either the OS instance if we find a valid one,
1831
      or error message
1832

1833
  """
1834
  if base_dir is None:
1835
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1836
  else:
1837
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1838

    
1839
  if os_dir is None:
1840
    return False, "Directory for OS %s not found in search path" % name
1841

    
1842
  status, api_versions = _OSOndiskAPIVersion(os_dir)
1843
  if not status:
1844
    # push the error up
1845
    return status, api_versions
1846

    
1847
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1848
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1849
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1850

    
1851
  # OS Files dictionary, we will populate it with the absolute path names
1852
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1853

    
1854
  if max(api_versions) >= constants.OS_API_V15:
1855
    os_files[constants.OS_VARIANTS_FILE] = ''
1856

    
1857
  if max(api_versions) >= constants.OS_API_V20:
1858
    os_files[constants.OS_PARAMETERS_FILE] = ''
1859
  else:
1860
    del os_files[constants.OS_SCRIPT_VERIFY]
1861

    
1862
  for filename in os_files:
1863
    os_files[filename] = utils.PathJoin(os_dir, filename)
1864

    
1865
    try:
1866
      st = os.stat(os_files[filename])
1867
    except EnvironmentError, err:
1868
      return False, ("File '%s' under path '%s' is missing (%s)" %
1869
                     (filename, os_dir, _ErrnoOrStr(err)))
1870

    
1871
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1872
      return False, ("File '%s' under path '%s' is not a regular file" %
1873
                     (filename, os_dir))
1874

    
1875
    if filename in constants.OS_SCRIPTS:
1876
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1877
        return False, ("File '%s' under path '%s' is not executable" %
1878
                       (filename, os_dir))
1879

    
1880
  variants = []
1881
  if constants.OS_VARIANTS_FILE in os_files:
1882
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1883
    try:
1884
      variants = utils.ReadFile(variants_file).splitlines()
1885
    except EnvironmentError, err:
1886
      return False, ("Error while reading the OS variants file at %s: %s" %
1887
                     (variants_file, _ErrnoOrStr(err)))
1888
    if not variants:
1889
      return False, ("No supported os variant found")
1890

    
1891
  parameters = []
1892
  if constants.OS_PARAMETERS_FILE in os_files:
1893
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1894
    try:
1895
      parameters = utils.ReadFile(parameters_file).splitlines()
1896
    except EnvironmentError, err:
1897
      return False, ("Error while reading the OS parameters file at %s: %s" %
1898
                     (parameters_file, _ErrnoOrStr(err)))
1899
    parameters = [v.split(None, 1) for v in parameters]
1900

    
1901
  os_obj = objects.OS(name=name, path=os_dir,
1902
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1903
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1904
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1905
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1906
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1907
                                                 None),
1908
                      supported_variants=variants,
1909
                      supported_parameters=parameters,
1910
                      api_versions=api_versions)
1911
  return True, os_obj
1912

    
1913

    
1914
def OSFromDisk(name, base_dir=None):
1915
  """Create an OS instance from disk.
1916

1917
  This function will return an OS instance if the given name is a
1918
  valid OS name. Otherwise, it will raise an appropriate
1919
  L{RPCFail} exception, detailing why this is not a valid OS.
1920

1921
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1922
  an exception but returns true/false status data.
1923

1924
  @type base_dir: string
1925
  @keyword base_dir: Base directory containing OS installations.
1926
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1927
  @rtype: L{objects.OS}
1928
  @return: the OS instance if we find a valid one
1929
  @raise RPCFail: if we don't find a valid OS
1930

1931
  """
1932
  name_only = name.split("+", 1)[0]
1933
  status, payload = _TryOSFromDisk(name_only, base_dir)
1934

    
1935
  if not status:
1936
    _Fail(payload)
1937

    
1938
  return payload
1939

    
1940

    
1941
def OSCoreEnv(inst_os, os_params, debug=0):
1942
  """Calculate the basic environment for an os script.
1943

1944
  @type inst_os: L{objects.OS}
1945
  @param inst_os: operating system for which the environment is being built
1946
  @type os_params: dict
1947
  @param os_params: the OS parameters
1948
  @type debug: integer
1949
  @param debug: debug level (0 or 1, for OS Api 10)
1950
  @rtype: dict
1951
  @return: dict of environment variables
1952
  @raise errors.BlockDeviceError: if the block device
1953
      cannot be found
1954

1955
  """
1956
  result = {}
1957
  api_version = \
1958
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1959
  result['OS_API_VERSION'] = '%d' % api_version
1960
  result['OS_NAME'] = inst_os.name
1961
  result['DEBUG_LEVEL'] = '%d' % debug
1962

    
1963
  # OS variants
1964
  if api_version >= constants.OS_API_V15:
1965
    try:
1966
      variant = inst_os.name.split('+', 1)[1]
1967
    except IndexError:
1968
      variant = inst_os.supported_variants[0]
1969
    result['OS_VARIANT'] = variant
1970

    
1971
  # OS params
1972
  for pname, pvalue in os_params.items():
1973
    result['OSP_%s' % pname.upper()] = pvalue
1974

    
1975
  return result
1976

    
1977

    
1978
def OSEnvironment(instance, inst_os, debug=0):
1979
  """Calculate the environment for an os script.
1980

1981
  @type instance: L{objects.Instance}
1982
  @param instance: target instance for the os script run
1983
  @type inst_os: L{objects.OS}
1984
  @param inst_os: operating system for which the environment is being built
1985
  @type debug: integer
1986
  @param debug: debug level (0 or 1, for OS Api 10)
1987
  @rtype: dict
1988
  @return: dict of environment variables
1989
  @raise errors.BlockDeviceError: if the block device
1990
      cannot be found
1991

1992
  """
1993
  result = OSCoreEnv(inst_os, instance.osparams, debug=debug)
1994

    
1995
  result['INSTANCE_NAME'] = instance.name
1996
  result['INSTANCE_OS'] = instance.os
1997
  result['HYPERVISOR'] = instance.hypervisor
1998
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1999
  result['NIC_COUNT'] = '%d' % len(instance.nics)
2000

    
2001
  # Disks
2002
  for idx, disk in enumerate(instance.disks):
2003
    real_disk = _OpenRealBD(disk)
2004
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
2005
    result['DISK_%d_ACCESS' % idx] = disk.mode
2006
    if constants.HV_DISK_TYPE in instance.hvparams:
2007
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
2008
        instance.hvparams[constants.HV_DISK_TYPE]
2009
    if disk.dev_type in constants.LDS_BLOCK:
2010
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2011
    elif disk.dev_type == constants.LD_FILE:
2012
      result['DISK_%d_BACKEND_TYPE' % idx] = \
2013
        'file:%s' % disk.physical_id[0]
2014

    
2015
  # NICs
2016
  for idx, nic in enumerate(instance.nics):
2017
    result['NIC_%d_MAC' % idx] = nic.mac
2018
    if nic.ip:
2019
      result['NIC_%d_IP' % idx] = nic.ip
2020
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2021
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2022
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2023
    if nic.nicparams[constants.NIC_LINK]:
2024
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2025
    if constants.HV_NIC_TYPE in instance.hvparams:
2026
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
2027
        instance.hvparams[constants.HV_NIC_TYPE]
2028

    
2029
  # HV/BE params
2030
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2031
    for key, value in source.items():
2032
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2033

    
2034
  return result
2035

    
2036

    
2037
def BlockdevGrow(disk, amount):
2038
  """Grow a stack of block devices.
2039

2040
  This function is called recursively, with the childrens being the
2041
  first ones to resize.
2042

2043
  @type disk: L{objects.Disk}
2044
  @param disk: the disk to be grown
2045
  @rtype: (status, result)
2046
  @return: a tuple with the status of the operation
2047
      (True/False), and the errors message if status
2048
      is False
2049

2050
  """
2051
  r_dev = _RecursiveFindBD(disk)
2052
  if r_dev is None:
2053
    _Fail("Cannot find block device %s", disk)
2054

    
2055
  try:
2056
    r_dev.Grow(amount)
2057
  except errors.BlockDeviceError, err:
2058
    _Fail("Failed to grow block device: %s", err, exc=True)
2059

    
2060

    
2061
def BlockdevSnapshot(disk):
2062
  """Create a snapshot copy of a block device.
2063

2064
  This function is called recursively, and the snapshot is actually created
2065
  just for the leaf lvm backend device.
2066

2067
  @type disk: L{objects.Disk}
2068
  @param disk: the disk to be snapshotted
2069
  @rtype: string
2070
  @return: snapshot disk path
2071

2072
  """
2073
  if disk.dev_type == constants.LD_DRBD8:
2074
    if not disk.children:
2075
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2076
            disk.unique_id)
2077
    return BlockdevSnapshot(disk.children[0])
2078
  elif disk.dev_type == constants.LD_LV:
2079
    r_dev = _RecursiveFindBD(disk)
2080
    if r_dev is not None:
2081
      # FIXME: choose a saner value for the snapshot size
2082
      # let's stay on the safe side and ask for the full size, for now
2083
      return r_dev.Snapshot(disk.size)
2084
    else:
2085
      _Fail("Cannot find block device %s", disk)
2086
  else:
2087
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2088
          disk.unique_id, disk.dev_type)
2089

    
2090

    
2091
def FinalizeExport(instance, snap_disks):
2092
  """Write out the export configuration information.
2093

2094
  @type instance: L{objects.Instance}
2095
  @param instance: the instance which we export, used for
2096
      saving configuration
2097
  @type snap_disks: list of L{objects.Disk}
2098
  @param snap_disks: list of snapshot block devices, which
2099
      will be used to get the actual name of the dump file
2100

2101
  @rtype: None
2102

2103
  """
2104
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2105
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2106

    
2107
  config = objects.SerializableConfigParser()
2108

    
2109
  config.add_section(constants.INISECT_EXP)
2110
  config.set(constants.INISECT_EXP, 'version', '0')
2111
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2112
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2113
  config.set(constants.INISECT_EXP, 'os', instance.os)
2114
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
2115

    
2116
  config.add_section(constants.INISECT_INS)
2117
  config.set(constants.INISECT_INS, 'name', instance.name)
2118
  config.set(constants.INISECT_INS, 'memory', '%d' %
2119
             instance.beparams[constants.BE_MEMORY])
2120
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
2121
             instance.beparams[constants.BE_VCPUS])
2122
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2123
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2124

    
2125
  nic_total = 0
2126
  for nic_count, nic in enumerate(instance.nics):
2127
    nic_total += 1
2128
    config.set(constants.INISECT_INS, 'nic%d_mac' %
2129
               nic_count, '%s' % nic.mac)
2130
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2131
    for param in constants.NICS_PARAMETER_TYPES:
2132
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2133
                 '%s' % nic.nicparams.get(param, None))
2134
  # TODO: redundant: on load can read nics until it doesn't exist
2135
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2136

    
2137
  disk_total = 0
2138
  for disk_count, disk in enumerate(snap_disks):
2139
    if disk:
2140
      disk_total += 1
2141
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2142
                 ('%s' % disk.iv_name))
2143
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2144
                 ('%s' % disk.physical_id[1]))
2145
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2146
                 ('%d' % disk.size))
2147

    
2148
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2149

    
2150
  # New-style hypervisor/backend parameters
2151

    
2152
  config.add_section(constants.INISECT_HYP)
2153
  for name, value in instance.hvparams.items():
2154
    if name not in constants.HVC_GLOBALS:
2155
      config.set(constants.INISECT_HYP, name, str(value))
2156

    
2157
  config.add_section(constants.INISECT_BEP)
2158
  for name, value in instance.beparams.items():
2159
    config.set(constants.INISECT_BEP, name, str(value))
2160

    
2161
  config.add_section(constants.INISECT_OSP)
2162
  for name, value in instance.osparams.items():
2163
    config.set(constants.INISECT_OSP, name, str(value))
2164

    
2165
  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2166
                  data=config.Dumps())
2167
  shutil.rmtree(finaldestdir, ignore_errors=True)
2168
  shutil.move(destdir, finaldestdir)
2169

    
2170

    
2171
def ExportInfo(dest):
2172
  """Get export configuration information.
2173

2174
  @type dest: str
2175
  @param dest: directory containing the export
2176

2177
  @rtype: L{objects.SerializableConfigParser}
2178
  @return: a serializable config file containing the
2179
      export info
2180

2181
  """
2182
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2183

    
2184
  config = objects.SerializableConfigParser()
2185
  config.read(cff)
2186

    
2187
  if (not config.has_section(constants.INISECT_EXP) or
2188
      not config.has_section(constants.INISECT_INS)):
2189
    _Fail("Export info file doesn't have the required fields")
2190

    
2191
  return config.Dumps()
2192

    
2193

    
2194
def ListExports():
2195
  """Return a list of exports currently available on this machine.
2196

2197
  @rtype: list
2198
  @return: list of the exports
2199

2200
  """
2201
  if os.path.isdir(constants.EXPORT_DIR):
2202
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2203
  else:
2204
    _Fail("No exports directory")
2205

    
2206

    
2207
def RemoveExport(export):
2208
  """Remove an existing export from the node.
2209

2210
  @type export: str
2211
  @param export: the name of the export to remove
2212
  @rtype: None
2213

2214
  """
2215
  target = utils.PathJoin(constants.EXPORT_DIR, export)
2216

    
2217
  try:
2218
    shutil.rmtree(target)
2219
  except EnvironmentError, err:
2220
    _Fail("Error while removing the export: %s", err, exc=True)
2221

    
2222

    
2223
def BlockdevRename(devlist):
2224
  """Rename a list of block devices.
2225

2226
  @type devlist: list of tuples
2227
  @param devlist: list of tuples of the form  (disk,
2228
      new_logical_id, new_physical_id); disk is an
2229
      L{objects.Disk} object describing the current disk,
2230
      and new logical_id/physical_id is the name we
2231
      rename it to
2232
  @rtype: boolean
2233
  @return: True if all renames succeeded, False otherwise
2234

2235
  """
2236
  msgs = []
2237
  result = True
2238
  for disk, unique_id in devlist:
2239
    dev = _RecursiveFindBD(disk)
2240
    if dev is None:
2241
      msgs.append("Can't find device %s in rename" % str(disk))
2242
      result = False
2243
      continue
2244
    try:
2245
      old_rpath = dev.dev_path
2246
      dev.Rename(unique_id)
2247
      new_rpath = dev.dev_path
2248
      if old_rpath != new_rpath:
2249
        DevCacheManager.RemoveCache(old_rpath)
2250
        # FIXME: we should add the new cache information here, like:
2251
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2252
        # but we don't have the owner here - maybe parse from existing
2253
        # cache? for now, we only lose lvm data when we rename, which
2254
        # is less critical than DRBD or MD
2255
    except errors.BlockDeviceError, err:
2256
      msgs.append("Can't rename device '%s' to '%s': %s" %
2257
                  (dev, unique_id, err))
2258
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2259
      result = False
2260
  if not result:
2261
    _Fail("; ".join(msgs))
2262

    
2263

    
2264
def _TransformFileStorageDir(file_storage_dir):
2265
  """Checks whether given file_storage_dir is valid.
2266

2267
  Checks wheter the given file_storage_dir is within the cluster-wide
2268
  default file_storage_dir stored in SimpleStore. Only paths under that
2269
  directory are allowed.
2270

2271
  @type file_storage_dir: str
2272
  @param file_storage_dir: the path to check
2273

2274
  @return: the normalized path if valid, None otherwise
2275

2276
  """
2277
  if not constants.ENABLE_FILE_STORAGE:
2278
    _Fail("File storage disabled at configure time")
2279
  cfg = _GetConfig()
2280
  file_storage_dir = os.path.normpath(file_storage_dir)
2281
  base_file_storage_dir = cfg.GetFileStorageDir()
2282
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2283
      base_file_storage_dir):
2284
    _Fail("File storage directory '%s' is not under base file"
2285
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2286
  return file_storage_dir
2287

    
2288

    
2289
def CreateFileStorageDir(file_storage_dir):
2290
  """Create file storage directory.
2291

2292
  @type file_storage_dir: str
2293
  @param file_storage_dir: directory to create
2294

2295
  @rtype: tuple
2296
  @return: tuple with first element a boolean indicating wheter dir
2297
      creation was successful or not
2298

2299
  """
2300
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2301
  if os.path.exists(file_storage_dir):
2302
    if not os.path.isdir(file_storage_dir):
2303
      _Fail("Specified storage dir '%s' is not a directory",
2304
            file_storage_dir)
2305
  else:
2306
    try:
2307
      os.makedirs(file_storage_dir, 0750)
2308
    except OSError, err:
2309
      _Fail("Cannot create file storage directory '%s': %s",
2310
            file_storage_dir, err, exc=True)
2311

    
2312

    
2313
def RemoveFileStorageDir(file_storage_dir):
2314
  """Remove file storage directory.
2315

2316
  Remove it only if it's empty. If not log an error and return.
2317

2318
  @type file_storage_dir: str
2319
  @param file_storage_dir: the directory we should cleanup
2320
  @rtype: tuple (success,)
2321
  @return: tuple of one element, C{success}, denoting
2322
      whether the operation was successful
2323

2324
  """
2325
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2326
  if os.path.exists(file_storage_dir):
2327
    if not os.path.isdir(file_storage_dir):
2328
      _Fail("Specified Storage directory '%s' is not a directory",
2329
            file_storage_dir)
2330
    # deletes dir only if empty, otherwise we want to fail the rpc call
2331
    try:
2332
      os.rmdir(file_storage_dir)
2333
    except OSError, err:
2334
      _Fail("Cannot remove file storage directory '%s': %s",
2335
            file_storage_dir, err)
2336

    
2337

    
2338
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2339
  """Rename the file storage directory.
2340

2341
  @type old_file_storage_dir: str
2342
  @param old_file_storage_dir: the current path
2343
  @type new_file_storage_dir: str
2344
  @param new_file_storage_dir: the name we should rename to
2345
  @rtype: tuple (success,)
2346
  @return: tuple of one element, C{success}, denoting
2347
      whether the operation was successful
2348

2349
  """
2350
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2351
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2352
  if not os.path.exists(new_file_storage_dir):
2353
    if os.path.isdir(old_file_storage_dir):
2354
      try:
2355
        os.rename(old_file_storage_dir, new_file_storage_dir)
2356
      except OSError, err:
2357
        _Fail("Cannot rename '%s' to '%s': %s",
2358
              old_file_storage_dir, new_file_storage_dir, err)
2359
    else:
2360
      _Fail("Specified storage dir '%s' is not a directory",
2361
            old_file_storage_dir)
2362
  else:
2363
    if os.path.exists(old_file_storage_dir):
2364
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2365
            old_file_storage_dir, new_file_storage_dir)
2366

    
2367

    
2368
def _EnsureJobQueueFile(file_name):
2369
  """Checks whether the given filename is in the queue directory.
2370

2371
  @type file_name: str
2372
  @param file_name: the file name we should check
2373
  @rtype: None
2374
  @raises RPCFail: if the file is not valid
2375

2376
  """
2377
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2378
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2379

    
2380
  if not result:
2381
    _Fail("Passed job queue file '%s' does not belong to"
2382
          " the queue directory '%s'", file_name, queue_dir)
2383

    
2384

    
2385
def JobQueueUpdate(file_name, content):
2386
  """Updates a file in the queue directory.
2387

2388
  This is just a wrapper over L{utils.WriteFile}, with proper
2389
  checking.
2390

2391
  @type file_name: str
2392
  @param file_name: the job file name
2393
  @type content: str
2394
  @param content: the new job contents
2395
  @rtype: boolean
2396
  @return: the success of the operation
2397

2398
  """
2399
  _EnsureJobQueueFile(file_name)
2400

    
2401
  # Write and replace the file atomically
2402
  utils.WriteFile(file_name, data=_Decompress(content))
2403

    
2404

    
2405
def JobQueueRename(old, new):
2406
  """Renames a job queue file.
2407

2408
  This is just a wrapper over os.rename with proper checking.
2409

2410
  @type old: str
2411
  @param old: the old (actual) file name
2412
  @type new: str
2413
  @param new: the desired file name
2414
  @rtype: tuple
2415
  @return: the success of the operation and payload
2416

2417
  """
2418
  _EnsureJobQueueFile(old)
2419
  _EnsureJobQueueFile(new)
2420

    
2421
  utils.RenameFile(old, new, mkdir=True)
2422

    
2423

    
2424
def BlockdevClose(instance_name, disks):
2425
  """Closes the given block devices.
2426

2427
  This means they will be switched to secondary mode (in case of
2428
  DRBD).
2429

2430
  @param instance_name: if the argument is not empty, the symlinks
2431
      of this instance will be removed
2432
  @type disks: list of L{objects.Disk}
2433
  @param disks: the list of disks to be closed
2434
  @rtype: tuple (success, message)
2435
  @return: a tuple of success and message, where success
2436
      indicates the succes of the operation, and message
2437
      which will contain the error details in case we
2438
      failed
2439

2440
  """
2441
  bdevs = []
2442
  for cf in disks:
2443
    rd = _RecursiveFindBD(cf)
2444
    if rd is None:
2445
      _Fail("Can't find device %s", cf)
2446
    bdevs.append(rd)
2447

    
2448
  msg = []
2449
  for rd in bdevs:
2450
    try:
2451
      rd.Close()
2452
    except errors.BlockDeviceError, err:
2453
      msg.append(str(err))
2454
  if msg:
2455
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2456
  else:
2457
    if instance_name:
2458
      _RemoveBlockDevLinks(instance_name, disks)
2459

    
2460

    
2461
def ValidateHVParams(hvname, hvparams):
2462
  """Validates the given hypervisor parameters.
2463

2464
  @type hvname: string
2465
  @param hvname: the hypervisor name
2466
  @type hvparams: dict
2467
  @param hvparams: the hypervisor parameters to be validated
2468
  @rtype: None
2469

2470
  """
2471
  try:
2472
    hv_type = hypervisor.GetHypervisor(hvname)
2473
    hv_type.ValidateParameters(hvparams)
2474
  except errors.HypervisorError, err:
2475
    _Fail(str(err), log=False)
2476

    
2477

    
2478
def _CheckOSPList(os_obj, parameters):
2479
  """Check whether a list of parameters is supported by the OS.
2480

2481
  @type os_obj: L{objects.OS}
2482
  @param os_obj: OS object to check
2483
  @type parameters: list
2484
  @param parameters: the list of parameters to check
2485

2486
  """
2487
  supported = [v[0] for v in os_obj.supported_parameters]
2488
  delta = frozenset(parameters).difference(supported)
2489
  if delta:
2490
    _Fail("The following parameters are not supported"
2491
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2492

    
2493

    
2494
def ValidateOS(required, osname, checks, osparams):
2495
  """Validate the given OS' parameters.
2496

2497
  @type required: boolean
2498
  @param required: whether absence of the OS should translate into
2499
      failure or not
2500
  @type osname: string
2501
  @param osname: the OS to be validated
2502
  @type checks: list
2503
  @param checks: list of the checks to run (currently only 'parameters')
2504
  @type osparams: dict
2505
  @param osparams: dictionary with OS parameters
2506
  @rtype: boolean
2507
  @return: True if the validation passed, or False if the OS was not
2508
      found and L{required} was false
2509

2510
  """
2511
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2512
    _Fail("Unknown checks required for OS %s: %s", osname,
2513
          set(checks).difference(constants.OS_VALIDATE_CALLS))
2514

    
2515
  name_only = osname.split("+", 1)[0]
2516
  status, tbv = _TryOSFromDisk(name_only, None)
2517

    
2518
  if not status:
2519
    if required:
2520
      _Fail(tbv)
2521
    else:
2522
      return False
2523

    
2524
  if constants.OS_VALIDATE_PARAMETERS in checks:
2525
    _CheckOSPList(tbv, osparams.keys())
2526

    
2527
  validate_env = OSCoreEnv(tbv, osparams)
2528
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2529
                        cwd=tbv.path)
2530
  if result.failed:
2531
    logging.error("os validate command '%s' returned error: %s output: %s",
2532
                  result.cmd, result.fail_reason, result.output)
2533
    _Fail("OS validation script failed (%s), output: %s",
2534
          result.fail_reason, result.output, log=False)
2535

    
2536
  return True
2537

    
2538

    
2539
def DemoteFromMC():
2540
  """Demotes the current node from master candidate role.
2541

2542
  """
2543
  # try to ensure we're not the master by mistake
2544
  master, myself = ssconf.GetMasterAndMyself()
2545
  if master == myself:
2546
    _Fail("ssconf status shows I'm the master node, will not demote")
2547

    
2548
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2549
  if not result.failed:
2550
    _Fail("The master daemon is running, will not demote")
2551

    
2552
  try:
2553
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2554
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2555
  except EnvironmentError, err:
2556
    if err.errno != errno.ENOENT:
2557
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2558

    
2559
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2560

    
2561

    
2562
def _GetX509Filenames(cryptodir, name):
2563
  """Returns the full paths for the private key and certificate.
2564

2565
  """
2566
  return (utils.PathJoin(cryptodir, name),
2567
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2568
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2569

    
2570

    
2571
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2572
  """Creates a new X509 certificate for SSL/TLS.
2573

2574
  @type validity: int
2575
  @param validity: Validity in seconds
2576
  @rtype: tuple; (string, string)
2577
  @return: Certificate name and public part
2578

2579
  """
2580
  (key_pem, cert_pem) = \
2581
    utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2582
                                     min(validity, _MAX_SSL_CERT_VALIDITY))
2583

    
2584
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
2585
                              prefix="x509-%s-" % utils.TimestampForFilename())
2586
  try:
2587
    name = os.path.basename(cert_dir)
2588
    assert len(name) > 5
2589

    
2590
    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2591

    
2592
    utils.WriteFile(key_file, mode=0400, data=key_pem)
2593
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2594

    
2595
    # Never return private key as it shouldn't leave the node
2596
    return (name, cert_pem)
2597
  except Exception:
2598
    shutil.rmtree(cert_dir, ignore_errors=True)
2599
    raise
2600

    
2601

    
2602
def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2603
  """Removes a X509 certificate.
2604

2605
  @type name: string
2606
  @param name: Certificate name
2607

2608
  """
2609
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2610

    
2611
  utils.RemoveFile(key_file)
2612
  utils.RemoveFile(cert_file)
2613

    
2614
  try:
2615
    os.rmdir(cert_dir)
2616
  except EnvironmentError, err:
2617
    _Fail("Cannot remove certificate directory '%s': %s",
2618
          cert_dir, err)
2619

    
2620

    
2621
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2622
  """Returns the command for the requested input/output.
2623

2624
  @type instance: L{objects.Instance}
2625
  @param instance: The instance object
2626
  @param mode: Import/export mode
2627
  @param ieio: Input/output type
2628
  @param ieargs: Input/output arguments
2629

2630
  """
2631
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2632

    
2633
  env = None
2634
  prefix = None
2635
  suffix = None
2636
  exp_size = None
2637

    
2638
  if ieio == constants.IEIO_FILE:
2639
    (filename, ) = ieargs
2640

    
2641
    if not utils.IsNormAbsPath(filename):
2642
      _Fail("Path '%s' is not normalized or absolute", filename)
2643

    
2644
    directory = os.path.normpath(os.path.dirname(filename))
2645

    
2646
    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2647
        constants.EXPORT_DIR):
2648
      _Fail("File '%s' is not under exports directory '%s'",
2649
            filename, constants.EXPORT_DIR)
2650

    
2651
    # Create directory
2652
    utils.Makedirs(directory, mode=0750)
2653

    
2654
    quoted_filename = utils.ShellQuote(filename)
2655

    
2656
    if mode == constants.IEM_IMPORT:
2657
      suffix = "> %s" % quoted_filename
2658
    elif mode == constants.IEM_EXPORT:
2659
      suffix = "< %s" % quoted_filename
2660

    
2661
      # Retrieve file size
2662
      try:
2663
        st = os.stat(filename)
2664
      except EnvironmentError, err:
2665
        logging.error("Can't stat(2) %s: %s", filename, err)
2666
      else:
2667
        exp_size = utils.BytesToMebibyte(st.st_size)
2668

    
2669
  elif ieio == constants.IEIO_RAW_DISK:
2670
    (disk, ) = ieargs
2671

    
2672
    real_disk = _OpenRealBD(disk)
2673

    
2674
    if mode == constants.IEM_IMPORT:
2675
      # we set here a smaller block size as, due to transport buffering, more
2676
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
2677
      # is not already there or we pass a wrong path; we use notrunc to no
2678
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2679
      # much memory; this means that at best, we flush every 64k, which will
2680
      # not be very fast
2681
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2682
                                    " bs=%s oflag=dsync"),
2683
                                    real_disk.dev_path,
2684
                                    str(64 * 1024))
2685

    
2686
    elif mode == constants.IEM_EXPORT:
2687
      # the block size on the read dd is 1MiB to match our units
2688
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2689
                                   real_disk.dev_path,
2690
                                   str(1024 * 1024), # 1 MB
2691
                                   str(disk.size))
2692
      exp_size = disk.size
2693

    
2694
  elif ieio == constants.IEIO_SCRIPT:
2695
    (disk, disk_index, ) = ieargs
2696

    
2697
    assert isinstance(disk_index, (int, long))
2698

    
2699
    real_disk = _OpenRealBD(disk)
2700

    
2701
    inst_os = OSFromDisk(instance.os)
2702
    env = OSEnvironment(instance, inst_os)
2703

    
2704
    if mode == constants.IEM_IMPORT:
2705
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2706
      env["IMPORT_INDEX"] = str(disk_index)
2707
      script = inst_os.import_script
2708

    
2709
    elif mode == constants.IEM_EXPORT:
2710
      env["EXPORT_DEVICE"] = real_disk.dev_path
2711
      env["EXPORT_INDEX"] = str(disk_index)
2712
      script = inst_os.export_script
2713

    
2714
    # TODO: Pass special environment only to script
2715
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2716

    
2717
    if mode == constants.IEM_IMPORT:
2718
      suffix = "| %s" % script_cmd
2719

    
2720
    elif mode == constants.IEM_EXPORT:
2721
      prefix = "%s |" % script_cmd
2722

    
2723
    # Let script predict size
2724
    exp_size = constants.IE_CUSTOM_SIZE
2725

    
2726
  else:
2727
    _Fail("Invalid %s I/O mode %r", mode, ieio)
2728

    
2729
  return (env, prefix, suffix, exp_size)
2730

    
2731

    
2732
def _CreateImportExportStatusDir(prefix):
2733
  """Creates status directory for import/export.
2734

2735
  """
2736
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2737
                          prefix=("%s-%s-" %
2738
                                  (prefix, utils.TimestampForFilename())))
2739

    
2740

    
2741
def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2742
  """Starts an import or export daemon.
2743

2744
  @param mode: Import/output mode
2745
  @type opts: L{objects.ImportExportOptions}
2746
  @param opts: Daemon options
2747
  @type host: string
2748
  @param host: Remote host for export (None for import)
2749
  @type port: int
2750
  @param port: Remote port for export (None for import)
2751
  @type instance: L{objects.Instance}
2752
  @param instance: Instance object
2753
  @param ieio: Input/output type
2754
  @param ieioargs: Input/output arguments
2755

2756
  """
2757
  if mode == constants.IEM_IMPORT:
2758
    prefix = "import"
2759

    
2760
    if not (host is None and port is None):
2761
      _Fail("Can not specify host or port on import")
2762

    
2763
  elif mode == constants.IEM_EXPORT:
2764
    prefix = "export"
2765

    
2766
    if host is None or port is None:
2767
      _Fail("Host and port must be specified for an export")
2768

    
2769
  else:
2770
    _Fail("Invalid mode %r", mode)
2771

    
2772
  if (opts.key_name is None) ^ (opts.ca_pem is None):
2773
    _Fail("Cluster certificate can only be used for both key and CA")
2774

    
2775
  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2776
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2777

    
2778
  if opts.key_name is None:
2779
    # Use server.pem
2780
    key_path = constants.NODED_CERT_FILE
2781
    cert_path = constants.NODED_CERT_FILE
2782
    assert opts.ca_pem is None
2783
  else:
2784
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2785
                                                 opts.key_name)
2786
    assert opts.ca_pem is not None
2787

    
2788
  for i in [key_path, cert_path]:
2789
    if not os.path.exists(i):
2790
      _Fail("File '%s' does not exist" % i)
2791

    
2792
  status_dir = _CreateImportExportStatusDir(prefix)
2793
  try:
2794
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2795
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2796
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2797

    
2798
    if opts.ca_pem is None:
2799
      # Use server.pem
2800
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
2801
    else:
2802
      ca = opts.ca_pem
2803

    
2804
    # Write CA file
2805
    utils.WriteFile(ca_file, data=ca, mode=0400)
2806

    
2807
    cmd = [
2808
      constants.IMPORT_EXPORT_DAEMON,
2809
      status_file, mode,
2810
      "--key=%s" % key_path,
2811
      "--cert=%s" % cert_path,
2812
      "--ca=%s" % ca_file,
2813
      ]
2814

    
2815
    if host:
2816
      cmd.append("--host=%s" % host)
2817

    
2818
    if port:
2819
      cmd.append("--port=%s" % port)
2820

    
2821
    if opts.compress:
2822
      cmd.append("--compress=%s" % opts.compress)
2823

    
2824
    if opts.magic:
2825
      cmd.append("--magic=%s" % opts.magic)
2826

    
2827
    if exp_size is not None:
2828
      cmd.append("--expected-size=%s" % exp_size)
2829

    
2830
    if cmd_prefix:
2831
      cmd.append("--cmd-prefix=%s" % cmd_prefix)
2832

    
2833
    if cmd_suffix:
2834
      cmd.append("--cmd-suffix=%s" % cmd_suffix)
2835

    
2836
    logfile = _InstanceLogName(prefix, instance.os, instance.name)
2837

    
2838
    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2839
    # support for receiving a file descriptor for output
2840
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2841
                      output=logfile)
2842

    
2843
    # The import/export name is simply the status directory name
2844
    return os.path.basename(status_dir)
2845

    
2846
  except Exception:
2847
    shutil.rmtree(status_dir, ignore_errors=True)
2848
    raise
2849

    
2850

    
2851
def GetImportExportStatus(names):
2852
  """Returns import/export daemon status.
2853

2854
  @type names: sequence
2855
  @param names: List of names
2856
  @rtype: List of dicts
2857
  @return: Returns a list of the state of each named import/export or None if a
2858
           status couldn't be read
2859

2860
  """
2861
  result = []
2862

    
2863
  for name in names:
2864
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2865
                                 _IES_STATUS_FILE)
2866

    
2867
    try:
2868
      data = utils.ReadFile(status_file)
2869
    except EnvironmentError, err:
2870
      if err.errno != errno.ENOENT:
2871
        raise
2872
      data = None
2873

    
2874
    if not data:
2875
      result.append(None)
2876
      continue
2877

    
2878
    result.append(serializer.LoadJson(data))
2879

    
2880
  return result
2881

    
2882

    
2883
def AbortImportExport(name):
2884
  """Sends SIGTERM to a running import/export daemon.
2885

2886
  """
2887
  logging.info("Abort import/export %s", name)
2888

    
2889
  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2890
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2891

    
2892
  if pid:
2893
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2894
                 name, pid)
2895
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2896

    
2897

    
2898
def CleanupImportExport(name):
2899
  """Cleanup after an import or export.
2900

2901
  If the import/export daemon is still running it's killed. Afterwards the
2902
  whole status directory is removed.
2903

2904
  """
2905
  logging.info("Finalizing import/export %s", name)
2906

    
2907
  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2908

    
2909
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2910

    
2911
  if pid:
2912
    logging.info("Import/export %s is still running with PID %s",
2913
                 name, pid)
2914
    utils.KillProcess(pid, waitpid=False)
2915

    
2916
  shutil.rmtree(status_dir, ignore_errors=True)
2917

    
2918

    
2919
def _FindDisks(nodes_ip, disks):
2920
  """Sets the physical ID on disks and returns the block devices.
2921

2922
  """
2923
  # set the correct physical ID
2924
  my_name = utils.HostInfo().name
2925
  for cf in disks:
2926
    cf.SetPhysicalID(my_name, nodes_ip)
2927

    
2928
  bdevs = []
2929

    
2930
  for cf in disks:
2931
    rd = _RecursiveFindBD(cf)
2932
    if rd is None:
2933
      _Fail("Can't find device %s", cf)
2934
    bdevs.append(rd)
2935
  return bdevs
2936

    
2937

    
2938
def DrbdDisconnectNet(nodes_ip, disks):
2939
  """Disconnects the network on a list of drbd devices.
2940

2941
  """
2942
  bdevs = _FindDisks(nodes_ip, disks)
2943

    
2944
  # disconnect disks
2945
  for rd in bdevs:
2946
    try:
2947
      rd.DisconnectNet()
2948
    except errors.BlockDeviceError, err:
2949
      _Fail("Can't change network configuration to standalone mode: %s",
2950
            err, exc=True)
2951

    
2952

    
2953
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2954
  """Attaches the network on a list of drbd devices.
2955

2956
  """
2957
  bdevs = _FindDisks(nodes_ip, disks)
2958

    
2959
  if multimaster:
2960
    for idx, rd in enumerate(bdevs):
2961
      try:
2962
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2963
      except EnvironmentError, err:
2964
        _Fail("Can't create symlink: %s", err)
2965
  # reconnect disks, switch to new master configuration and if
2966
  # needed primary mode
2967
  for rd in bdevs:
2968
    try:
2969
      rd.AttachNet(multimaster)
2970
    except errors.BlockDeviceError, err:
2971
      _Fail("Can't change network configuration: %s", err)
2972

    
2973
  # wait until the disks are connected; we need to retry the re-attach
2974
  # if the device becomes standalone, as this might happen if the one
2975
  # node disconnects and reconnects in a different mode before the
2976
  # other node reconnects; in this case, one or both of the nodes will
2977
  # decide it has wrong configuration and switch to standalone
2978

    
2979
  def _Attach():
2980
    all_connected = True
2981

    
2982
    for rd in bdevs:
2983
      stats = rd.GetProcStatus()
2984

    
2985
      all_connected = (all_connected and
2986
                       (stats.is_connected or stats.is_in_resync))
2987

    
2988
      if stats.is_standalone:
2989
        # peer had different config info and this node became
2990
        # standalone, even though this should not happen with the
2991
        # new staged way of changing disk configs
2992
        try:
2993
          rd.AttachNet(multimaster)
2994
        except errors.BlockDeviceError, err:
2995
          _Fail("Can't change network configuration: %s", err)
2996

    
2997
    if not all_connected:
2998
      raise utils.RetryAgain()
2999

    
3000
  try:
3001
    # Start with a delay of 100 miliseconds and go up to 5 seconds
3002
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3003
  except utils.RetryTimeout:
3004
    _Fail("Timeout in disk reconnecting")
3005

    
3006
  if multimaster:
3007
    # change to primary mode
3008
    for rd in bdevs:
3009
      try:
3010
        rd.Open()
3011
      except errors.BlockDeviceError, err:
3012
        _Fail("Can't change to primary mode: %s", err)
3013

    
3014

    
3015
def DrbdWaitSync(nodes_ip, disks):
3016
  """Wait until DRBDs have synchronized.
3017

3018
  """
3019
  def _helper(rd):
3020
    stats = rd.GetProcStatus()
3021
    if not (stats.is_connected or stats.is_in_resync):
3022
      raise utils.RetryAgain()
3023
    return stats
3024

    
3025
  bdevs = _FindDisks(nodes_ip, disks)
3026

    
3027
  min_resync = 100
3028
  alldone = True
3029
  for rd in bdevs:
3030
    try:
3031
      # poll each second for 15 seconds
3032
      stats = utils.Retry(_helper, 1, 15, args=[rd])
3033
    except utils.RetryTimeout:
3034
      stats = rd.GetProcStatus()
3035
      # last check
3036
      if not (stats.is_connected or stats.is_in_resync):
3037
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3038
    alldone = alldone and (not stats.is_in_resync)
3039
    if stats.sync_percent is not None:
3040
      min_resync = min(min_resync, stats.sync_percent)
3041

    
3042
  return (alldone, min_resync)
3043

    
3044

    
3045
def PowercycleNode(hypervisor_type):
3046
  """Hard-powercycle the node.
3047

3048
  Because we need to return first, and schedule the powercycle in the
3049
  background, we won't be able to report failures nicely.
3050

3051
  """
3052
  hyper = hypervisor.GetHypervisor(hypervisor_type)
3053
  try:
3054
    pid = os.fork()
3055
  except OSError:
3056
    # if we can't fork, we'll pretend that we're in the child process
3057
    pid = 0
3058
  if pid > 0:
3059
    return "Reboot scheduled in 5 seconds"
3060
  # ensure the child is running on ram
3061
  try:
3062
    utils.Mlockall()
3063
  except Exception: # pylint: disable-msg=W0703
3064
    pass
3065
  time.sleep(5)
3066
  hyper.PowercycleNode()
3067

    
3068

    
3069
class HooksRunner(object):
3070
  """Hook runner.
3071

3072
  This class is instantiated on the node side (ganeti-noded) and not
3073
  on the master side.
3074

3075
  """
3076
  def __init__(self, hooks_base_dir=None):
3077
    """Constructor for hooks runner.
3078

3079
    @type hooks_base_dir: str or None
3080
    @param hooks_base_dir: if not None, this overrides the
3081
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
3082

3083
    """
3084
    if hooks_base_dir is None:
3085
      hooks_base_dir = constants.HOOKS_BASE_DIR
3086
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
3087
    # constant
3088
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3089

    
3090
  def RunHooks(self, hpath, phase, env):
3091
    """Run the scripts in the hooks directory.
3092

3093
    @type hpath: str
3094
    @param hpath: the path to the hooks directory which
3095
        holds the scripts
3096
    @type phase: str
3097
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
3098
        L{constants.HOOKS_PHASE_POST}
3099
    @type env: dict
3100
    @param env: dictionary with the environment for the hook
3101
    @rtype: list
3102
    @return: list of 3-element tuples:
3103
      - script path
3104
      - script result, either L{constants.HKR_SUCCESS} or
3105
        L{constants.HKR_FAIL}
3106
      - output of the script
3107

3108
    @raise errors.ProgrammerError: for invalid input
3109
        parameters
3110

3111
    """
3112
    if phase == constants.HOOKS_PHASE_PRE:
3113
      suffix = "pre"
3114
    elif phase == constants.HOOKS_PHASE_POST:
3115
      suffix = "post"
3116
    else:
3117
      _Fail("Unknown hooks phase '%s'", phase)
3118

    
3119

    
3120
    subdir = "%s-%s.d" % (hpath, suffix)
3121
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3122

    
3123
    results = []
3124

    
3125
    if not os.path.isdir(dir_name):
3126
      # for non-existing/non-dirs, we simply exit instead of logging a
3127
      # warning at every operation
3128
      return results
3129

    
3130
    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3131

    
3132
    for (relname, relstatus, runresult)  in runparts_results:
3133
      if relstatus == constants.RUNPARTS_SKIP:
3134
        rrval = constants.HKR_SKIP
3135
        output = ""
3136
      elif relstatus == constants.RUNPARTS_ERR:
3137
        rrval = constants.HKR_FAIL
3138
        output = "Hook script execution error: %s" % runresult
3139
      elif relstatus == constants.RUNPARTS_RUN:
3140
        if runresult.failed:
3141
          rrval = constants.HKR_FAIL
3142
        else:
3143
          rrval = constants.HKR_SUCCESS
3144
        output = utils.SafeEncode(runresult.output.strip())
3145
      results.append(("%s/%s" % (subdir, relname), rrval, output))
3146

    
3147
    return results
3148

    
3149

    
3150
class IAllocatorRunner(object):
3151
  """IAllocator runner.
3152

3153
  This class is instantiated on the node side (ganeti-noded) and not on
3154
  the master side.
3155

3156
  """
3157
  @staticmethod
3158
  def Run(name, idata):
3159
    """Run an iallocator script.
3160

3161
    @type name: str
3162
    @param name: the iallocator script name
3163
    @type idata: str
3164
    @param idata: the allocator input data
3165

3166
    @rtype: tuple
3167
    @return: two element tuple of:
3168
       - status
3169
       - either error message or stdout of allocator (for success)
3170

3171
    """
3172
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3173
                                  os.path.isfile)
3174
    if alloc_script is None:
3175
      _Fail("iallocator module '%s' not found in the search path", name)
3176

    
3177
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3178
    try:
3179
      os.write(fd, idata)
3180
      os.close(fd)
3181
      result = utils.RunCmd([alloc_script, fin_name])
3182
      if result.failed:
3183
        _Fail("iallocator module '%s' failed: %s, output '%s'",
3184
              name, result.fail_reason, result.output)
3185
    finally:
3186
      os.unlink(fin_name)
3187

    
3188
    return result.stdout
3189

    
3190

    
3191
class DevCacheManager(object):
3192
  """Simple class for managing a cache of block device information.
3193

3194
  """
3195
  _DEV_PREFIX = "/dev/"
3196
  _ROOT_DIR = constants.BDEV_CACHE_DIR
3197

    
3198
  @classmethod
3199
  def _ConvertPath(cls, dev_path):
3200
    """Converts a /dev/name path to the cache file name.
3201

3202
    This replaces slashes with underscores and strips the /dev
3203
    prefix. It then returns the full path to the cache file.
3204

3205
    @type dev_path: str
3206
    @param dev_path: the C{/dev/} path name
3207
    @rtype: str
3208
    @return: the converted path name
3209

3210
    """
3211
    if dev_path.startswith(cls._DEV_PREFIX):
3212
      dev_path = dev_path[len(cls._DEV_PREFIX):]
3213
    dev_path = dev_path.replace("/", "_")
3214
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3215
    return fpath
3216

    
3217
  @classmethod
3218
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3219
    """Updates the cache information for a given device.
3220

3221
    @type dev_path: str
3222
    @param dev_path: the pathname of the device
3223
    @type owner: str
3224
    @param owner: the owner (instance name) of the device
3225
    @type on_primary: bool
3226
    @param on_primary: whether this is the primary
3227
        node nor not
3228
    @type iv_name: str
3229
    @param iv_name: the instance-visible name of the
3230
        device, as in objects.Disk.iv_name
3231

3232
    @rtype: None
3233

3234
    """
3235
    if dev_path is None:
3236
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
3237
      return
3238
    fpath = cls._ConvertPath(dev_path)
3239
    if on_primary:
3240
      state = "primary"
3241
    else:
3242
      state = "secondary"
3243
    if iv_name is None:
3244
      iv_name = "not_visible"
3245
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3246
    try:
3247
      utils.WriteFile(fpath, data=fdata)
3248
    except EnvironmentError, err:
3249
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3250

    
3251
  @classmethod
3252
  def RemoveCache(cls, dev_path):
3253
    """Remove data for a dev_path.
3254

3255
    This is just a wrapper over L{utils.RemoveFile} with a converted
3256
    path name and logging.
3257

3258
    @type dev_path: str
3259
    @param dev_path: the pathname of the device
3260

3261
    @rtype: None
3262

3263
    """
3264
    if dev_path is None:
3265
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
3266
      return
3267
    fpath = cls._ConvertPath(dev_path)
3268
    try:
3269
      utils.RemoveFile(fpath)
3270
    except EnvironmentError, err:
3271
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)