Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 7ef40fbe

History | View | Annotate | Download (98.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable-msg=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50
import signal
51

    
52
from ganeti import errors
53
from ganeti import utils
54
from ganeti import ssh
55
from ganeti import hypervisor
56
from ganeti import constants
57
from ganeti import bdev
58
from ganeti import objects
59
from ganeti import ssconf
60
from ganeti import serializer
61

    
62

    
63
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
64
_ALLOWED_CLEAN_DIRS = frozenset([
65
  constants.DATA_DIR,
66
  constants.JOB_QUEUE_ARCHIVE_DIR,
67
  constants.QUEUE_DIR,
68
  constants.CRYPTO_KEYS_DIR,
69
  ])
70
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
71
_X509_KEY_FILE = "key"
72
_X509_CERT_FILE = "cert"
73
_IES_STATUS_FILE = "status"
74
_IES_PID_FILE = "pid"
75
_IES_CA_FILE = "ca"
76

    
77

    
78
class RPCFail(Exception):
79
  """Class denoting RPC failure.
80

81
  Its argument is the error message.
82

83
  """
84

    
85

    
86
def _Fail(msg, *args, **kwargs):
87
  """Log an error and the raise an RPCFail exception.
88

89
  This exception is then handled specially in the ganeti daemon and
90
  turned into a 'failed' return type. As such, this function is a
91
  useful shortcut for logging the error and returning it to the master
92
  daemon.
93

94
  @type msg: string
95
  @param msg: the text of the exception
96
  @raise RPCFail
97

98
  """
99
  if args:
100
    msg = msg % args
101
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
102
    if "exc" in kwargs and kwargs["exc"]:
103
      logging.exception(msg)
104
    else:
105
      logging.error(msg)
106
  raise RPCFail(msg)
107

    
108

    
109
def _GetConfig():
110
  """Simple wrapper to return a SimpleStore.
111

112
  @rtype: L{ssconf.SimpleStore}
113
  @return: a SimpleStore instance
114

115
  """
116
  return ssconf.SimpleStore()
117

    
118

    
119
def _GetSshRunner(cluster_name):
120
  """Simple wrapper to return an SshRunner.
121

122
  @type cluster_name: str
123
  @param cluster_name: the cluster name, which is needed
124
      by the SshRunner constructor
125
  @rtype: L{ssh.SshRunner}
126
  @return: an SshRunner instance
127

128
  """
129
  return ssh.SshRunner(cluster_name)
130

    
131

    
132
def _Decompress(data):
133
  """Unpacks data compressed by the RPC client.
134

135
  @type data: list or tuple
136
  @param data: Data sent by RPC client
137
  @rtype: str
138
  @return: Decompressed data
139

140
  """
141
  assert isinstance(data, (list, tuple))
142
  assert len(data) == 2
143
  (encoding, content) = data
144
  if encoding == constants.RPC_ENCODING_NONE:
145
    return content
146
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
147
    return zlib.decompress(base64.b64decode(content))
148
  else:
149
    raise AssertionError("Unknown data encoding")
150

    
151

    
152
def _CleanDirectory(path, exclude=None):
153
  """Removes all regular files in a directory.
154

155
  @type path: str
156
  @param path: the directory to clean
157
  @type exclude: list
158
  @param exclude: list of files to be excluded, defaults
159
      to the empty list
160

161
  """
162
  if path not in _ALLOWED_CLEAN_DIRS:
163
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
164
          path)
165

    
166
  if not os.path.isdir(path):
167
    return
168
  if exclude is None:
169
    exclude = []
170
  else:
171
    # Normalize excluded paths
172
    exclude = [os.path.normpath(i) for i in exclude]
173

    
174
  for rel_name in utils.ListVisibleFiles(path):
175
    full_name = utils.PathJoin(path, rel_name)
176
    if full_name in exclude:
177
      continue
178
    if os.path.isfile(full_name) and not os.path.islink(full_name):
179
      utils.RemoveFile(full_name)
180

    
181

    
182
def _BuildUploadFileList():
183
  """Build the list of allowed upload files.
184

185
  This is abstracted so that it's built only once at module import time.
186

187
  """
188
  allowed_files = set([
189
    constants.CLUSTER_CONF_FILE,
190
    constants.ETC_HOSTS,
191
    constants.SSH_KNOWN_HOSTS_FILE,
192
    constants.VNC_PASSWORD_FILE,
193
    constants.RAPI_CERT_FILE,
194
    constants.RAPI_USERS_FILE,
195
    constants.CONFD_HMAC_KEY,
196
    constants.CLUSTER_DOMAIN_SECRET_FILE,
197
    ])
198

    
199
  for hv_name in constants.HYPER_TYPES:
200
    hv_class = hypervisor.GetHypervisorClass(hv_name)
201
    allowed_files.update(hv_class.GetAncillaryFiles())
202

    
203
  return frozenset(allowed_files)
204

    
205

    
206
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
207

    
208

    
209
def JobQueuePurge():
210
  """Removes job queue files and archived jobs.
211

212
  @rtype: tuple
213
  @return: True, None
214

215
  """
216
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
217
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
218

    
219

    
220
def GetMasterInfo():
221
  """Returns master information.
222

223
  This is an utility function to compute master information, either
224
  for consumption here or from the node daemon.
225

226
  @rtype: tuple
227
  @return: master_netdev, master_ip, master_name
228
  @raise RPCFail: in case of errors
229

230
  """
231
  try:
232
    cfg = _GetConfig()
233
    master_netdev = cfg.GetMasterNetdev()
234
    master_ip = cfg.GetMasterIP()
235
    master_node = cfg.GetMasterNode()
236
  except errors.ConfigurationError, err:
237
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
238
  return (master_netdev, master_ip, master_node)
239

    
240

    
241
def StartMaster(start_daemons, no_voting):
242
  """Activate local node as master node.
243

244
  The function will always try activate the IP address of the master
245
  (unless someone else has it). It will also start the master daemons,
246
  based on the start_daemons parameter.
247

248
  @type start_daemons: boolean
249
  @param start_daemons: whether to also start the master
250
      daemons (ganeti-masterd and ganeti-rapi)
251
  @type no_voting: boolean
252
  @param no_voting: whether to start ganeti-masterd without a node vote
253
      (if start_daemons is True), but still non-interactively
254
  @rtype: None
255

256
  """
257
  # GetMasterInfo will raise an exception if not able to return data
258
  master_netdev, master_ip, _ = GetMasterInfo()
259

    
260
  err_msgs = []
261
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
262
    if utils.OwnIpAddress(master_ip):
263
      # we already have the ip:
264
      logging.debug("Master IP already configured, doing nothing")
265
    else:
266
      msg = "Someone else has the master ip, not activating"
267
      logging.error(msg)
268
      err_msgs.append(msg)
269
  else:
270
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
271
                           "dev", master_netdev, "label",
272
                           "%s:0" % master_netdev])
273
    if result.failed:
274
      msg = "Can't activate master IP: %s" % result.output
275
      logging.error(msg)
276
      err_msgs.append(msg)
277

    
278
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
279
                           "-s", master_ip, master_ip])
280
    # we'll ignore the exit code of arping
281

    
282
  # and now start the master and rapi daemons
283
  if start_daemons:
284
    if no_voting:
285
      masterd_args = "--no-voting --yes-do-it"
286
    else:
287
      masterd_args = ""
288

    
289
    env = {
290
      "EXTRA_MASTERD_ARGS": masterd_args,
291
      }
292

    
293
    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
294
    if result.failed:
295
      msg = "Can't start Ganeti master: %s" % result.output
296
      logging.error(msg)
297
      err_msgs.append(msg)
298

    
299
  if err_msgs:
300
    _Fail("; ".join(err_msgs))
301

    
302

    
303
def StopMaster(stop_daemons):
304
  """Deactivate this node as master.
305

306
  The function will always try to deactivate the IP address of the
307
  master. It will also stop the master daemons depending on the
308
  stop_daemons parameter.
309

310
  @type stop_daemons: boolean
311
  @param stop_daemons: whether to also stop the master daemons
312
      (ganeti-masterd and ganeti-rapi)
313
  @rtype: None
314

315
  """
316
  # TODO: log and report back to the caller the error failures; we
317
  # need to decide in which case we fail the RPC for this
318

    
319
  # GetMasterInfo will raise an exception if not able to return data
320
  master_netdev, master_ip, _ = GetMasterInfo()
321

    
322
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
323
                         "dev", master_netdev])
324
  if result.failed:
325
    logging.error("Can't remove the master IP, error: %s", result.output)
326
    # but otherwise ignore the failure
327

    
328
  if stop_daemons:
329
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
330
    if result.failed:
331
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
332
                    " and error %s",
333
                    result.cmd, result.exit_code, result.output)
334

    
335

    
336
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
337
  """Joins this node to the cluster.
338

339
  This does the following:
340
      - updates the hostkeys of the machine (rsa and dsa)
341
      - adds the ssh private key to the user
342
      - adds the ssh public key to the users' authorized_keys file
343

344
  @type dsa: str
345
  @param dsa: the DSA private key to write
346
  @type dsapub: str
347
  @param dsapub: the DSA public key to write
348
  @type rsa: str
349
  @param rsa: the RSA private key to write
350
  @type rsapub: str
351
  @param rsapub: the RSA public key to write
352
  @type sshkey: str
353
  @param sshkey: the SSH private key to write
354
  @type sshpub: str
355
  @param sshpub: the SSH public key to write
356
  @rtype: boolean
357
  @return: the success of the operation
358

359
  """
360
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
361
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
362
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
363
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
364
  for name, content, mode in sshd_keys:
365
    utils.WriteFile(name, data=content, mode=mode)
366

    
367
  try:
368
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
369
                                                    mkdir=True)
370
  except errors.OpExecError, err:
371
    _Fail("Error while processing user ssh files: %s", err, exc=True)
372

    
373
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
374
    utils.WriteFile(name, data=content, mode=0600)
375

    
376
  utils.AddAuthorizedKey(auth_keys, sshpub)
377

    
378
  result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
379
  if result.failed:
380
    _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
381
          result.cmd, result.exit_code, result.output)
382

    
383

    
384
def LeaveCluster(modify_ssh_setup):
385
  """Cleans up and remove the current node.
386

387
  This function cleans up and prepares the current node to be removed
388
  from the cluster.
389

390
  If processing is successful, then it raises an
391
  L{errors.QuitGanetiException} which is used as a special case to
392
  shutdown the node daemon.
393

394
  @param modify_ssh_setup: boolean
395

396
  """
397
  _CleanDirectory(constants.DATA_DIR)
398
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
399
  JobQueuePurge()
400

    
401
  if modify_ssh_setup:
402
    try:
403
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
404

    
405
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
406

    
407
      utils.RemoveFile(priv_key)
408
      utils.RemoveFile(pub_key)
409
    except errors.OpExecError:
410
      logging.exception("Error while processing ssh files")
411

    
412
  try:
413
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
414
    utils.RemoveFile(constants.RAPI_CERT_FILE)
415
    utils.RemoveFile(constants.NODED_CERT_FILE)
416
  except: # pylint: disable-msg=W0702
417
    logging.exception("Error while removing cluster secrets")
418

    
419
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
420
  if result.failed:
421
    logging.error("Command %s failed with exitcode %s and error %s",
422
                  result.cmd, result.exit_code, result.output)
423

    
424
  # Raise a custom exception (handled in ganeti-noded)
425
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
426

    
427

    
428
def GetNodeInfo(vgname, hypervisor_type):
429
  """Gives back a hash with different information about the node.
430

431
  @type vgname: C{string}
432
  @param vgname: the name of the volume group to ask for disk space information
433
  @type hypervisor_type: C{str}
434
  @param hypervisor_type: the name of the hypervisor to ask for
435
      memory information
436
  @rtype: C{dict}
437
  @return: dictionary with the following keys:
438
      - vg_size is the size of the configured volume group in MiB
439
      - vg_free is the free size of the volume group in MiB
440
      - memory_dom0 is the memory allocated for domain0 in MiB
441
      - memory_free is the currently available (free) ram in MiB
442
      - memory_total is the total number of ram in MiB
443

444
  """
445
  outputarray = {}
446
  vginfo = _GetVGInfo(vgname)
447
  outputarray['vg_size'] = vginfo['vg_size']
448
  outputarray['vg_free'] = vginfo['vg_free']
449

    
450
  hyper = hypervisor.GetHypervisor(hypervisor_type)
451
  hyp_info = hyper.GetNodeInfo()
452
  if hyp_info is not None:
453
    outputarray.update(hyp_info)
454

    
455
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
456

    
457
  return outputarray
458

    
459

    
460
def VerifyNode(what, cluster_name):
461
  """Verify the status of the local node.
462

463
  Based on the input L{what} parameter, various checks are done on the
464
  local node.
465

466
  If the I{filelist} key is present, this list of
467
  files is checksummed and the file/checksum pairs are returned.
468

469
  If the I{nodelist} key is present, we check that we have
470
  connectivity via ssh with the target nodes (and check the hostname
471
  report).
472

473
  If the I{node-net-test} key is present, we check that we have
474
  connectivity to the given nodes via both primary IP and, if
475
  applicable, secondary IPs.
476

477
  @type what: C{dict}
478
  @param what: a dictionary of things to check:
479
      - filelist: list of files for which to compute checksums
480
      - nodelist: list of nodes we should check ssh communication with
481
      - node-net-test: list of nodes we should check node daemon port
482
        connectivity with
483
      - hypervisor: list with hypervisors to run the verify for
484
  @rtype: dict
485
  @return: a dictionary with the same keys as the input dict, and
486
      values representing the result of the checks
487

488
  """
489
  result = {}
490
  my_name = utils.HostInfo().name
491
  port = utils.GetDaemonPort(constants.NODED)
492

    
493
  if constants.NV_HYPERVISOR in what:
494
    result[constants.NV_HYPERVISOR] = tmp = {}
495
    for hv_name in what[constants.NV_HYPERVISOR]:
496
      try:
497
        val = hypervisor.GetHypervisor(hv_name).Verify()
498
      except errors.HypervisorError, err:
499
        val = "Error while checking hypervisor: %s" % str(err)
500
      tmp[hv_name] = val
501

    
502
  if constants.NV_FILELIST in what:
503
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
504
      what[constants.NV_FILELIST])
505

    
506
  if constants.NV_NODELIST in what:
507
    result[constants.NV_NODELIST] = tmp = {}
508
    random.shuffle(what[constants.NV_NODELIST])
509
    for node in what[constants.NV_NODELIST]:
510
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
511
      if not success:
512
        tmp[node] = message
513

    
514
  if constants.NV_NODENETTEST in what:
515
    result[constants.NV_NODENETTEST] = tmp = {}
516
    my_pip = my_sip = None
517
    for name, pip, sip in what[constants.NV_NODENETTEST]:
518
      if name == my_name:
519
        my_pip = pip
520
        my_sip = sip
521
        break
522
    if not my_pip:
523
      tmp[my_name] = ("Can't find my own primary/secondary IP"
524
                      " in the node list")
525
    else:
526
      for name, pip, sip in what[constants.NV_NODENETTEST]:
527
        fail = []
528
        if not utils.TcpPing(pip, port, source=my_pip):
529
          fail.append("primary")
530
        if sip != pip:
531
          if not utils.TcpPing(sip, port, source=my_sip):
532
            fail.append("secondary")
533
        if fail:
534
          tmp[name] = ("failure using the %s interface(s)" %
535
                       " and ".join(fail))
536

    
537
  if constants.NV_MASTERIP in what:
538
    # FIXME: add checks on incoming data structures (here and in the
539
    # rest of the function)
540
    master_name, master_ip = what[constants.NV_MASTERIP]
541
    if master_name == my_name:
542
      source = constants.IP4_ADDRESS_LOCALHOST
543
    else:
544
      source = None
545
    result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
546
                                                  source=source)
547

    
548
  if constants.NV_LVLIST in what:
549
    try:
550
      val = GetVolumeList(what[constants.NV_LVLIST])
551
    except RPCFail, err:
552
      val = str(err)
553
    result[constants.NV_LVLIST] = val
554

    
555
  if constants.NV_INSTANCELIST in what:
556
    # GetInstanceList can fail
557
    try:
558
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
559
    except RPCFail, err:
560
      val = str(err)
561
    result[constants.NV_INSTANCELIST] = val
562

    
563
  if constants.NV_VGLIST in what:
564
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
565

    
566
  if constants.NV_PVLIST in what:
567
    result[constants.NV_PVLIST] = \
568
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
569
                                   filter_allocatable=False)
570

    
571
  if constants.NV_VERSION in what:
572
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
573
                                    constants.RELEASE_VERSION)
574

    
575
  if constants.NV_HVINFO in what:
576
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
577
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
578

    
579
  if constants.NV_DRBDLIST in what:
580
    try:
581
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
582
    except errors.BlockDeviceError, err:
583
      logging.warning("Can't get used minors list", exc_info=True)
584
      used_minors = str(err)
585
    result[constants.NV_DRBDLIST] = used_minors
586

    
587
  if constants.NV_DRBDHELPER in what:
588
    status = True
589
    try:
590
      payload = bdev.BaseDRBD.GetUsermodeHelper()
591
    except errors.BlockDeviceError, err:
592
      logging.error("Can't get DRBD usermode helper: %s", str(err))
593
      status = False
594
      payload = str(err)
595
    result[constants.NV_DRBDHELPER] = (status, payload)
596

    
597
  if constants.NV_NODESETUP in what:
598
    result[constants.NV_NODESETUP] = tmpr = []
599
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
600
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
601
                  " under /sys, missing required directories /sys/block"
602
                  " and /sys/class/net")
603
    if (not os.path.isdir("/proc/sys") or
604
        not os.path.isfile("/proc/sysrq-trigger")):
605
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
606
                  " under /proc, missing required directory /proc/sys and"
607
                  " the file /proc/sysrq-trigger")
608

    
609
  if constants.NV_TIME in what:
610
    result[constants.NV_TIME] = utils.SplitTime(time.time())
611

    
612
  if constants.NV_OSLIST in what:
613
    result[constants.NV_OSLIST] = DiagnoseOS()
614

    
615
  return result
616

    
617

    
618
def GetVolumeList(vg_name):
619
  """Compute list of logical volumes and their size.
620

621
  @type vg_name: str
622
  @param vg_name: the volume group whose LVs we should list
623
  @rtype: dict
624
  @return:
625
      dictionary of all partions (key) with value being a tuple of
626
      their size (in MiB), inactive and online status::
627

628
        {'test1': ('20.06', True, True)}
629

630
      in case of errors, a string is returned with the error
631
      details.
632

633
  """
634
  lvs = {}
635
  sep = '|'
636
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
637
                         "--separator=%s" % sep,
638
                         "-olv_name,lv_size,lv_attr", vg_name])
639
  if result.failed:
640
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
641

    
642
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
643
  for line in result.stdout.splitlines():
644
    line = line.strip()
645
    match = valid_line_re.match(line)
646
    if not match:
647
      logging.error("Invalid line returned from lvs output: '%s'", line)
648
      continue
649
    name, size, attr = match.groups()
650
    inactive = attr[4] == '-'
651
    online = attr[5] == 'o'
652
    virtual = attr[0] == 'v'
653
    if virtual:
654
      # we don't want to report such volumes as existing, since they
655
      # don't really hold data
656
      continue
657
    lvs[name] = (size, inactive, online)
658

    
659
  return lvs
660

    
661

    
662
def ListVolumeGroups():
663
  """List the volume groups and their size.
664

665
  @rtype: dict
666
  @return: dictionary with keys volume name and values the
667
      size of the volume
668

669
  """
670
  return utils.ListVolumeGroups()
671

    
672

    
673
def NodeVolumes():
674
  """List all volumes on this node.
675

676
  @rtype: list
677
  @return:
678
    A list of dictionaries, each having four keys:
679
      - name: the logical volume name,
680
      - size: the size of the logical volume
681
      - dev: the physical device on which the LV lives
682
      - vg: the volume group to which it belongs
683

684
    In case of errors, we return an empty list and log the
685
    error.
686

687
    Note that since a logical volume can live on multiple physical
688
    volumes, the resulting list might include a logical volume
689
    multiple times.
690

691
  """
692
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
693
                         "--separator=|",
694
                         "--options=lv_name,lv_size,devices,vg_name"])
695
  if result.failed:
696
    _Fail("Failed to list logical volumes, lvs output: %s",
697
          result.output)
698

    
699
  def parse_dev(dev):
700
    return dev.split('(')[0]
701

    
702
  def handle_dev(dev):
703
    return [parse_dev(x) for x in dev.split(",")]
704

    
705
  def map_line(line):
706
    line = [v.strip() for v in line]
707
    return [{'name': line[0], 'size': line[1],
708
             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
709

    
710
  all_devs = []
711
  for line in result.stdout.splitlines():
712
    if line.count('|') >= 3:
713
      all_devs.extend(map_line(line.split('|')))
714
    else:
715
      logging.warning("Strange line in the output from lvs: '%s'", line)
716
  return all_devs
717

    
718

    
719
def BridgesExist(bridges_list):
720
  """Check if a list of bridges exist on the current node.
721

722
  @rtype: boolean
723
  @return: C{True} if all of them exist, C{False} otherwise
724

725
  """
726
  missing = []
727
  for bridge in bridges_list:
728
    if not utils.BridgeExists(bridge):
729
      missing.append(bridge)
730

    
731
  if missing:
732
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
733

    
734

    
735
def GetInstanceList(hypervisor_list):
736
  """Provides a list of instances.
737

738
  @type hypervisor_list: list
739
  @param hypervisor_list: the list of hypervisors to query information
740

741
  @rtype: list
742
  @return: a list of all running instances on the current node
743
    - instance1.example.com
744
    - instance2.example.com
745

746
  """
747
  results = []
748
  for hname in hypervisor_list:
749
    try:
750
      names = hypervisor.GetHypervisor(hname).ListInstances()
751
      results.extend(names)
752
    except errors.HypervisorError, err:
753
      _Fail("Error enumerating instances (hypervisor %s): %s",
754
            hname, err, exc=True)
755

    
756
  return results
757

    
758

    
759
def GetInstanceInfo(instance, hname):
760
  """Gives back the information about an instance as a dictionary.
761

762
  @type instance: string
763
  @param instance: the instance name
764
  @type hname: string
765
  @param hname: the hypervisor type of the instance
766

767
  @rtype: dict
768
  @return: dictionary with the following keys:
769
      - memory: memory size of instance (int)
770
      - state: xen state of instance (string)
771
      - time: cpu time of instance (float)
772

773
  """
774
  output = {}
775

    
776
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
777
  if iinfo is not None:
778
    output['memory'] = iinfo[2]
779
    output['state'] = iinfo[4]
780
    output['time'] = iinfo[5]
781

    
782
  return output
783

    
784

    
785
def GetInstanceMigratable(instance):
786
  """Gives whether an instance can be migrated.
787

788
  @type instance: L{objects.Instance}
789
  @param instance: object representing the instance to be checked.
790

791
  @rtype: tuple
792
  @return: tuple of (result, description) where:
793
      - result: whether the instance can be migrated or not
794
      - description: a description of the issue, if relevant
795

796
  """
797
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
798
  iname = instance.name
799
  if iname not in hyper.ListInstances():
800
    _Fail("Instance %s is not running", iname)
801

    
802
  for idx in range(len(instance.disks)):
803
    link_name = _GetBlockDevSymlinkPath(iname, idx)
804
    if not os.path.islink(link_name):
805
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
806

    
807

    
808
def GetAllInstancesInfo(hypervisor_list):
809
  """Gather data about all instances.
810

811
  This is the equivalent of L{GetInstanceInfo}, except that it
812
  computes data for all instances at once, thus being faster if one
813
  needs data about more than one instance.
814

815
  @type hypervisor_list: list
816
  @param hypervisor_list: list of hypervisors to query for instance data
817

818
  @rtype: dict
819
  @return: dictionary of instance: data, with data having the following keys:
820
      - memory: memory size of instance (int)
821
      - state: xen state of instance (string)
822
      - time: cpu time of instance (float)
823
      - vcpus: the number of vcpus
824

825
  """
826
  output = {}
827

    
828
  for hname in hypervisor_list:
829
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
830
    if iinfo:
831
      for name, _, memory, vcpus, state, times in iinfo:
832
        value = {
833
          'memory': memory,
834
          'vcpus': vcpus,
835
          'state': state,
836
          'time': times,
837
          }
838
        if name in output:
839
          # we only check static parameters, like memory and vcpus,
840
          # and not state and time which can change between the
841
          # invocations of the different hypervisors
842
          for key in 'memory', 'vcpus':
843
            if value[key] != output[name][key]:
844
              _Fail("Instance %s is running twice"
845
                    " with different parameters", name)
846
        output[name] = value
847

    
848
  return output
849

    
850

    
851
def _InstanceLogName(kind, os_name, instance):
852
  """Compute the OS log filename for a given instance and operation.
853

854
  The instance name and os name are passed in as strings since not all
855
  operations have these as part of an instance object.
856

857
  @type kind: string
858
  @param kind: the operation type (e.g. add, import, etc.)
859
  @type os_name: string
860
  @param os_name: the os name
861
  @type instance: string
862
  @param instance: the name of the instance being imported/added/etc.
863

864
  """
865
  # TODO: Use tempfile.mkstemp to create unique filename
866
  base = ("%s-%s-%s-%s.log" %
867
          (kind, os_name, instance, utils.TimestampForFilename()))
868
  return utils.PathJoin(constants.LOG_OS_DIR, base)
869

    
870

    
871
def InstanceOsAdd(instance, reinstall, debug):
872
  """Add an OS to an instance.
873

874
  @type instance: L{objects.Instance}
875
  @param instance: Instance whose OS is to be installed
876
  @type reinstall: boolean
877
  @param reinstall: whether this is an instance reinstall
878
  @type debug: integer
879
  @param debug: debug level, passed to the OS scripts
880
  @rtype: None
881

882
  """
883
  inst_os = OSFromDisk(instance.os)
884

    
885
  create_env = OSEnvironment(instance, inst_os, debug)
886
  if reinstall:
887
    create_env['INSTANCE_REINSTALL'] = "1"
888

    
889
  logfile = _InstanceLogName("add", instance.os, instance.name)
890

    
891
  result = utils.RunCmd([inst_os.create_script], env=create_env,
892
                        cwd=inst_os.path, output=logfile,)
893
  if result.failed:
894
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
895
                  " output: %s", result.cmd, result.fail_reason, logfile,
896
                  result.output)
897
    lines = [utils.SafeEncode(val)
898
             for val in utils.TailFile(logfile, lines=20)]
899
    _Fail("OS create script failed (%s), last lines in the"
900
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
901

    
902

    
903
def RunRenameInstance(instance, old_name, debug):
904
  """Run the OS rename script for an instance.
905

906
  @type instance: L{objects.Instance}
907
  @param instance: Instance whose OS is to be installed
908
  @type old_name: string
909
  @param old_name: previous instance name
910
  @type debug: integer
911
  @param debug: debug level, passed to the OS scripts
912
  @rtype: boolean
913
  @return: the success of the operation
914

915
  """
916
  inst_os = OSFromDisk(instance.os)
917

    
918
  rename_env = OSEnvironment(instance, inst_os, debug)
919
  rename_env['OLD_INSTANCE_NAME'] = old_name
920

    
921
  logfile = _InstanceLogName("rename", instance.os,
922
                             "%s-%s" % (old_name, instance.name))
923

    
924
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
925
                        cwd=inst_os.path, output=logfile)
926

    
927
  if result.failed:
928
    logging.error("os create command '%s' returned error: %s output: %s",
929
                  result.cmd, result.fail_reason, result.output)
930
    lines = [utils.SafeEncode(val)
931
             for val in utils.TailFile(logfile, lines=20)]
932
    _Fail("OS rename script failed (%s), last lines in the"
933
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
934

    
935

    
936
def _GetVGInfo(vg_name):
937
  """Get information about the volume group.
938

939
  @type vg_name: str
940
  @param vg_name: the volume group which we query
941
  @rtype: dict
942
  @return:
943
    A dictionary with the following keys:
944
      - C{vg_size} is the total size of the volume group in MiB
945
      - C{vg_free} is the free size of the volume group in MiB
946
      - C{pv_count} are the number of physical disks in that VG
947

948
    If an error occurs during gathering of data, we return the same dict
949
    with keys all set to None.
950

951
  """
952
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
953

    
954
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
955
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
956

    
957
  if retval.failed:
958
    logging.error("volume group %s not present", vg_name)
959
    return retdic
960
  valarr = retval.stdout.strip().rstrip(':').split(':')
961
  if len(valarr) == 3:
962
    try:
963
      retdic = {
964
        "vg_size": int(round(float(valarr[0]), 0)),
965
        "vg_free": int(round(float(valarr[1]), 0)),
966
        "pv_count": int(valarr[2]),
967
        }
968
    except (TypeError, ValueError), err:
969
      logging.exception("Fail to parse vgs output: %s", err)
970
  else:
971
    logging.error("vgs output has the wrong number of fields (expected"
972
                  " three): %s", str(valarr))
973
  return retdic
974

    
975

    
976
def _GetBlockDevSymlinkPath(instance_name, idx):
977
  return utils.PathJoin(constants.DISK_LINKS_DIR,
978
                        "%s:%d" % (instance_name, idx))
979

    
980

    
981
def _SymlinkBlockDev(instance_name, device_path, idx):
982
  """Set up symlinks to a instance's block device.
983

984
  This is an auxiliary function run when an instance is start (on the primary
985
  node) or when an instance is migrated (on the target node).
986

987

988
  @param instance_name: the name of the target instance
989
  @param device_path: path of the physical block device, on the node
990
  @param idx: the disk index
991
  @return: absolute path to the disk's symlink
992

993
  """
994
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
995
  try:
996
    os.symlink(device_path, link_name)
997
  except OSError, err:
998
    if err.errno == errno.EEXIST:
999
      if (not os.path.islink(link_name) or
1000
          os.readlink(link_name) != device_path):
1001
        os.remove(link_name)
1002
        os.symlink(device_path, link_name)
1003
    else:
1004
      raise
1005

    
1006
  return link_name
1007

    
1008

    
1009
def _RemoveBlockDevLinks(instance_name, disks):
1010
  """Remove the block device symlinks belonging to the given instance.
1011

1012
  """
1013
  for idx, _ in enumerate(disks):
1014
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1015
    if os.path.islink(link_name):
1016
      try:
1017
        os.remove(link_name)
1018
      except OSError:
1019
        logging.exception("Can't remove symlink '%s'", link_name)
1020

    
1021

    
1022
def _GatherAndLinkBlockDevs(instance):
1023
  """Set up an instance's block device(s).
1024

1025
  This is run on the primary node at instance startup. The block
1026
  devices must be already assembled.
1027

1028
  @type instance: L{objects.Instance}
1029
  @param instance: the instance whose disks we shoul assemble
1030
  @rtype: list
1031
  @return: list of (disk_object, device_path)
1032

1033
  """
1034
  block_devices = []
1035
  for idx, disk in enumerate(instance.disks):
1036
    device = _RecursiveFindBD(disk)
1037
    if device is None:
1038
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1039
                                    str(disk))
1040
    device.Open()
1041
    try:
1042
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1043
    except OSError, e:
1044
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1045
                                    e.strerror)
1046

    
1047
    block_devices.append((disk, link_name))
1048

    
1049
  return block_devices
1050

    
1051

    
1052
def StartInstance(instance):
1053
  """Start an instance.
1054

1055
  @type instance: L{objects.Instance}
1056
  @param instance: the instance object
1057
  @rtype: None
1058

1059
  """
1060
  running_instances = GetInstanceList([instance.hypervisor])
1061

    
1062
  if instance.name in running_instances:
1063
    logging.info("Instance %s already running, not starting", instance.name)
1064
    return
1065

    
1066
  try:
1067
    block_devices = _GatherAndLinkBlockDevs(instance)
1068
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1069
    hyper.StartInstance(instance, block_devices)
1070
  except errors.BlockDeviceError, err:
1071
    _Fail("Block device error: %s", err, exc=True)
1072
  except errors.HypervisorError, err:
1073
    _RemoveBlockDevLinks(instance.name, instance.disks)
1074
    _Fail("Hypervisor error: %s", err, exc=True)
1075

    
1076

    
1077
def InstanceShutdown(instance, timeout):
1078
  """Shut an instance down.
1079

1080
  @note: this functions uses polling with a hardcoded timeout.
1081

1082
  @type instance: L{objects.Instance}
1083
  @param instance: the instance object
1084
  @type timeout: integer
1085
  @param timeout: maximum timeout for soft shutdown
1086
  @rtype: None
1087

1088
  """
1089
  hv_name = instance.hypervisor
1090
  hyper = hypervisor.GetHypervisor(hv_name)
1091
  iname = instance.name
1092

    
1093
  if instance.name not in hyper.ListInstances():
1094
    logging.info("Instance %s not running, doing nothing", iname)
1095
    return
1096

    
1097
  class _TryShutdown:
1098
    def __init__(self):
1099
      self.tried_once = False
1100

    
1101
    def __call__(self):
1102
      if iname not in hyper.ListInstances():
1103
        return
1104

    
1105
      try:
1106
        hyper.StopInstance(instance, retry=self.tried_once)
1107
      except errors.HypervisorError, err:
1108
        if iname not in hyper.ListInstances():
1109
          # if the instance is no longer existing, consider this a
1110
          # success and go to cleanup
1111
          return
1112

    
1113
        _Fail("Failed to stop instance %s: %s", iname, err)
1114

    
1115
      self.tried_once = True
1116

    
1117
      raise utils.RetryAgain()
1118

    
1119
  try:
1120
    utils.Retry(_TryShutdown(), 5, timeout)
1121
  except utils.RetryTimeout:
1122
    # the shutdown did not succeed
1123
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1124

    
1125
    try:
1126
      hyper.StopInstance(instance, force=True)
1127
    except errors.HypervisorError, err:
1128
      if iname in hyper.ListInstances():
1129
        # only raise an error if the instance still exists, otherwise
1130
        # the error could simply be "instance ... unknown"!
1131
        _Fail("Failed to force stop instance %s: %s", iname, err)
1132

    
1133
    time.sleep(1)
1134

    
1135
    if iname in hyper.ListInstances():
1136
      _Fail("Could not shutdown instance %s even by destroy", iname)
1137

    
1138
  try:
1139
    hyper.CleanupInstance(instance.name)
1140
  except errors.HypervisorError, err:
1141
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1142

    
1143
  _RemoveBlockDevLinks(iname, instance.disks)
1144

    
1145

    
1146
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1147
  """Reboot an instance.
1148

1149
  @type instance: L{objects.Instance}
1150
  @param instance: the instance object to reboot
1151
  @type reboot_type: str
1152
  @param reboot_type: the type of reboot, one the following
1153
    constants:
1154
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1155
        instance OS, do not recreate the VM
1156
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1157
        restart the VM (at the hypervisor level)
1158
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1159
        not accepted here, since that mode is handled differently, in
1160
        cmdlib, and translates into full stop and start of the
1161
        instance (instead of a call_instance_reboot RPC)
1162
  @type shutdown_timeout: integer
1163
  @param shutdown_timeout: maximum timeout for soft shutdown
1164
  @rtype: None
1165

1166
  """
1167
  running_instances = GetInstanceList([instance.hypervisor])
1168

    
1169
  if instance.name not in running_instances:
1170
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1171

    
1172
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1173
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1174
    try:
1175
      hyper.RebootInstance(instance)
1176
    except errors.HypervisorError, err:
1177
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1178
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1179
    try:
1180
      InstanceShutdown(instance, shutdown_timeout)
1181
      return StartInstance(instance)
1182
    except errors.HypervisorError, err:
1183
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1184
  else:
1185
    _Fail("Invalid reboot_type received: %s", reboot_type)
1186

    
1187

    
1188
def MigrationInfo(instance):
1189
  """Gather information about an instance to be migrated.
1190

1191
  @type instance: L{objects.Instance}
1192
  @param instance: the instance definition
1193

1194
  """
1195
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1196
  try:
1197
    info = hyper.MigrationInfo(instance)
1198
  except errors.HypervisorError, err:
1199
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1200
  return info
1201

    
1202

    
1203
def AcceptInstance(instance, info, target):
1204
  """Prepare the node to accept an instance.
1205

1206
  @type instance: L{objects.Instance}
1207
  @param instance: the instance definition
1208
  @type info: string/data (opaque)
1209
  @param info: migration information, from the source node
1210
  @type target: string
1211
  @param target: target host (usually ip), on this node
1212

1213
  """
1214
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1215
  try:
1216
    hyper.AcceptInstance(instance, info, target)
1217
  except errors.HypervisorError, err:
1218
    _Fail("Failed to accept instance: %s", err, exc=True)
1219

    
1220

    
1221
def FinalizeMigration(instance, info, success):
1222
  """Finalize any preparation to accept an instance.
1223

1224
  @type instance: L{objects.Instance}
1225
  @param instance: the instance definition
1226
  @type info: string/data (opaque)
1227
  @param info: migration information, from the source node
1228
  @type success: boolean
1229
  @param success: whether the migration was a success or a failure
1230

1231
  """
1232
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1233
  try:
1234
    hyper.FinalizeMigration(instance, info, success)
1235
  except errors.HypervisorError, err:
1236
    _Fail("Failed to finalize migration: %s", err, exc=True)
1237

    
1238

    
1239
def MigrateInstance(instance, target, live):
1240
  """Migrates an instance to another node.
1241

1242
  @type instance: L{objects.Instance}
1243
  @param instance: the instance definition
1244
  @type target: string
1245
  @param target: the target node name
1246
  @type live: boolean
1247
  @param live: whether the migration should be done live or not (the
1248
      interpretation of this parameter is left to the hypervisor)
1249
  @rtype: tuple
1250
  @return: a tuple of (success, msg) where:
1251
      - succes is a boolean denoting the success/failure of the operation
1252
      - msg is a string with details in case of failure
1253

1254
  """
1255
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1256

    
1257
  try:
1258
    hyper.MigrateInstance(instance, target, live)
1259
  except errors.HypervisorError, err:
1260
    _Fail("Failed to migrate instance: %s", err, exc=True)
1261

    
1262

    
1263
def BlockdevCreate(disk, size, owner, on_primary, info):
1264
  """Creates a block device for an instance.
1265

1266
  @type disk: L{objects.Disk}
1267
  @param disk: the object describing the disk we should create
1268
  @type size: int
1269
  @param size: the size of the physical underlying device, in MiB
1270
  @type owner: str
1271
  @param owner: the name of the instance for which disk is created,
1272
      used for device cache data
1273
  @type on_primary: boolean
1274
  @param on_primary:  indicates if it is the primary node or not
1275
  @type info: string
1276
  @param info: string that will be sent to the physical device
1277
      creation, used for example to set (LVM) tags on LVs
1278

1279
  @return: the new unique_id of the device (this can sometime be
1280
      computed only after creation), or None. On secondary nodes,
1281
      it's not required to return anything.
1282

1283
  """
1284
  # TODO: remove the obsolete 'size' argument
1285
  # pylint: disable-msg=W0613
1286
  clist = []
1287
  if disk.children:
1288
    for child in disk.children:
1289
      try:
1290
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1291
      except errors.BlockDeviceError, err:
1292
        _Fail("Can't assemble device %s: %s", child, err)
1293
      if on_primary or disk.AssembleOnSecondary():
1294
        # we need the children open in case the device itself has to
1295
        # be assembled
1296
        try:
1297
          # pylint: disable-msg=E1103
1298
          crdev.Open()
1299
        except errors.BlockDeviceError, err:
1300
          _Fail("Can't make child '%s' read-write: %s", child, err)
1301
      clist.append(crdev)
1302

    
1303
  try:
1304
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1305
  except errors.BlockDeviceError, err:
1306
    _Fail("Can't create block device: %s", err)
1307

    
1308
  if on_primary or disk.AssembleOnSecondary():
1309
    try:
1310
      device.Assemble()
1311
    except errors.BlockDeviceError, err:
1312
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1313
    device.SetSyncSpeed(constants.SYNC_SPEED)
1314
    if on_primary or disk.OpenOnSecondary():
1315
      try:
1316
        device.Open(force=True)
1317
      except errors.BlockDeviceError, err:
1318
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1319
    DevCacheManager.UpdateCache(device.dev_path, owner,
1320
                                on_primary, disk.iv_name)
1321

    
1322
  device.SetInfo(info)
1323

    
1324
  return device.unique_id
1325

    
1326

    
1327
def BlockdevRemove(disk):
1328
  """Remove a block device.
1329

1330
  @note: This is intended to be called recursively.
1331

1332
  @type disk: L{objects.Disk}
1333
  @param disk: the disk object we should remove
1334
  @rtype: boolean
1335
  @return: the success of the operation
1336

1337
  """
1338
  msgs = []
1339
  try:
1340
    rdev = _RecursiveFindBD(disk)
1341
  except errors.BlockDeviceError, err:
1342
    # probably can't attach
1343
    logging.info("Can't attach to device %s in remove", disk)
1344
    rdev = None
1345
  if rdev is not None:
1346
    r_path = rdev.dev_path
1347
    try:
1348
      rdev.Remove()
1349
    except errors.BlockDeviceError, err:
1350
      msgs.append(str(err))
1351
    if not msgs:
1352
      DevCacheManager.RemoveCache(r_path)
1353

    
1354
  if disk.children:
1355
    for child in disk.children:
1356
      try:
1357
        BlockdevRemove(child)
1358
      except RPCFail, err:
1359
        msgs.append(str(err))
1360

    
1361
  if msgs:
1362
    _Fail("; ".join(msgs))
1363

    
1364

    
1365
def _RecursiveAssembleBD(disk, owner, as_primary):
1366
  """Activate a block device for an instance.
1367

1368
  This is run on the primary and secondary nodes for an instance.
1369

1370
  @note: this function is called recursively.
1371

1372
  @type disk: L{objects.Disk}
1373
  @param disk: the disk we try to assemble
1374
  @type owner: str
1375
  @param owner: the name of the instance which owns the disk
1376
  @type as_primary: boolean
1377
  @param as_primary: if we should make the block device
1378
      read/write
1379

1380
  @return: the assembled device or None (in case no device
1381
      was assembled)
1382
  @raise errors.BlockDeviceError: in case there is an error
1383
      during the activation of the children or the device
1384
      itself
1385

1386
  """
1387
  children = []
1388
  if disk.children:
1389
    mcn = disk.ChildrenNeeded()
1390
    if mcn == -1:
1391
      mcn = 0 # max number of Nones allowed
1392
    else:
1393
      mcn = len(disk.children) - mcn # max number of Nones
1394
    for chld_disk in disk.children:
1395
      try:
1396
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1397
      except errors.BlockDeviceError, err:
1398
        if children.count(None) >= mcn:
1399
          raise
1400
        cdev = None
1401
        logging.error("Error in child activation (but continuing): %s",
1402
                      str(err))
1403
      children.append(cdev)
1404

    
1405
  if as_primary or disk.AssembleOnSecondary():
1406
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1407
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1408
    result = r_dev
1409
    if as_primary or disk.OpenOnSecondary():
1410
      r_dev.Open()
1411
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1412
                                as_primary, disk.iv_name)
1413

    
1414
  else:
1415
    result = True
1416
  return result
1417

    
1418

    
1419
def BlockdevAssemble(disk, owner, as_primary):
1420
  """Activate a block device for an instance.
1421

1422
  This is a wrapper over _RecursiveAssembleBD.
1423

1424
  @rtype: str or boolean
1425
  @return: a C{/dev/...} path for primary nodes, and
1426
      C{True} for secondary nodes
1427

1428
  """
1429
  try:
1430
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1431
    if isinstance(result, bdev.BlockDev):
1432
      # pylint: disable-msg=E1103
1433
      result = result.dev_path
1434
  except errors.BlockDeviceError, err:
1435
    _Fail("Error while assembling disk: %s", err, exc=True)
1436

    
1437
  return result
1438

    
1439

    
1440
def BlockdevShutdown(disk):
1441
  """Shut down a block device.
1442

1443
  First, if the device is assembled (Attach() is successful), then
1444
  the device is shutdown. Then the children of the device are
1445
  shutdown.
1446

1447
  This function is called recursively. Note that we don't cache the
1448
  children or such, as oppossed to assemble, shutdown of different
1449
  devices doesn't require that the upper device was active.
1450

1451
  @type disk: L{objects.Disk}
1452
  @param disk: the description of the disk we should
1453
      shutdown
1454
  @rtype: None
1455

1456
  """
1457
  msgs = []
1458
  r_dev = _RecursiveFindBD(disk)
1459
  if r_dev is not None:
1460
    r_path = r_dev.dev_path
1461
    try:
1462
      r_dev.Shutdown()
1463
      DevCacheManager.RemoveCache(r_path)
1464
    except errors.BlockDeviceError, err:
1465
      msgs.append(str(err))
1466

    
1467
  if disk.children:
1468
    for child in disk.children:
1469
      try:
1470
        BlockdevShutdown(child)
1471
      except RPCFail, err:
1472
        msgs.append(str(err))
1473

    
1474
  if msgs:
1475
    _Fail("; ".join(msgs))
1476

    
1477

    
1478
def BlockdevAddchildren(parent_cdev, new_cdevs):
1479
  """Extend a mirrored block device.
1480

1481
  @type parent_cdev: L{objects.Disk}
1482
  @param parent_cdev: the disk to which we should add children
1483
  @type new_cdevs: list of L{objects.Disk}
1484
  @param new_cdevs: the list of children which we should add
1485
  @rtype: None
1486

1487
  """
1488
  parent_bdev = _RecursiveFindBD(parent_cdev)
1489
  if parent_bdev is None:
1490
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1491
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1492
  if new_bdevs.count(None) > 0:
1493
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1494
  parent_bdev.AddChildren(new_bdevs)
1495

    
1496

    
1497
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1498
  """Shrink a mirrored block device.
1499

1500
  @type parent_cdev: L{objects.Disk}
1501
  @param parent_cdev: the disk from which we should remove children
1502
  @type new_cdevs: list of L{objects.Disk}
1503
  @param new_cdevs: the list of children which we should remove
1504
  @rtype: None
1505

1506
  """
1507
  parent_bdev = _RecursiveFindBD(parent_cdev)
1508
  if parent_bdev is None:
1509
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1510
  devs = []
1511
  for disk in new_cdevs:
1512
    rpath = disk.StaticDevPath()
1513
    if rpath is None:
1514
      bd = _RecursiveFindBD(disk)
1515
      if bd is None:
1516
        _Fail("Can't find device %s while removing children", disk)
1517
      else:
1518
        devs.append(bd.dev_path)
1519
    else:
1520
      if not utils.IsNormAbsPath(rpath):
1521
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1522
      devs.append(rpath)
1523
  parent_bdev.RemoveChildren(devs)
1524

    
1525

    
1526
def BlockdevGetmirrorstatus(disks):
1527
  """Get the mirroring status of a list of devices.
1528

1529
  @type disks: list of L{objects.Disk}
1530
  @param disks: the list of disks which we should query
1531
  @rtype: disk
1532
  @return:
1533
      a list of (mirror_done, estimated_time) tuples, which
1534
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1535
  @raise errors.BlockDeviceError: if any of the disks cannot be
1536
      found
1537

1538
  """
1539
  stats = []
1540
  for dsk in disks:
1541
    rbd = _RecursiveFindBD(dsk)
1542
    if rbd is None:
1543
      _Fail("Can't find device %s", dsk)
1544

    
1545
    stats.append(rbd.CombinedSyncStatus())
1546

    
1547
  return stats
1548

    
1549

    
1550
def _RecursiveFindBD(disk):
1551
  """Check if a device is activated.
1552

1553
  If so, return information about the real device.
1554

1555
  @type disk: L{objects.Disk}
1556
  @param disk: the disk object we need to find
1557

1558
  @return: None if the device can't be found,
1559
      otherwise the device instance
1560

1561
  """
1562
  children = []
1563
  if disk.children:
1564
    for chdisk in disk.children:
1565
      children.append(_RecursiveFindBD(chdisk))
1566

    
1567
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1568

    
1569

    
1570
def _OpenRealBD(disk):
1571
  """Opens the underlying block device of a disk.
1572

1573
  @type disk: L{objects.Disk}
1574
  @param disk: the disk object we want to open
1575

1576
  """
1577
  real_disk = _RecursiveFindBD(disk)
1578
  if real_disk is None:
1579
    _Fail("Block device '%s' is not set up", disk)
1580

    
1581
  real_disk.Open()
1582

    
1583
  return real_disk
1584

    
1585

    
1586
def BlockdevFind(disk):
1587
  """Check if a device is activated.
1588

1589
  If it is, return information about the real device.
1590

1591
  @type disk: L{objects.Disk}
1592
  @param disk: the disk to find
1593
  @rtype: None or objects.BlockDevStatus
1594
  @return: None if the disk cannot be found, otherwise a the current
1595
           information
1596

1597
  """
1598
  try:
1599
    rbd = _RecursiveFindBD(disk)
1600
  except errors.BlockDeviceError, err:
1601
    _Fail("Failed to find device: %s", err, exc=True)
1602

    
1603
  if rbd is None:
1604
    return None
1605

    
1606
  return rbd.GetSyncStatus()
1607

    
1608

    
1609
def BlockdevGetsize(disks):
1610
  """Computes the size of the given disks.
1611

1612
  If a disk is not found, returns None instead.
1613

1614
  @type disks: list of L{objects.Disk}
1615
  @param disks: the list of disk to compute the size for
1616
  @rtype: list
1617
  @return: list with elements None if the disk cannot be found,
1618
      otherwise the size
1619

1620
  """
1621
  result = []
1622
  for cf in disks:
1623
    try:
1624
      rbd = _RecursiveFindBD(cf)
1625
    except errors.BlockDeviceError:
1626
      result.append(None)
1627
      continue
1628
    if rbd is None:
1629
      result.append(None)
1630
    else:
1631
      result.append(rbd.GetActualSize())
1632
  return result
1633

    
1634

    
1635
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1636
  """Export a block device to a remote node.
1637

1638
  @type disk: L{objects.Disk}
1639
  @param disk: the description of the disk to export
1640
  @type dest_node: str
1641
  @param dest_node: the destination node to export to
1642
  @type dest_path: str
1643
  @param dest_path: the destination path on the target node
1644
  @type cluster_name: str
1645
  @param cluster_name: the cluster name, needed for SSH hostalias
1646
  @rtype: None
1647

1648
  """
1649
  real_disk = _OpenRealBD(disk)
1650

    
1651
  # the block size on the read dd is 1MiB to match our units
1652
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1653
                               "dd if=%s bs=1048576 count=%s",
1654
                               real_disk.dev_path, str(disk.size))
1655

    
1656
  # we set here a smaller block size as, due to ssh buffering, more
1657
  # than 64-128k will mostly ignored; we use nocreat to fail if the
1658
  # device is not already there or we pass a wrong path; we use
1659
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1660
  # to not buffer too much memory; this means that at best, we flush
1661
  # every 64k, which will not be very fast
1662
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1663
                                " oflag=dsync", dest_path)
1664

    
1665
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1666
                                                   constants.GANETI_RUNAS,
1667
                                                   destcmd)
1668

    
1669
  # all commands have been checked, so we're safe to combine them
1670
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1671

    
1672
  result = utils.RunCmd(["bash", "-c", command])
1673

    
1674
  if result.failed:
1675
    _Fail("Disk copy command '%s' returned error: %s"
1676
          " output: %s", command, result.fail_reason, result.output)
1677

    
1678

    
1679
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1680
  """Write a file to the filesystem.
1681

1682
  This allows the master to overwrite(!) a file. It will only perform
1683
  the operation if the file belongs to a list of configuration files.
1684

1685
  @type file_name: str
1686
  @param file_name: the target file name
1687
  @type data: str
1688
  @param data: the new contents of the file
1689
  @type mode: int
1690
  @param mode: the mode to give the file (can be None)
1691
  @type uid: int
1692
  @param uid: the owner of the file (can be -1 for default)
1693
  @type gid: int
1694
  @param gid: the group of the file (can be -1 for default)
1695
  @type atime: float
1696
  @param atime: the atime to set on the file (can be None)
1697
  @type mtime: float
1698
  @param mtime: the mtime to set on the file (can be None)
1699
  @rtype: None
1700

1701
  """
1702
  if not os.path.isabs(file_name):
1703
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1704

    
1705
  if file_name not in _ALLOWED_UPLOAD_FILES:
1706
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1707
          file_name)
1708

    
1709
  raw_data = _Decompress(data)
1710

    
1711
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1712
                  atime=atime, mtime=mtime)
1713

    
1714

    
1715
def WriteSsconfFiles(values):
1716
  """Update all ssconf files.
1717

1718
  Wrapper around the SimpleStore.WriteFiles.
1719

1720
  """
1721
  ssconf.SimpleStore().WriteFiles(values)
1722

    
1723

    
1724
def _ErrnoOrStr(err):
1725
  """Format an EnvironmentError exception.
1726

1727
  If the L{err} argument has an errno attribute, it will be looked up
1728
  and converted into a textual C{E...} description. Otherwise the
1729
  string representation of the error will be returned.
1730

1731
  @type err: L{EnvironmentError}
1732
  @param err: the exception to format
1733

1734
  """
1735
  if hasattr(err, 'errno'):
1736
    detail = errno.errorcode[err.errno]
1737
  else:
1738
    detail = str(err)
1739
  return detail
1740

    
1741

    
1742
def _OSOndiskAPIVersion(os_dir):
1743
  """Compute and return the API version of a given OS.
1744

1745
  This function will try to read the API version of the OS residing in
1746
  the 'os_dir' directory.
1747

1748
  @type os_dir: str
1749
  @param os_dir: the directory in which we should look for the OS
1750
  @rtype: tuple
1751
  @return: tuple (status, data) with status denoting the validity and
1752
      data holding either the vaid versions or an error message
1753

1754
  """
1755
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1756

    
1757
  try:
1758
    st = os.stat(api_file)
1759
  except EnvironmentError, err:
1760
    return False, ("Required file '%s' not found under path %s: %s" %
1761
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1762

    
1763
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1764
    return False, ("File '%s' in %s is not a regular file" %
1765
                   (constants.OS_API_FILE, os_dir))
1766

    
1767
  try:
1768
    api_versions = utils.ReadFile(api_file).splitlines()
1769
  except EnvironmentError, err:
1770
    return False, ("Error while reading the API version file at %s: %s" %
1771
                   (api_file, _ErrnoOrStr(err)))
1772

    
1773
  try:
1774
    api_versions = [int(version.strip()) for version in api_versions]
1775
  except (TypeError, ValueError), err:
1776
    return False, ("API version(s) can't be converted to integer: %s" %
1777
                   str(err))
1778

    
1779
  return True, api_versions
1780

    
1781

    
1782
def DiagnoseOS(top_dirs=None):
1783
  """Compute the validity for all OSes.
1784

1785
  @type top_dirs: list
1786
  @param top_dirs: the list of directories in which to
1787
      search (if not given defaults to
1788
      L{constants.OS_SEARCH_PATH})
1789
  @rtype: list of L{objects.OS}
1790
  @return: a list of tuples (name, path, status, diagnose, variants,
1791
      parameters, api_version) for all (potential) OSes under all
1792
      search paths, where:
1793
          - name is the (potential) OS name
1794
          - path is the full path to the OS
1795
          - status True/False is the validity of the OS
1796
          - diagnose is the error message for an invalid OS, otherwise empty
1797
          - variants is a list of supported OS variants, if any
1798
          - parameters is a list of (name, help) parameters, if any
1799
          - api_version is a list of support OS API versions
1800

1801
  """
1802
  if top_dirs is None:
1803
    top_dirs = constants.OS_SEARCH_PATH
1804

    
1805
  result = []
1806
  for dir_name in top_dirs:
1807
    if os.path.isdir(dir_name):
1808
      try:
1809
        f_names = utils.ListVisibleFiles(dir_name)
1810
      except EnvironmentError, err:
1811
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1812
        break
1813
      for name in f_names:
1814
        os_path = utils.PathJoin(dir_name, name)
1815
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1816
        if status:
1817
          diagnose = ""
1818
          variants = os_inst.supported_variants
1819
          parameters = os_inst.supported_parameters
1820
          api_versions = os_inst.api_versions
1821
        else:
1822
          diagnose = os_inst
1823
          variants = parameters = api_versions = []
1824
        result.append((name, os_path, status, diagnose, variants,
1825
                       parameters, api_versions))
1826

    
1827
  return result
1828

    
1829

    
1830
def _TryOSFromDisk(name, base_dir=None):
1831
  """Create an OS instance from disk.
1832

1833
  This function will return an OS instance if the given name is a
1834
  valid OS name.
1835

1836
  @type base_dir: string
1837
  @keyword base_dir: Base directory containing OS installations.
1838
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1839
  @rtype: tuple
1840
  @return: success and either the OS instance if we find a valid one,
1841
      or error message
1842

1843
  """
1844
  if base_dir is None:
1845
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1846
  else:
1847
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1848

    
1849
  if os_dir is None:
1850
    return False, "Directory for OS %s not found in search path" % name
1851

    
1852
  status, api_versions = _OSOndiskAPIVersion(os_dir)
1853
  if not status:
1854
    # push the error up
1855
    return status, api_versions
1856

    
1857
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1858
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1859
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1860

    
1861
  # OS Files dictionary, we will populate it with the absolute path names
1862
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1863

    
1864
  if max(api_versions) >= constants.OS_API_V15:
1865
    os_files[constants.OS_VARIANTS_FILE] = ''
1866

    
1867
  if max(api_versions) >= constants.OS_API_V20:
1868
    os_files[constants.OS_PARAMETERS_FILE] = ''
1869
  else:
1870
    del os_files[constants.OS_SCRIPT_VERIFY]
1871

    
1872
  for filename in os_files:
1873
    os_files[filename] = utils.PathJoin(os_dir, filename)
1874

    
1875
    try:
1876
      st = os.stat(os_files[filename])
1877
    except EnvironmentError, err:
1878
      return False, ("File '%s' under path '%s' is missing (%s)" %
1879
                     (filename, os_dir, _ErrnoOrStr(err)))
1880

    
1881
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1882
      return False, ("File '%s' under path '%s' is not a regular file" %
1883
                     (filename, os_dir))
1884

    
1885
    if filename in constants.OS_SCRIPTS:
1886
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1887
        return False, ("File '%s' under path '%s' is not executable" %
1888
                       (filename, os_dir))
1889

    
1890
  variants = []
1891
  if constants.OS_VARIANTS_FILE in os_files:
1892
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1893
    try:
1894
      variants = utils.ReadFile(variants_file).splitlines()
1895
    except EnvironmentError, err:
1896
      return False, ("Error while reading the OS variants file at %s: %s" %
1897
                     (variants_file, _ErrnoOrStr(err)))
1898
    if not variants:
1899
      return False, ("No supported os variant found")
1900

    
1901
  parameters = []
1902
  if constants.OS_PARAMETERS_FILE in os_files:
1903
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1904
    try:
1905
      parameters = utils.ReadFile(parameters_file).splitlines()
1906
    except EnvironmentError, err:
1907
      return False, ("Error while reading the OS parameters file at %s: %s" %
1908
                     (parameters_file, _ErrnoOrStr(err)))
1909
    parameters = [v.split(None, 1) for v in parameters]
1910

    
1911
  os_obj = objects.OS(name=name, path=os_dir,
1912
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1913
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1914
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1915
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1916
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1917
                                                 None),
1918
                      supported_variants=variants,
1919
                      supported_parameters=parameters,
1920
                      api_versions=api_versions)
1921
  return True, os_obj
1922

    
1923

    
1924
def OSFromDisk(name, base_dir=None):
1925
  """Create an OS instance from disk.
1926

1927
  This function will return an OS instance if the given name is a
1928
  valid OS name. Otherwise, it will raise an appropriate
1929
  L{RPCFail} exception, detailing why this is not a valid OS.
1930

1931
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1932
  an exception but returns true/false status data.
1933

1934
  @type base_dir: string
1935
  @keyword base_dir: Base directory containing OS installations.
1936
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1937
  @rtype: L{objects.OS}
1938
  @return: the OS instance if we find a valid one
1939
  @raise RPCFail: if we don't find a valid OS
1940

1941
  """
1942
  name_only = name.split("+", 1)[0]
1943
  status, payload = _TryOSFromDisk(name_only, base_dir)
1944

    
1945
  if not status:
1946
    _Fail(payload)
1947

    
1948
  return payload
1949

    
1950

    
1951
def OSCoreEnv(inst_os, os_params, debug=0):
1952
  """Calculate the basic environment for an os script.
1953

1954
  @type inst_os: L{objects.OS}
1955
  @param inst_os: operating system for which the environment is being built
1956
  @type os_params: dict
1957
  @param os_params: the OS parameters
1958
  @type debug: integer
1959
  @param debug: debug level (0 or 1, for OS Api 10)
1960
  @rtype: dict
1961
  @return: dict of environment variables
1962
  @raise errors.BlockDeviceError: if the block device
1963
      cannot be found
1964

1965
  """
1966
  result = {}
1967
  api_version = \
1968
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1969
  result['OS_API_VERSION'] = '%d' % api_version
1970
  result['OS_NAME'] = inst_os.name
1971
  result['DEBUG_LEVEL'] = '%d' % debug
1972

    
1973
  # OS variants
1974
  if api_version >= constants.OS_API_V15:
1975
    try:
1976
      variant = inst_os.name.split('+', 1)[1]
1977
    except IndexError:
1978
      variant = inst_os.supported_variants[0]
1979
    result['OS_VARIANT'] = variant
1980

    
1981
  # OS params
1982
  for pname, pvalue in os_params.items():
1983
    result['OSP_%s' % pname.upper()] = pvalue
1984

    
1985
  return result
1986

    
1987

    
1988
def OSEnvironment(instance, inst_os, debug=0):
1989
  """Calculate the environment for an os script.
1990

1991
  @type instance: L{objects.Instance}
1992
  @param instance: target instance for the os script run
1993
  @type inst_os: L{objects.OS}
1994
  @param inst_os: operating system for which the environment is being built
1995
  @type debug: integer
1996
  @param debug: debug level (0 or 1, for OS Api 10)
1997
  @rtype: dict
1998
  @return: dict of environment variables
1999
  @raise errors.BlockDeviceError: if the block device
2000
      cannot be found
2001

2002
  """
2003
  result = OSCoreEnv(inst_os, instance.osparams, debug=debug)
2004

    
2005
  result['INSTANCE_NAME'] = instance.name
2006
  result['INSTANCE_OS'] = instance.os
2007
  result['HYPERVISOR'] = instance.hypervisor
2008
  result['DISK_COUNT'] = '%d' % len(instance.disks)
2009
  result['NIC_COUNT'] = '%d' % len(instance.nics)
2010

    
2011
  # Disks
2012
  for idx, disk in enumerate(instance.disks):
2013
    real_disk = _OpenRealBD(disk)
2014
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
2015
    result['DISK_%d_ACCESS' % idx] = disk.mode
2016
    if constants.HV_DISK_TYPE in instance.hvparams:
2017
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
2018
        instance.hvparams[constants.HV_DISK_TYPE]
2019
    if disk.dev_type in constants.LDS_BLOCK:
2020
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2021
    elif disk.dev_type == constants.LD_FILE:
2022
      result['DISK_%d_BACKEND_TYPE' % idx] = \
2023
        'file:%s' % disk.physical_id[0]
2024

    
2025
  # NICs
2026
  for idx, nic in enumerate(instance.nics):
2027
    result['NIC_%d_MAC' % idx] = nic.mac
2028
    if nic.ip:
2029
      result['NIC_%d_IP' % idx] = nic.ip
2030
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2031
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2032
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2033
    if nic.nicparams[constants.NIC_LINK]:
2034
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2035
    if constants.HV_NIC_TYPE in instance.hvparams:
2036
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
2037
        instance.hvparams[constants.HV_NIC_TYPE]
2038

    
2039
  # HV/BE params
2040
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2041
    for key, value in source.items():
2042
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2043

    
2044
  return result
2045

    
2046

    
2047
def BlockdevGrow(disk, amount):
2048
  """Grow a stack of block devices.
2049

2050
  This function is called recursively, with the childrens being the
2051
  first ones to resize.
2052

2053
  @type disk: L{objects.Disk}
2054
  @param disk: the disk to be grown
2055
  @rtype: (status, result)
2056
  @return: a tuple with the status of the operation
2057
      (True/False), and the errors message if status
2058
      is False
2059

2060
  """
2061
  r_dev = _RecursiveFindBD(disk)
2062
  if r_dev is None:
2063
    _Fail("Cannot find block device %s", disk)
2064

    
2065
  try:
2066
    r_dev.Grow(amount)
2067
  except errors.BlockDeviceError, err:
2068
    _Fail("Failed to grow block device: %s", err, exc=True)
2069

    
2070

    
2071
def BlockdevSnapshot(disk):
2072
  """Create a snapshot copy of a block device.
2073

2074
  This function is called recursively, and the snapshot is actually created
2075
  just for the leaf lvm backend device.
2076

2077
  @type disk: L{objects.Disk}
2078
  @param disk: the disk to be snapshotted
2079
  @rtype: string
2080
  @return: snapshot disk path
2081

2082
  """
2083
  if disk.dev_type == constants.LD_DRBD8:
2084
    if not disk.children:
2085
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2086
            disk.unique_id)
2087
    return BlockdevSnapshot(disk.children[0])
2088
  elif disk.dev_type == constants.LD_LV:
2089
    r_dev = _RecursiveFindBD(disk)
2090
    if r_dev is not None:
2091
      # FIXME: choose a saner value for the snapshot size
2092
      # let's stay on the safe side and ask for the full size, for now
2093
      return r_dev.Snapshot(disk.size)
2094
    else:
2095
      _Fail("Cannot find block device %s", disk)
2096
  else:
2097
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2098
          disk.unique_id, disk.dev_type)
2099

    
2100

    
2101
def FinalizeExport(instance, snap_disks):
2102
  """Write out the export configuration information.
2103

2104
  @type instance: L{objects.Instance}
2105
  @param instance: the instance which we export, used for
2106
      saving configuration
2107
  @type snap_disks: list of L{objects.Disk}
2108
  @param snap_disks: list of snapshot block devices, which
2109
      will be used to get the actual name of the dump file
2110

2111
  @rtype: None
2112

2113
  """
2114
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2115
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2116

    
2117
  config = objects.SerializableConfigParser()
2118

    
2119
  config.add_section(constants.INISECT_EXP)
2120
  config.set(constants.INISECT_EXP, 'version', '0')
2121
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2122
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2123
  config.set(constants.INISECT_EXP, 'os', instance.os)
2124
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
2125

    
2126
  config.add_section(constants.INISECT_INS)
2127
  config.set(constants.INISECT_INS, 'name', instance.name)
2128
  config.set(constants.INISECT_INS, 'memory', '%d' %
2129
             instance.beparams[constants.BE_MEMORY])
2130
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
2131
             instance.beparams[constants.BE_VCPUS])
2132
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2133
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2134

    
2135
  nic_total = 0
2136
  for nic_count, nic in enumerate(instance.nics):
2137
    nic_total += 1
2138
    config.set(constants.INISECT_INS, 'nic%d_mac' %
2139
               nic_count, '%s' % nic.mac)
2140
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2141
    for param in constants.NICS_PARAMETER_TYPES:
2142
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2143
                 '%s' % nic.nicparams.get(param, None))
2144
  # TODO: redundant: on load can read nics until it doesn't exist
2145
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2146

    
2147
  disk_total = 0
2148
  for disk_count, disk in enumerate(snap_disks):
2149
    if disk:
2150
      disk_total += 1
2151
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2152
                 ('%s' % disk.iv_name))
2153
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2154
                 ('%s' % disk.physical_id[1]))
2155
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2156
                 ('%d' % disk.size))
2157

    
2158
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2159

    
2160
  # New-style hypervisor/backend parameters
2161

    
2162
  config.add_section(constants.INISECT_HYP)
2163
  for name, value in instance.hvparams.items():
2164
    if name not in constants.HVC_GLOBALS:
2165
      config.set(constants.INISECT_HYP, name, str(value))
2166

    
2167
  config.add_section(constants.INISECT_BEP)
2168
  for name, value in instance.beparams.items():
2169
    config.set(constants.INISECT_BEP, name, str(value))
2170

    
2171
  config.add_section(constants.INISECT_OSP)
2172
  for name, value in instance.osparams.items():
2173
    config.set(constants.INISECT_OSP, name, str(value))
2174

    
2175
  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2176
                  data=config.Dumps())
2177
  shutil.rmtree(finaldestdir, ignore_errors=True)
2178
  shutil.move(destdir, finaldestdir)
2179

    
2180

    
2181
def ExportInfo(dest):
2182
  """Get export configuration information.
2183

2184
  @type dest: str
2185
  @param dest: directory containing the export
2186

2187
  @rtype: L{objects.SerializableConfigParser}
2188
  @return: a serializable config file containing the
2189
      export info
2190

2191
  """
2192
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2193

    
2194
  config = objects.SerializableConfigParser()
2195
  config.read(cff)
2196

    
2197
  if (not config.has_section(constants.INISECT_EXP) or
2198
      not config.has_section(constants.INISECT_INS)):
2199
    _Fail("Export info file doesn't have the required fields")
2200

    
2201
  return config.Dumps()
2202

    
2203

    
2204
def ListExports():
2205
  """Return a list of exports currently available on this machine.
2206

2207
  @rtype: list
2208
  @return: list of the exports
2209

2210
  """
2211
  if os.path.isdir(constants.EXPORT_DIR):
2212
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2213
  else:
2214
    _Fail("No exports directory")
2215

    
2216

    
2217
def RemoveExport(export):
2218
  """Remove an existing export from the node.
2219

2220
  @type export: str
2221
  @param export: the name of the export to remove
2222
  @rtype: None
2223

2224
  """
2225
  target = utils.PathJoin(constants.EXPORT_DIR, export)
2226

    
2227
  try:
2228
    shutil.rmtree(target)
2229
  except EnvironmentError, err:
2230
    _Fail("Error while removing the export: %s", err, exc=True)
2231

    
2232

    
2233
def BlockdevRename(devlist):
2234
  """Rename a list of block devices.
2235

2236
  @type devlist: list of tuples
2237
  @param devlist: list of tuples of the form  (disk,
2238
      new_logical_id, new_physical_id); disk is an
2239
      L{objects.Disk} object describing the current disk,
2240
      and new logical_id/physical_id is the name we
2241
      rename it to
2242
  @rtype: boolean
2243
  @return: True if all renames succeeded, False otherwise
2244

2245
  """
2246
  msgs = []
2247
  result = True
2248
  for disk, unique_id in devlist:
2249
    dev = _RecursiveFindBD(disk)
2250
    if dev is None:
2251
      msgs.append("Can't find device %s in rename" % str(disk))
2252
      result = False
2253
      continue
2254
    try:
2255
      old_rpath = dev.dev_path
2256
      dev.Rename(unique_id)
2257
      new_rpath = dev.dev_path
2258
      if old_rpath != new_rpath:
2259
        DevCacheManager.RemoveCache(old_rpath)
2260
        # FIXME: we should add the new cache information here, like:
2261
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2262
        # but we don't have the owner here - maybe parse from existing
2263
        # cache? for now, we only lose lvm data when we rename, which
2264
        # is less critical than DRBD or MD
2265
    except errors.BlockDeviceError, err:
2266
      msgs.append("Can't rename device '%s' to '%s': %s" %
2267
                  (dev, unique_id, err))
2268
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2269
      result = False
2270
  if not result:
2271
    _Fail("; ".join(msgs))
2272

    
2273

    
2274
def _TransformFileStorageDir(file_storage_dir):
2275
  """Checks whether given file_storage_dir is valid.
2276

2277
  Checks wheter the given file_storage_dir is within the cluster-wide
2278
  default file_storage_dir stored in SimpleStore. Only paths under that
2279
  directory are allowed.
2280

2281
  @type file_storage_dir: str
2282
  @param file_storage_dir: the path to check
2283

2284
  @return: the normalized path if valid, None otherwise
2285

2286
  """
2287
  if not constants.ENABLE_FILE_STORAGE:
2288
    _Fail("File storage disabled at configure time")
2289
  cfg = _GetConfig()
2290
  file_storage_dir = os.path.normpath(file_storage_dir)
2291
  base_file_storage_dir = cfg.GetFileStorageDir()
2292
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2293
      base_file_storage_dir):
2294
    _Fail("File storage directory '%s' is not under base file"
2295
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2296
  return file_storage_dir
2297

    
2298

    
2299
def CreateFileStorageDir(file_storage_dir):
2300
  """Create file storage directory.
2301

2302
  @type file_storage_dir: str
2303
  @param file_storage_dir: directory to create
2304

2305
  @rtype: tuple
2306
  @return: tuple with first element a boolean indicating wheter dir
2307
      creation was successful or not
2308

2309
  """
2310
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2311
  if os.path.exists(file_storage_dir):
2312
    if not os.path.isdir(file_storage_dir):
2313
      _Fail("Specified storage dir '%s' is not a directory",
2314
            file_storage_dir)
2315
  else:
2316
    try:
2317
      os.makedirs(file_storage_dir, 0750)
2318
    except OSError, err:
2319
      _Fail("Cannot create file storage directory '%s': %s",
2320
            file_storage_dir, err, exc=True)
2321

    
2322

    
2323
def RemoveFileStorageDir(file_storage_dir):
2324
  """Remove file storage directory.
2325

2326
  Remove it only if it's empty. If not log an error and return.
2327

2328
  @type file_storage_dir: str
2329
  @param file_storage_dir: the directory we should cleanup
2330
  @rtype: tuple (success,)
2331
  @return: tuple of one element, C{success}, denoting
2332
      whether the operation was successful
2333

2334
  """
2335
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2336
  if os.path.exists(file_storage_dir):
2337
    if not os.path.isdir(file_storage_dir):
2338
      _Fail("Specified Storage directory '%s' is not a directory",
2339
            file_storage_dir)
2340
    # deletes dir only if empty, otherwise we want to fail the rpc call
2341
    try:
2342
      os.rmdir(file_storage_dir)
2343
    except OSError, err:
2344
      _Fail("Cannot remove file storage directory '%s': %s",
2345
            file_storage_dir, err)
2346

    
2347

    
2348
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2349
  """Rename the file storage directory.
2350

2351
  @type old_file_storage_dir: str
2352
  @param old_file_storage_dir: the current path
2353
  @type new_file_storage_dir: str
2354
  @param new_file_storage_dir: the name we should rename to
2355
  @rtype: tuple (success,)
2356
  @return: tuple of one element, C{success}, denoting
2357
      whether the operation was successful
2358

2359
  """
2360
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2361
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2362
  if not os.path.exists(new_file_storage_dir):
2363
    if os.path.isdir(old_file_storage_dir):
2364
      try:
2365
        os.rename(old_file_storage_dir, new_file_storage_dir)
2366
      except OSError, err:
2367
        _Fail("Cannot rename '%s' to '%s': %s",
2368
              old_file_storage_dir, new_file_storage_dir, err)
2369
    else:
2370
      _Fail("Specified storage dir '%s' is not a directory",
2371
            old_file_storage_dir)
2372
  else:
2373
    if os.path.exists(old_file_storage_dir):
2374
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2375
            old_file_storage_dir, new_file_storage_dir)
2376

    
2377

    
2378
def _EnsureJobQueueFile(file_name):
2379
  """Checks whether the given filename is in the queue directory.
2380

2381
  @type file_name: str
2382
  @param file_name: the file name we should check
2383
  @rtype: None
2384
  @raises RPCFail: if the file is not valid
2385

2386
  """
2387
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2388
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2389

    
2390
  if not result:
2391
    _Fail("Passed job queue file '%s' does not belong to"
2392
          " the queue directory '%s'", file_name, queue_dir)
2393

    
2394

    
2395
def JobQueueUpdate(file_name, content):
2396
  """Updates a file in the queue directory.
2397

2398
  This is just a wrapper over L{utils.WriteFile}, with proper
2399
  checking.
2400

2401
  @type file_name: str
2402
  @param file_name: the job file name
2403
  @type content: str
2404
  @param content: the new job contents
2405
  @rtype: boolean
2406
  @return: the success of the operation
2407

2408
  """
2409
  _EnsureJobQueueFile(file_name)
2410

    
2411
  # Write and replace the file atomically
2412
  utils.WriteFile(file_name, data=_Decompress(content))
2413

    
2414

    
2415
def JobQueueRename(old, new):
2416
  """Renames a job queue file.
2417

2418
  This is just a wrapper over os.rename with proper checking.
2419

2420
  @type old: str
2421
  @param old: the old (actual) file name
2422
  @type new: str
2423
  @param new: the desired file name
2424
  @rtype: tuple
2425
  @return: the success of the operation and payload
2426

2427
  """
2428
  _EnsureJobQueueFile(old)
2429
  _EnsureJobQueueFile(new)
2430

    
2431
  utils.RenameFile(old, new, mkdir=True)
2432

    
2433

    
2434
def BlockdevClose(instance_name, disks):
2435
  """Closes the given block devices.
2436

2437
  This means they will be switched to secondary mode (in case of
2438
  DRBD).
2439

2440
  @param instance_name: if the argument is not empty, the symlinks
2441
      of this instance will be removed
2442
  @type disks: list of L{objects.Disk}
2443
  @param disks: the list of disks to be closed
2444
  @rtype: tuple (success, message)
2445
  @return: a tuple of success and message, where success
2446
      indicates the succes of the operation, and message
2447
      which will contain the error details in case we
2448
      failed
2449

2450
  """
2451
  bdevs = []
2452
  for cf in disks:
2453
    rd = _RecursiveFindBD(cf)
2454
    if rd is None:
2455
      _Fail("Can't find device %s", cf)
2456
    bdevs.append(rd)
2457

    
2458
  msg = []
2459
  for rd in bdevs:
2460
    try:
2461
      rd.Close()
2462
    except errors.BlockDeviceError, err:
2463
      msg.append(str(err))
2464
  if msg:
2465
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2466
  else:
2467
    if instance_name:
2468
      _RemoveBlockDevLinks(instance_name, disks)
2469

    
2470

    
2471
def ValidateHVParams(hvname, hvparams):
2472
  """Validates the given hypervisor parameters.
2473

2474
  @type hvname: string
2475
  @param hvname: the hypervisor name
2476
  @type hvparams: dict
2477
  @param hvparams: the hypervisor parameters to be validated
2478
  @rtype: None
2479

2480
  """
2481
  try:
2482
    hv_type = hypervisor.GetHypervisor(hvname)
2483
    hv_type.ValidateParameters(hvparams)
2484
  except errors.HypervisorError, err:
2485
    _Fail(str(err), log=False)
2486

    
2487

    
2488
def _CheckOSPList(os_obj, parameters):
2489
  """Check whether a list of parameters is supported by the OS.
2490

2491
  @type os_obj: L{objects.OS}
2492
  @param os_obj: OS object to check
2493
  @type parameters: list
2494
  @param parameters: the list of parameters to check
2495

2496
  """
2497
  supported = [v[0] for v in os_obj.supported_parameters]
2498
  delta = frozenset(parameters).difference(supported)
2499
  if delta:
2500
    _Fail("The following parameters are not supported"
2501
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2502

    
2503

    
2504
def ValidateOS(required, osname, checks, osparams):
2505
  """Validate the given OS' parameters.
2506

2507
  @type required: boolean
2508
  @param required: whether absence of the OS should translate into
2509
      failure or not
2510
  @type osname: string
2511
  @param osname: the OS to be validated
2512
  @type checks: list
2513
  @param checks: list of the checks to run (currently only 'parameters')
2514
  @type osparams: dict
2515
  @param osparams: dictionary with OS parameters
2516
  @rtype: boolean
2517
  @return: True if the validation passed, or False if the OS was not
2518
      found and L{required} was false
2519

2520
  """
2521
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2522
    _Fail("Unknown checks required for OS %s: %s", osname,
2523
          set(checks).difference(constants.OS_VALIDATE_CALLS))
2524

    
2525
  name_only = osname.split("+", 1)[0]
2526
  status, tbv = _TryOSFromDisk(name_only, None)
2527

    
2528
  if not status:
2529
    if required:
2530
      _Fail(tbv)
2531
    else:
2532
      return False
2533

    
2534
  if max(tbv.api_versions) < constants.OS_API_V20:
2535
    return True
2536

    
2537
  if constants.OS_VALIDATE_PARAMETERS in checks:
2538
    _CheckOSPList(tbv, osparams.keys())
2539

    
2540
  validate_env = OSCoreEnv(tbv, osparams)
2541
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2542
                        cwd=tbv.path)
2543
  if result.failed:
2544
    logging.error("os validate command '%s' returned error: %s output: %s",
2545
                  result.cmd, result.fail_reason, result.output)
2546
    _Fail("OS validation script failed (%s), output: %s",
2547
          result.fail_reason, result.output, log=False)
2548

    
2549
  return True
2550

    
2551

    
2552
def DemoteFromMC():
2553
  """Demotes the current node from master candidate role.
2554

2555
  """
2556
  # try to ensure we're not the master by mistake
2557
  master, myself = ssconf.GetMasterAndMyself()
2558
  if master == myself:
2559
    _Fail("ssconf status shows I'm the master node, will not demote")
2560

    
2561
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2562
  if not result.failed:
2563
    _Fail("The master daemon is running, will not demote")
2564

    
2565
  try:
2566
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2567
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2568
  except EnvironmentError, err:
2569
    if err.errno != errno.ENOENT:
2570
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2571

    
2572
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2573

    
2574

    
2575
def _GetX509Filenames(cryptodir, name):
2576
  """Returns the full paths for the private key and certificate.
2577

2578
  """
2579
  return (utils.PathJoin(cryptodir, name),
2580
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2581
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2582

    
2583

    
2584
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2585
  """Creates a new X509 certificate for SSL/TLS.
2586

2587
  @type validity: int
2588
  @param validity: Validity in seconds
2589
  @rtype: tuple; (string, string)
2590
  @return: Certificate name and public part
2591

2592
  """
2593
  (key_pem, cert_pem) = \
2594
    utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2595
                                     min(validity, _MAX_SSL_CERT_VALIDITY))
2596

    
2597
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
2598
                              prefix="x509-%s-" % utils.TimestampForFilename())
2599
  try:
2600
    name = os.path.basename(cert_dir)
2601
    assert len(name) > 5
2602

    
2603
    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2604

    
2605
    utils.WriteFile(key_file, mode=0400, data=key_pem)
2606
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2607

    
2608
    # Never return private key as it shouldn't leave the node
2609
    return (name, cert_pem)
2610
  except Exception:
2611
    shutil.rmtree(cert_dir, ignore_errors=True)
2612
    raise
2613

    
2614

    
2615
def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2616
  """Removes a X509 certificate.
2617

2618
  @type name: string
2619
  @param name: Certificate name
2620

2621
  """
2622
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2623

    
2624
  utils.RemoveFile(key_file)
2625
  utils.RemoveFile(cert_file)
2626

    
2627
  try:
2628
    os.rmdir(cert_dir)
2629
  except EnvironmentError, err:
2630
    _Fail("Cannot remove certificate directory '%s': %s",
2631
          cert_dir, err)
2632

    
2633

    
2634
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2635
  """Returns the command for the requested input/output.
2636

2637
  @type instance: L{objects.Instance}
2638
  @param instance: The instance object
2639
  @param mode: Import/export mode
2640
  @param ieio: Input/output type
2641
  @param ieargs: Input/output arguments
2642

2643
  """
2644
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2645

    
2646
  env = None
2647
  prefix = None
2648
  suffix = None
2649
  exp_size = None
2650

    
2651
  if ieio == constants.IEIO_FILE:
2652
    (filename, ) = ieargs
2653

    
2654
    if not utils.IsNormAbsPath(filename):
2655
      _Fail("Path '%s' is not normalized or absolute", filename)
2656

    
2657
    directory = os.path.normpath(os.path.dirname(filename))
2658

    
2659
    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2660
        constants.EXPORT_DIR):
2661
      _Fail("File '%s' is not under exports directory '%s'",
2662
            filename, constants.EXPORT_DIR)
2663

    
2664
    # Create directory
2665
    utils.Makedirs(directory, mode=0750)
2666

    
2667
    quoted_filename = utils.ShellQuote(filename)
2668

    
2669
    if mode == constants.IEM_IMPORT:
2670
      suffix = "> %s" % quoted_filename
2671
    elif mode == constants.IEM_EXPORT:
2672
      suffix = "< %s" % quoted_filename
2673

    
2674
      # Retrieve file size
2675
      try:
2676
        st = os.stat(filename)
2677
      except EnvironmentError, err:
2678
        logging.error("Can't stat(2) %s: %s", filename, err)
2679
      else:
2680
        exp_size = utils.BytesToMebibyte(st.st_size)
2681

    
2682
  elif ieio == constants.IEIO_RAW_DISK:
2683
    (disk, ) = ieargs
2684

    
2685
    real_disk = _OpenRealBD(disk)
2686

    
2687
    if mode == constants.IEM_IMPORT:
2688
      # we set here a smaller block size as, due to transport buffering, more
2689
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
2690
      # is not already there or we pass a wrong path; we use notrunc to no
2691
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2692
      # much memory; this means that at best, we flush every 64k, which will
2693
      # not be very fast
2694
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2695
                                    " bs=%s oflag=dsync"),
2696
                                    real_disk.dev_path,
2697
                                    str(64 * 1024))
2698

    
2699
    elif mode == constants.IEM_EXPORT:
2700
      # the block size on the read dd is 1MiB to match our units
2701
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2702
                                   real_disk.dev_path,
2703
                                   str(1024 * 1024), # 1 MB
2704
                                   str(disk.size))
2705
      exp_size = disk.size
2706

    
2707
  elif ieio == constants.IEIO_SCRIPT:
2708
    (disk, disk_index, ) = ieargs
2709

    
2710
    assert isinstance(disk_index, (int, long))
2711

    
2712
    real_disk = _OpenRealBD(disk)
2713

    
2714
    inst_os = OSFromDisk(instance.os)
2715
    env = OSEnvironment(instance, inst_os)
2716

    
2717
    if mode == constants.IEM_IMPORT:
2718
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2719
      env["IMPORT_INDEX"] = str(disk_index)
2720
      script = inst_os.import_script
2721

    
2722
    elif mode == constants.IEM_EXPORT:
2723
      env["EXPORT_DEVICE"] = real_disk.dev_path
2724
      env["EXPORT_INDEX"] = str(disk_index)
2725
      script = inst_os.export_script
2726

    
2727
    # TODO: Pass special environment only to script
2728
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2729

    
2730
    if mode == constants.IEM_IMPORT:
2731
      suffix = "| %s" % script_cmd
2732

    
2733
    elif mode == constants.IEM_EXPORT:
2734
      prefix = "%s |" % script_cmd
2735

    
2736
    # Let script predict size
2737
    exp_size = constants.IE_CUSTOM_SIZE
2738

    
2739
  else:
2740
    _Fail("Invalid %s I/O mode %r", mode, ieio)
2741

    
2742
  return (env, prefix, suffix, exp_size)
2743

    
2744

    
2745
def _CreateImportExportStatusDir(prefix):
2746
  """Creates status directory for import/export.
2747

2748
  """
2749
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2750
                          prefix=("%s-%s-" %
2751
                                  (prefix, utils.TimestampForFilename())))
2752

    
2753

    
2754
def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2755
  """Starts an import or export daemon.
2756

2757
  @param mode: Import/output mode
2758
  @type opts: L{objects.ImportExportOptions}
2759
  @param opts: Daemon options
2760
  @type host: string
2761
  @param host: Remote host for export (None for import)
2762
  @type port: int
2763
  @param port: Remote port for export (None for import)
2764
  @type instance: L{objects.Instance}
2765
  @param instance: Instance object
2766
  @param ieio: Input/output type
2767
  @param ieioargs: Input/output arguments
2768

2769
  """
2770
  if mode == constants.IEM_IMPORT:
2771
    prefix = "import"
2772

    
2773
    if not (host is None and port is None):
2774
      _Fail("Can not specify host or port on import")
2775

    
2776
  elif mode == constants.IEM_EXPORT:
2777
    prefix = "export"
2778

    
2779
    if host is None or port is None:
2780
      _Fail("Host and port must be specified for an export")
2781

    
2782
  else:
2783
    _Fail("Invalid mode %r", mode)
2784

    
2785
  if (opts.key_name is None) ^ (opts.ca_pem is None):
2786
    _Fail("Cluster certificate can only be used for both key and CA")
2787

    
2788
  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2789
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2790

    
2791
  if opts.key_name is None:
2792
    # Use server.pem
2793
    key_path = constants.NODED_CERT_FILE
2794
    cert_path = constants.NODED_CERT_FILE
2795
    assert opts.ca_pem is None
2796
  else:
2797
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2798
                                                 opts.key_name)
2799
    assert opts.ca_pem is not None
2800

    
2801
  for i in [key_path, cert_path]:
2802
    if not os.path.exists(i):
2803
      _Fail("File '%s' does not exist" % i)
2804

    
2805
  status_dir = _CreateImportExportStatusDir(prefix)
2806
  try:
2807
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2808
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2809
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2810

    
2811
    if opts.ca_pem is None:
2812
      # Use server.pem
2813
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
2814
    else:
2815
      ca = opts.ca_pem
2816

    
2817
    # Write CA file
2818
    utils.WriteFile(ca_file, data=ca, mode=0400)
2819

    
2820
    cmd = [
2821
      constants.IMPORT_EXPORT_DAEMON,
2822
      status_file, mode,
2823
      "--key=%s" % key_path,
2824
      "--cert=%s" % cert_path,
2825
      "--ca=%s" % ca_file,
2826
      ]
2827

    
2828
    if host:
2829
      cmd.append("--host=%s" % host)
2830

    
2831
    if port:
2832
      cmd.append("--port=%s" % port)
2833

    
2834
    if opts.compress:
2835
      cmd.append("--compress=%s" % opts.compress)
2836

    
2837
    if opts.magic:
2838
      cmd.append("--magic=%s" % opts.magic)
2839

    
2840
    if exp_size is not None:
2841
      cmd.append("--expected-size=%s" % exp_size)
2842

    
2843
    if cmd_prefix:
2844
      cmd.append("--cmd-prefix=%s" % cmd_prefix)
2845

    
2846
    if cmd_suffix:
2847
      cmd.append("--cmd-suffix=%s" % cmd_suffix)
2848

    
2849
    logfile = _InstanceLogName(prefix, instance.os, instance.name)
2850

    
2851
    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2852
    # support for receiving a file descriptor for output
2853
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2854
                      output=logfile)
2855

    
2856
    # The import/export name is simply the status directory name
2857
    return os.path.basename(status_dir)
2858

    
2859
  except Exception:
2860
    shutil.rmtree(status_dir, ignore_errors=True)
2861
    raise
2862

    
2863

    
2864
def GetImportExportStatus(names):
2865
  """Returns import/export daemon status.
2866

2867
  @type names: sequence
2868
  @param names: List of names
2869
  @rtype: List of dicts
2870
  @return: Returns a list of the state of each named import/export or None if a
2871
           status couldn't be read
2872

2873
  """
2874
  result = []
2875

    
2876
  for name in names:
2877
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2878
                                 _IES_STATUS_FILE)
2879

    
2880
    try:
2881
      data = utils.ReadFile(status_file)
2882
    except EnvironmentError, err:
2883
      if err.errno != errno.ENOENT:
2884
        raise
2885
      data = None
2886

    
2887
    if not data:
2888
      result.append(None)
2889
      continue
2890

    
2891
    result.append(serializer.LoadJson(data))
2892

    
2893
  return result
2894

    
2895

    
2896
def AbortImportExport(name):
2897
  """Sends SIGTERM to a running import/export daemon.
2898

2899
  """
2900
  logging.info("Abort import/export %s", name)
2901

    
2902
  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2903
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2904

    
2905
  if pid:
2906
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2907
                 name, pid)
2908
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2909

    
2910

    
2911
def CleanupImportExport(name):
2912
  """Cleanup after an import or export.
2913

2914
  If the import/export daemon is still running it's killed. Afterwards the
2915
  whole status directory is removed.
2916

2917
  """
2918
  logging.info("Finalizing import/export %s", name)
2919

    
2920
  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2921

    
2922
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2923

    
2924
  if pid:
2925
    logging.info("Import/export %s is still running with PID %s",
2926
                 name, pid)
2927
    utils.KillProcess(pid, waitpid=False)
2928

    
2929
  shutil.rmtree(status_dir, ignore_errors=True)
2930

    
2931

    
2932
def _FindDisks(nodes_ip, disks):
2933
  """Sets the physical ID on disks and returns the block devices.
2934

2935
  """
2936
  # set the correct physical ID
2937
  my_name = utils.HostInfo().name
2938
  for cf in disks:
2939
    cf.SetPhysicalID(my_name, nodes_ip)
2940

    
2941
  bdevs = []
2942

    
2943
  for cf in disks:
2944
    rd = _RecursiveFindBD(cf)
2945
    if rd is None:
2946
      _Fail("Can't find device %s", cf)
2947
    bdevs.append(rd)
2948
  return bdevs
2949

    
2950

    
2951
def DrbdDisconnectNet(nodes_ip, disks):
2952
  """Disconnects the network on a list of drbd devices.
2953

2954
  """
2955
  bdevs = _FindDisks(nodes_ip, disks)
2956

    
2957
  # disconnect disks
2958
  for rd in bdevs:
2959
    try:
2960
      rd.DisconnectNet()
2961
    except errors.BlockDeviceError, err:
2962
      _Fail("Can't change network configuration to standalone mode: %s",
2963
            err, exc=True)
2964

    
2965

    
2966
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2967
  """Attaches the network on a list of drbd devices.
2968

2969
  """
2970
  bdevs = _FindDisks(nodes_ip, disks)
2971

    
2972
  if multimaster:
2973
    for idx, rd in enumerate(bdevs):
2974
      try:
2975
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2976
      except EnvironmentError, err:
2977
        _Fail("Can't create symlink: %s", err)
2978
  # reconnect disks, switch to new master configuration and if
2979
  # needed primary mode
2980
  for rd in bdevs:
2981
    try:
2982
      rd.AttachNet(multimaster)
2983
    except errors.BlockDeviceError, err:
2984
      _Fail("Can't change network configuration: %s", err)
2985

    
2986
  # wait until the disks are connected; we need to retry the re-attach
2987
  # if the device becomes standalone, as this might happen if the one
2988
  # node disconnects and reconnects in a different mode before the
2989
  # other node reconnects; in this case, one or both of the nodes will
2990
  # decide it has wrong configuration and switch to standalone
2991

    
2992
  def _Attach():
2993
    all_connected = True
2994

    
2995
    for rd in bdevs:
2996
      stats = rd.GetProcStatus()
2997

    
2998
      all_connected = (all_connected and
2999
                       (stats.is_connected or stats.is_in_resync))
3000

    
3001
      if stats.is_standalone:
3002
        # peer had different config info and this node became
3003
        # standalone, even though this should not happen with the
3004
        # new staged way of changing disk configs
3005
        try:
3006
          rd.AttachNet(multimaster)
3007
        except errors.BlockDeviceError, err:
3008
          _Fail("Can't change network configuration: %s", err)
3009

    
3010
    if not all_connected:
3011
      raise utils.RetryAgain()
3012

    
3013
  try:
3014
    # Start with a delay of 100 miliseconds and go up to 5 seconds
3015
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3016
  except utils.RetryTimeout:
3017
    _Fail("Timeout in disk reconnecting")
3018

    
3019
  if multimaster:
3020
    # change to primary mode
3021
    for rd in bdevs:
3022
      try:
3023
        rd.Open()
3024
      except errors.BlockDeviceError, err:
3025
        _Fail("Can't change to primary mode: %s", err)
3026

    
3027

    
3028
def DrbdWaitSync(nodes_ip, disks):
3029
  """Wait until DRBDs have synchronized.
3030

3031
  """
3032
  def _helper(rd):
3033
    stats = rd.GetProcStatus()
3034
    if not (stats.is_connected or stats.is_in_resync):
3035
      raise utils.RetryAgain()
3036
    return stats
3037

    
3038
  bdevs = _FindDisks(nodes_ip, disks)
3039

    
3040
  min_resync = 100
3041
  alldone = True
3042
  for rd in bdevs:
3043
    try:
3044
      # poll each second for 15 seconds
3045
      stats = utils.Retry(_helper, 1, 15, args=[rd])
3046
    except utils.RetryTimeout:
3047
      stats = rd.GetProcStatus()
3048
      # last check
3049
      if not (stats.is_connected or stats.is_in_resync):
3050
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3051
    alldone = alldone and (not stats.is_in_resync)
3052
    if stats.sync_percent is not None:
3053
      min_resync = min(min_resync, stats.sync_percent)
3054

    
3055
  return (alldone, min_resync)
3056

    
3057

    
3058
def PowercycleNode(hypervisor_type):
3059
  """Hard-powercycle the node.
3060

3061
  Because we need to return first, and schedule the powercycle in the
3062
  background, we won't be able to report failures nicely.
3063

3064
  """
3065
  hyper = hypervisor.GetHypervisor(hypervisor_type)
3066
  try:
3067
    pid = os.fork()
3068
  except OSError:
3069
    # if we can't fork, we'll pretend that we're in the child process
3070
    pid = 0
3071
  if pid > 0:
3072
    return "Reboot scheduled in 5 seconds"
3073
  # ensure the child is running on ram
3074
  try:
3075
    utils.Mlockall()
3076
  except Exception: # pylint: disable-msg=W0703
3077
    pass
3078
  time.sleep(5)
3079
  hyper.PowercycleNode()
3080

    
3081

    
3082
class HooksRunner(object):
3083
  """Hook runner.
3084

3085
  This class is instantiated on the node side (ganeti-noded) and not
3086
  on the master side.
3087

3088
  """
3089
  def __init__(self, hooks_base_dir=None):
3090
    """Constructor for hooks runner.
3091

3092
    @type hooks_base_dir: str or None
3093
    @param hooks_base_dir: if not None, this overrides the
3094
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
3095

3096
    """
3097
    if hooks_base_dir is None:
3098
      hooks_base_dir = constants.HOOKS_BASE_DIR
3099
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
3100
    # constant
3101
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3102

    
3103
  def RunHooks(self, hpath, phase, env):
3104
    """Run the scripts in the hooks directory.
3105

3106
    @type hpath: str
3107
    @param hpath: the path to the hooks directory which
3108
        holds the scripts
3109
    @type phase: str
3110
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
3111
        L{constants.HOOKS_PHASE_POST}
3112
    @type env: dict
3113
    @param env: dictionary with the environment for the hook
3114
    @rtype: list
3115
    @return: list of 3-element tuples:
3116
      - script path
3117
      - script result, either L{constants.HKR_SUCCESS} or
3118
        L{constants.HKR_FAIL}
3119
      - output of the script
3120

3121
    @raise errors.ProgrammerError: for invalid input
3122
        parameters
3123

3124
    """
3125
    if phase == constants.HOOKS_PHASE_PRE:
3126
      suffix = "pre"
3127
    elif phase == constants.HOOKS_PHASE_POST:
3128
      suffix = "post"
3129
    else:
3130
      _Fail("Unknown hooks phase '%s'", phase)
3131

    
3132

    
3133
    subdir = "%s-%s.d" % (hpath, suffix)
3134
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3135

    
3136
    results = []
3137

    
3138
    if not os.path.isdir(dir_name):
3139
      # for non-existing/non-dirs, we simply exit instead of logging a
3140
      # warning at every operation
3141
      return results
3142

    
3143
    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3144

    
3145
    for (relname, relstatus, runresult)  in runparts_results:
3146
      if relstatus == constants.RUNPARTS_SKIP:
3147
        rrval = constants.HKR_SKIP
3148
        output = ""
3149
      elif relstatus == constants.RUNPARTS_ERR:
3150
        rrval = constants.HKR_FAIL
3151
        output = "Hook script execution error: %s" % runresult
3152
      elif relstatus == constants.RUNPARTS_RUN:
3153
        if runresult.failed:
3154
          rrval = constants.HKR_FAIL
3155
        else:
3156
          rrval = constants.HKR_SUCCESS
3157
        output = utils.SafeEncode(runresult.output.strip())
3158
      results.append(("%s/%s" % (subdir, relname), rrval, output))
3159

    
3160
    return results
3161

    
3162

    
3163
class IAllocatorRunner(object):
3164
  """IAllocator runner.
3165

3166
  This class is instantiated on the node side (ganeti-noded) and not on
3167
  the master side.
3168

3169
  """
3170
  @staticmethod
3171
  def Run(name, idata):
3172
    """Run an iallocator script.
3173

3174
    @type name: str
3175
    @param name: the iallocator script name
3176
    @type idata: str
3177
    @param idata: the allocator input data
3178

3179
    @rtype: tuple
3180
    @return: two element tuple of:
3181
       - status
3182
       - either error message or stdout of allocator (for success)
3183

3184
    """
3185
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3186
                                  os.path.isfile)
3187
    if alloc_script is None:
3188
      _Fail("iallocator module '%s' not found in the search path", name)
3189

    
3190
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3191
    try:
3192
      os.write(fd, idata)
3193
      os.close(fd)
3194
      result = utils.RunCmd([alloc_script, fin_name])
3195
      if result.failed:
3196
        _Fail("iallocator module '%s' failed: %s, output '%s'",
3197
              name, result.fail_reason, result.output)
3198
    finally:
3199
      os.unlink(fin_name)
3200

    
3201
    return result.stdout
3202

    
3203

    
3204
class DevCacheManager(object):
3205
  """Simple class for managing a cache of block device information.
3206

3207
  """
3208
  _DEV_PREFIX = "/dev/"
3209
  _ROOT_DIR = constants.BDEV_CACHE_DIR
3210

    
3211
  @classmethod
3212
  def _ConvertPath(cls, dev_path):
3213
    """Converts a /dev/name path to the cache file name.
3214

3215
    This replaces slashes with underscores and strips the /dev
3216
    prefix. It then returns the full path to the cache file.
3217

3218
    @type dev_path: str
3219
    @param dev_path: the C{/dev/} path name
3220
    @rtype: str
3221
    @return: the converted path name
3222

3223
    """
3224
    if dev_path.startswith(cls._DEV_PREFIX):
3225
      dev_path = dev_path[len(cls._DEV_PREFIX):]
3226
    dev_path = dev_path.replace("/", "_")
3227
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3228
    return fpath
3229

    
3230
  @classmethod
3231
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3232
    """Updates the cache information for a given device.
3233

3234
    @type dev_path: str
3235
    @param dev_path: the pathname of the device
3236
    @type owner: str
3237
    @param owner: the owner (instance name) of the device
3238
    @type on_primary: bool
3239
    @param on_primary: whether this is the primary
3240
        node nor not
3241
    @type iv_name: str
3242
    @param iv_name: the instance-visible name of the
3243
        device, as in objects.Disk.iv_name
3244

3245
    @rtype: None
3246

3247
    """
3248
    if dev_path is None:
3249
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
3250
      return
3251
    fpath = cls._ConvertPath(dev_path)
3252
    if on_primary:
3253
      state = "primary"
3254
    else:
3255
      state = "secondary"
3256
    if iv_name is None:
3257
      iv_name = "not_visible"
3258
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3259
    try:
3260
      utils.WriteFile(fpath, data=fdata)
3261
    except EnvironmentError, err:
3262
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3263

    
3264
  @classmethod
3265
  def RemoveCache(cls, dev_path):
3266
    """Remove data for a dev_path.
3267

3268
    This is just a wrapper over L{utils.RemoveFile} with a converted
3269
    path name and logging.
3270

3271
    @type dev_path: str
3272
    @param dev_path: the pathname of the device
3273

3274
    @rtype: None
3275

3276
    """
3277
    if dev_path is None:
3278
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
3279
      return
3280
    fpath = cls._ConvertPath(dev_path)
3281
    try:
3282
      utils.RemoveFile(fpath)
3283
    except EnvironmentError, err:
3284
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)