Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 87812fd3

History | View | Annotate | Download (81.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36
import zlib
37
import base64
38

    
39
from ganeti import errors
40
from ganeti import utils
41
from ganeti import ssh
42
from ganeti import hypervisor
43
from ganeti import constants
44
from ganeti import bdev
45
from ganeti import objects
46
from ganeti import ssconf
47

    
48

    
49
class RPCFail(Exception):
50
  """Class denoting RPC failure.
51

52
  Its argument is the error message.
53

54
  """
55

    
56
def _Fail(msg, *args, **kwargs):
57
  """Log an error and the raise an RPCFail exception.
58

59
  This exception is then handled specially in the ganeti daemon and
60
  turned into a 'failed' return type. As such, this function is a
61
  useful shortcut for logging the error and returning it to the master
62
  daemon.
63

64
  @type msg: string
65
  @param msg: the text of the exception
66
  @raise RPCFail
67

68
  """
69
  if args:
70
    msg = msg % args
71
  if "exc" in kwargs and kwargs["exc"]:
72
    logging.exception(msg)
73
  else:
74
    logging.error(msg)
75
  raise RPCFail(msg)
76

    
77

    
78
def _GetConfig():
79
  """Simple wrapper to return a SimpleStore.
80

81
  @rtype: L{ssconf.SimpleStore}
82
  @return: a SimpleStore instance
83

84
  """
85
  return ssconf.SimpleStore()
86

    
87

    
88
def _GetSshRunner(cluster_name):
89
  """Simple wrapper to return an SshRunner.
90

91
  @type cluster_name: str
92
  @param cluster_name: the cluster name, which is needed
93
      by the SshRunner constructor
94
  @rtype: L{ssh.SshRunner}
95
  @return: an SshRunner instance
96

97
  """
98
  return ssh.SshRunner(cluster_name)
99

    
100

    
101
def _Decompress(data):
102
  """Unpacks data compressed by the RPC client.
103

104
  @type data: list or tuple
105
  @param data: Data sent by RPC client
106
  @rtype: str
107
  @return: Decompressed data
108

109
  """
110
  assert isinstance(data, (list, tuple))
111
  assert len(data) == 2
112
  (encoding, content) = data
113
  if encoding == constants.RPC_ENCODING_NONE:
114
    return content
115
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
116
    return zlib.decompress(base64.b64decode(content))
117
  else:
118
    raise AssertionError("Unknown data encoding")
119

    
120

    
121
def _CleanDirectory(path, exclude=None):
122
  """Removes all regular files in a directory.
123

124
  @type path: str
125
  @param path: the directory to clean
126
  @type exclude: list
127
  @param exclude: list of files to be excluded, defaults
128
      to the empty list
129

130
  """
131
  if not os.path.isdir(path):
132
    return
133
  if exclude is None:
134
    exclude = []
135
  else:
136
    # Normalize excluded paths
137
    exclude = [os.path.normpath(i) for i in exclude]
138

    
139
  for rel_name in utils.ListVisibleFiles(path):
140
    full_name = os.path.normpath(os.path.join(path, rel_name))
141
    if full_name in exclude:
142
      continue
143
    if os.path.isfile(full_name) and not os.path.islink(full_name):
144
      utils.RemoveFile(full_name)
145

    
146

    
147
def JobQueuePurge():
148
  """Removes job queue files and archived jobs.
149

150
  @rtype: None
151

152
  """
153
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
154
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
155

    
156

    
157
def GetMasterInfo():
158
  """Returns master information.
159

160
  This is an utility function to compute master information, either
161
  for consumption here or from the node daemon.
162

163
  @rtype: tuple
164
  @return: (master_netdev, master_ip, master_name) if we have a good
165
      configuration, otherwise (None, None, None)
166

167
  """
168
  try:
169
    cfg = _GetConfig()
170
    master_netdev = cfg.GetMasterNetdev()
171
    master_ip = cfg.GetMasterIP()
172
    master_node = cfg.GetMasterNode()
173
  except errors.ConfigurationError, err:
174
    logging.exception("Cluster configuration incomplete")
175
    return (None, None, None)
176
  return (master_netdev, master_ip, master_node)
177

    
178

    
179
def StartMaster(start_daemons):
180
  """Activate local node as master node.
181

182
  The function will always try activate the IP address of the master
183
  (unless someone else has it). It will also start the master daemons,
184
  based on the start_daemons parameter.
185

186
  @type start_daemons: boolean
187
  @param start_daemons: whther to also start the master
188
      daemons (ganeti-masterd and ganeti-rapi)
189
  @rtype: None
190

191
  """
192
  ok = True
193
  master_netdev, master_ip, _ = GetMasterInfo()
194
  if not master_netdev:
195
    return False
196

    
197
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
198
    if utils.OwnIpAddress(master_ip):
199
      # we already have the ip:
200
      logging.debug("Already started")
201
    else:
202
      logging.error("Someone else has the master ip, not activating")
203
      ok = False
204
  else:
205
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
206
                           "dev", master_netdev, "label",
207
                           "%s:0" % master_netdev])
208
    if result.failed:
209
      logging.error("Can't activate master IP: %s", result.output)
210
      ok = False
211

    
212
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
213
                           "-s", master_ip, master_ip])
214
    # we'll ignore the exit code of arping
215

    
216
  # and now start the master and rapi daemons
217
  if start_daemons:
218
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
219
      result = utils.RunCmd([daemon])
220
      if result.failed:
221
        logging.error("Can't start daemon %s: %s", daemon, result.output)
222
        ok = False
223
  return ok
224

    
225

    
226
def StopMaster(stop_daemons):
227
  """Deactivate this node as master.
228

229
  The function will always try to deactivate the IP address of the
230
  master. It will also stop the master daemons depending on the
231
  stop_daemons parameter.
232

233
  @type stop_daemons: boolean
234
  @param stop_daemons: whether to also stop the master daemons
235
      (ganeti-masterd and ganeti-rapi)
236
  @rtype: None
237

238
  """
239
  master_netdev, master_ip, _ = GetMasterInfo()
240
  if not master_netdev:
241
    return False
242

    
243
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
244
                         "dev", master_netdev])
245
  if result.failed:
246
    logging.error("Can't remove the master IP, error: %s", result.output)
247
    # but otherwise ignore the failure
248

    
249
  if stop_daemons:
250
    # stop/kill the rapi and the master daemon
251
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
252
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
253

    
254
  return True
255

    
256

    
257
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
258
  """Joins this node to the cluster.
259

260
  This does the following:
261
      - updates the hostkeys of the machine (rsa and dsa)
262
      - adds the ssh private key to the user
263
      - adds the ssh public key to the users' authorized_keys file
264

265
  @type dsa: str
266
  @param dsa: the DSA private key to write
267
  @type dsapub: str
268
  @param dsapub: the DSA public key to write
269
  @type rsa: str
270
  @param rsa: the RSA private key to write
271
  @type rsapub: str
272
  @param rsapub: the RSA public key to write
273
  @type sshkey: str
274
  @param sshkey: the SSH private key to write
275
  @type sshpub: str
276
  @param sshpub: the SSH public key to write
277
  @rtype: boolean
278
  @return: the success of the operation
279

280
  """
281
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
282
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
283
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
284
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
285
  for name, content, mode in sshd_keys:
286
    utils.WriteFile(name, data=content, mode=mode)
287

    
288
  try:
289
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
290
                                                    mkdir=True)
291
  except errors.OpExecError, err:
292
    _Fail("Error while processing user ssh files: %s", err, exc=True)
293

    
294
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
295
    utils.WriteFile(name, data=content, mode=0600)
296

    
297
  utils.AddAuthorizedKey(auth_keys, sshpub)
298

    
299
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
300

    
301
  return (True, "Node added successfully")
302

    
303

    
304
def LeaveCluster():
305
  """Cleans up and remove the current node.
306

307
  This function cleans up and prepares the current node to be removed
308
  from the cluster.
309

310
  If processing is successful, then it raises an
311
  L{errors.QuitGanetiException} which is used as a special case to
312
  shutdown the node daemon.
313

314
  """
315
  _CleanDirectory(constants.DATA_DIR)
316
  JobQueuePurge()
317

    
318
  try:
319
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
320
  except errors.OpExecError:
321
    logging.exception("Error while processing ssh files")
322
    return
323

    
324
  f = open(pub_key, 'r')
325
  try:
326
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
327
  finally:
328
    f.close()
329

    
330
  utils.RemoveFile(priv_key)
331
  utils.RemoveFile(pub_key)
332

    
333
  # Return a reassuring string to the caller, and quit
334
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
335

    
336

    
337
def GetNodeInfo(vgname, hypervisor_type):
338
  """Gives back a hash with different informations about the node.
339

340
  @type vgname: C{string}
341
  @param vgname: the name of the volume group to ask for disk space information
342
  @type hypervisor_type: C{str}
343
  @param hypervisor_type: the name of the hypervisor to ask for
344
      memory information
345
  @rtype: C{dict}
346
  @return: dictionary with the following keys:
347
      - vg_size is the size of the configured volume group in MiB
348
      - vg_free is the free size of the volume group in MiB
349
      - memory_dom0 is the memory allocated for domain0 in MiB
350
      - memory_free is the currently available (free) ram in MiB
351
      - memory_total is the total number of ram in MiB
352

353
  """
354
  outputarray = {}
355
  vginfo = _GetVGInfo(vgname)
356
  outputarray['vg_size'] = vginfo['vg_size']
357
  outputarray['vg_free'] = vginfo['vg_free']
358

    
359
  hyper = hypervisor.GetHypervisor(hypervisor_type)
360
  hyp_info = hyper.GetNodeInfo()
361
  if hyp_info is not None:
362
    outputarray.update(hyp_info)
363

    
364
  f = open("/proc/sys/kernel/random/boot_id", 'r')
365
  try:
366
    outputarray["bootid"] = f.read(128).rstrip("\n")
367
  finally:
368
    f.close()
369

    
370
  return outputarray
371

    
372

    
373
def VerifyNode(what, cluster_name):
374
  """Verify the status of the local node.
375

376
  Based on the input L{what} parameter, various checks are done on the
377
  local node.
378

379
  If the I{filelist} key is present, this list of
380
  files is checksummed and the file/checksum pairs are returned.
381

382
  If the I{nodelist} key is present, we check that we have
383
  connectivity via ssh with the target nodes (and check the hostname
384
  report).
385

386
  If the I{node-net-test} key is present, we check that we have
387
  connectivity to the given nodes via both primary IP and, if
388
  applicable, secondary IPs.
389

390
  @type what: C{dict}
391
  @param what: a dictionary of things to check:
392
      - filelist: list of files for which to compute checksums
393
      - nodelist: list of nodes we should check ssh communication with
394
      - node-net-test: list of nodes we should check node daemon port
395
        connectivity with
396
      - hypervisor: list with hypervisors to run the verify for
397
  @rtype: dict
398
  @return: a dictionary with the same keys as the input dict, and
399
      values representing the result of the checks
400

401
  """
402
  result = {}
403

    
404
  if constants.NV_HYPERVISOR in what:
405
    result[constants.NV_HYPERVISOR] = tmp = {}
406
    for hv_name in what[constants.NV_HYPERVISOR]:
407
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
408

    
409
  if constants.NV_FILELIST in what:
410
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
411
      what[constants.NV_FILELIST])
412

    
413
  if constants.NV_NODELIST in what:
414
    result[constants.NV_NODELIST] = tmp = {}
415
    random.shuffle(what[constants.NV_NODELIST])
416
    for node in what[constants.NV_NODELIST]:
417
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
418
      if not success:
419
        tmp[node] = message
420

    
421
  if constants.NV_NODENETTEST in what:
422
    result[constants.NV_NODENETTEST] = tmp = {}
423
    my_name = utils.HostInfo().name
424
    my_pip = my_sip = None
425
    for name, pip, sip in what[constants.NV_NODENETTEST]:
426
      if name == my_name:
427
        my_pip = pip
428
        my_sip = sip
429
        break
430
    if not my_pip:
431
      tmp[my_name] = ("Can't find my own primary/secondary IP"
432
                      " in the node list")
433
    else:
434
      port = utils.GetNodeDaemonPort()
435
      for name, pip, sip in what[constants.NV_NODENETTEST]:
436
        fail = []
437
        if not utils.TcpPing(pip, port, source=my_pip):
438
          fail.append("primary")
439
        if sip != pip:
440
          if not utils.TcpPing(sip, port, source=my_sip):
441
            fail.append("secondary")
442
        if fail:
443
          tmp[name] = ("failure using the %s interface(s)" %
444
                       " and ".join(fail))
445

    
446
  if constants.NV_LVLIST in what:
447
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
448

    
449
  if constants.NV_INSTANCELIST in what:
450
    result[constants.NV_INSTANCELIST] = GetInstanceList(
451
      what[constants.NV_INSTANCELIST])
452

    
453
  if constants.NV_VGLIST in what:
454
    result[constants.NV_VGLIST] = ListVolumeGroups()
455

    
456
  if constants.NV_VERSION in what:
457
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
458
                                    constants.RELEASE_VERSION)
459

    
460
  if constants.NV_HVINFO in what:
461
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
462
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
463

    
464
  if constants.NV_DRBDLIST in what:
465
    try:
466
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
467
    except errors.BlockDeviceError, err:
468
      logging.warning("Can't get used minors list", exc_info=True)
469
      used_minors = str(err)
470
    result[constants.NV_DRBDLIST] = used_minors
471

    
472
  return result
473

    
474

    
475
def GetVolumeList(vg_name):
476
  """Compute list of logical volumes and their size.
477

478
  @type vg_name: str
479
  @param vg_name: the volume group whose LVs we should list
480
  @rtype: dict
481
  @return:
482
      dictionary of all partions (key) with value being a tuple of
483
      their size (in MiB), inactive and online status::
484

485
        {'test1': ('20.06', True, True)}
486

487
      in case of errors, a string is returned with the error
488
      details.
489

490
  """
491
  lvs = {}
492
  sep = '|'
493
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
494
                         "--separator=%s" % sep,
495
                         "-olv_name,lv_size,lv_attr", vg_name])
496
  if result.failed:
497
    logging.error("Failed to list logical volumes, lvs output: %s",
498
                  result.output)
499
    return result.output
500

    
501
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
502
  for line in result.stdout.splitlines():
503
    line = line.strip()
504
    match = valid_line_re.match(line)
505
    if not match:
506
      logging.error("Invalid line returned from lvs output: '%s'", line)
507
      continue
508
    name, size, attr = match.groups()
509
    inactive = attr[4] == '-'
510
    online = attr[5] == 'o'
511
    lvs[name] = (size, inactive, online)
512

    
513
  return lvs
514

    
515

    
516
def ListVolumeGroups():
517
  """List the volume groups and their size.
518

519
  @rtype: dict
520
  @return: dictionary with keys volume name and values the
521
      size of the volume
522

523
  """
524
  return utils.ListVolumeGroups()
525

    
526

    
527
def NodeVolumes():
528
  """List all volumes on this node.
529

530
  @rtype: list
531
  @return:
532
    A list of dictionaries, each having four keys:
533
      - name: the logical volume name,
534
      - size: the size of the logical volume
535
      - dev: the physical device on which the LV lives
536
      - vg: the volume group to which it belongs
537

538
    In case of errors, we return an empty list and log the
539
    error.
540

541
    Note that since a logical volume can live on multiple physical
542
    volumes, the resulting list might include a logical volume
543
    multiple times.
544

545
  """
546
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547
                         "--separator=|",
548
                         "--options=lv_name,lv_size,devices,vg_name"])
549
  if result.failed:
550
    logging.error("Failed to list logical volumes, lvs output: %s",
551
                  result.output)
552
    return []
553

    
554
  def parse_dev(dev):
555
    if '(' in dev:
556
      return dev.split('(')[0]
557
    else:
558
      return dev
559

    
560
  def map_line(line):
561
    return {
562
      'name': line[0].strip(),
563
      'size': line[1].strip(),
564
      'dev': parse_dev(line[2].strip()),
565
      'vg': line[3].strip(),
566
    }
567

    
568
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
569
          if line.count('|') >= 3]
570

    
571

    
572
def BridgesExist(bridges_list):
573
  """Check if a list of bridges exist on the current node.
574

575
  @rtype: boolean
576
  @return: C{True} if all of them exist, C{False} otherwise
577

578
  """
579
  for bridge in bridges_list:
580
    if not utils.BridgeExists(bridge):
581
      return False
582

    
583
  return True
584

    
585

    
586
def GetInstanceList(hypervisor_list):
587
  """Provides a list of instances.
588

589
  @type hypervisor_list: list
590
  @param hypervisor_list: the list of hypervisors to query information
591

592
  @rtype: list
593
  @return: a list of all running instances on the current node
594
    - instance1.example.com
595
    - instance2.example.com
596

597
  """
598
  results = []
599
  for hname in hypervisor_list:
600
    try:
601
      names = hypervisor.GetHypervisor(hname).ListInstances()
602
      results.extend(names)
603
    except errors.HypervisorError, err:
604
      logging.exception("Error enumerating instances for hypevisor %s", hname)
605
      raise
606

    
607
  return results
608

    
609

    
610
def GetInstanceInfo(instance, hname):
611
  """Gives back the informations about an instance as a dictionary.
612

613
  @type instance: string
614
  @param instance: the instance name
615
  @type hname: string
616
  @param hname: the hypervisor type of the instance
617

618
  @rtype: dict
619
  @return: dictionary with the following keys:
620
      - memory: memory size of instance (int)
621
      - state: xen state of instance (string)
622
      - time: cpu time of instance (float)
623

624
  """
625
  output = {}
626

    
627
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
628
  if iinfo is not None:
629
    output['memory'] = iinfo[2]
630
    output['state'] = iinfo[4]
631
    output['time'] = iinfo[5]
632

    
633
  return output
634

    
635

    
636
def GetInstanceMigratable(instance):
637
  """Gives whether an instance can be migrated.
638

639
  @type instance: L{objects.Instance}
640
  @param instance: object representing the instance to be checked.
641

642
  @rtype: tuple
643
  @return: tuple of (result, description) where:
644
      - result: whether the instance can be migrated or not
645
      - description: a description of the issue, if relevant
646

647
  """
648
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
649
  if instance.name not in hyper.ListInstances():
650
    return (False, 'not running')
651

    
652
  for idx in range(len(instance.disks)):
653
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
654
    if not os.path.islink(link_name):
655
      return (False, 'not restarted since ganeti 1.2.5')
656

    
657
  return (True, '')
658

    
659

    
660
def GetAllInstancesInfo(hypervisor_list):
661
  """Gather data about all instances.
662

663
  This is the equivalent of L{GetInstanceInfo}, except that it
664
  computes data for all instances at once, thus being faster if one
665
  needs data about more than one instance.
666

667
  @type hypervisor_list: list
668
  @param hypervisor_list: list of hypervisors to query for instance data
669

670
  @rtype: dict
671
  @return: dictionary of instance: data, with data having the following keys:
672
      - memory: memory size of instance (int)
673
      - state: xen state of instance (string)
674
      - time: cpu time of instance (float)
675
      - vcpus: the number of vcpus
676

677
  """
678
  output = {}
679

    
680
  for hname in hypervisor_list:
681
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
682
    if iinfo:
683
      for name, inst_id, memory, vcpus, state, times in iinfo:
684
        value = {
685
          'memory': memory,
686
          'vcpus': vcpus,
687
          'state': state,
688
          'time': times,
689
          }
690
        if name in output:
691
          # we only check static parameters, like memory and vcpus,
692
          # and not state and time which can change between the
693
          # invocations of the different hypervisors
694
          for key in 'memory', 'vcpus':
695
            if value[key] != output[name][key]:
696
              raise errors.HypervisorError("Instance %s is running twice"
697
                                           " with different parameters" % name)
698
        output[name] = value
699

    
700
  return output
701

    
702

    
703
def InstanceOsAdd(instance, reinstall):
704
  """Add an OS to an instance.
705

706
  @type instance: L{objects.Instance}
707
  @param instance: Instance whose OS is to be installed
708
  @type reinstall: boolean
709
  @param reinstall: whether this is an instance reinstall
710
  @rtype: boolean
711
  @return: the success of the operation
712

713
  """
714
  try:
715
    inst_os = OSFromDisk(instance.os)
716
  except errors.InvalidOS, err:
717
    os_name, os_dir, os_err = err.args
718
    if os_dir is None:
719
      return (False, "Can't find OS '%s': %s" % (os_name, os_err))
720
    else:
721
      return (False, "Error parsing OS '%s' in directory %s: %s" %
722
              (os_name, os_dir, os_err))
723

    
724
  create_env = OSEnvironment(instance)
725
  if reinstall:
726
    create_env['INSTANCE_REINSTALL'] = "1"
727

    
728
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
729
                                     instance.name, int(time.time()))
730

    
731
  result = utils.RunCmd([inst_os.create_script], env=create_env,
732
                        cwd=inst_os.path, output=logfile,)
733
  if result.failed:
734
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
735
                  " output: %s", result.cmd, result.fail_reason, logfile,
736
                  result.output)
737
    lines = [utils.SafeEncode(val)
738
             for val in utils.TailFile(logfile, lines=20)]
739
    return (False, "OS create script failed (%s), last lines in the"
740
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
741

    
742
  return (True, "Successfully installed")
743

    
744

    
745
def RunRenameInstance(instance, old_name):
746
  """Run the OS rename script for an instance.
747

748
  @type instance: L{objects.Instance}
749
  @param instance: Instance whose OS is to be installed
750
  @type old_name: string
751
  @param old_name: previous instance name
752
  @rtype: boolean
753
  @return: the success of the operation
754

755
  """
756
  inst_os = OSFromDisk(instance.os)
757

    
758
  rename_env = OSEnvironment(instance)
759
  rename_env['OLD_INSTANCE_NAME'] = old_name
760

    
761
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
762
                                           old_name,
763
                                           instance.name, int(time.time()))
764

    
765
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
766
                        cwd=inst_os.path, output=logfile)
767

    
768
  if result.failed:
769
    logging.error("os create command '%s' returned error: %s output: %s",
770
                  result.cmd, result.fail_reason, result.output)
771
    lines = [utils.SafeEncode(val)
772
             for val in utils.TailFile(logfile, lines=20)]
773
    return (False, "OS rename script failed (%s), last lines in the"
774
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
775

    
776
  return (True, "Rename successful")
777

    
778

    
779
def _GetVGInfo(vg_name):
780
  """Get informations about the volume group.
781

782
  @type vg_name: str
783
  @param vg_name: the volume group which we query
784
  @rtype: dict
785
  @return:
786
    A dictionary with the following keys:
787
      - C{vg_size} is the total size of the volume group in MiB
788
      - C{vg_free} is the free size of the volume group in MiB
789
      - C{pv_count} are the number of physical disks in that VG
790

791
    If an error occurs during gathering of data, we return the same dict
792
    with keys all set to None.
793

794
  """
795
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
796

    
797
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
798
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
799

    
800
  if retval.failed:
801
    logging.error("volume group %s not present", vg_name)
802
    return retdic
803
  valarr = retval.stdout.strip().rstrip(':').split(':')
804
  if len(valarr) == 3:
805
    try:
806
      retdic = {
807
        "vg_size": int(round(float(valarr[0]), 0)),
808
        "vg_free": int(round(float(valarr[1]), 0)),
809
        "pv_count": int(valarr[2]),
810
        }
811
    except ValueError, err:
812
      logging.exception("Fail to parse vgs output")
813
  else:
814
    logging.error("vgs output has the wrong number of fields (expected"
815
                  " three): %s", str(valarr))
816
  return retdic
817

    
818

    
819
def _GetBlockDevSymlinkPath(instance_name, idx):
820
  return os.path.join(constants.DISK_LINKS_DIR,
821
                      "%s:%d" % (instance_name, idx))
822

    
823

    
824
def _SymlinkBlockDev(instance_name, device_path, idx):
825
  """Set up symlinks to a instance's block device.
826

827
  This is an auxiliary function run when an instance is start (on the primary
828
  node) or when an instance is migrated (on the target node).
829

830

831
  @param instance_name: the name of the target instance
832
  @param device_path: path of the physical block device, on the node
833
  @param idx: the disk index
834
  @return: absolute path to the disk's symlink
835

836
  """
837
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
838
  try:
839
    os.symlink(device_path, link_name)
840
  except OSError, err:
841
    if err.errno == errno.EEXIST:
842
      if (not os.path.islink(link_name) or
843
          os.readlink(link_name) != device_path):
844
        os.remove(link_name)
845
        os.symlink(device_path, link_name)
846
    else:
847
      raise
848

    
849
  return link_name
850

    
851

    
852
def _RemoveBlockDevLinks(instance_name, disks):
853
  """Remove the block device symlinks belonging to the given instance.
854

855
  """
856
  for idx, disk in enumerate(disks):
857
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
858
    if os.path.islink(link_name):
859
      try:
860
        os.remove(link_name)
861
      except OSError:
862
        logging.exception("Can't remove symlink '%s'", link_name)
863

    
864

    
865
def _GatherAndLinkBlockDevs(instance):
866
  """Set up an instance's block device(s).
867

868
  This is run on the primary node at instance startup. The block
869
  devices must be already assembled.
870

871
  @type instance: L{objects.Instance}
872
  @param instance: the instance whose disks we shoul assemble
873
  @rtype: list
874
  @return: list of (disk_object, device_path)
875

876
  """
877
  block_devices = []
878
  for idx, disk in enumerate(instance.disks):
879
    device = _RecursiveFindBD(disk)
880
    if device is None:
881
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
882
                                    str(disk))
883
    device.Open()
884
    try:
885
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
886
    except OSError, e:
887
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
888
                                    e.strerror)
889

    
890
    block_devices.append((disk, link_name))
891

    
892
  return block_devices
893

    
894

    
895
def StartInstance(instance):
896
  """Start an instance.
897

898
  @type instance: L{objects.Instance}
899
  @param instance: the instance object
900
  @rtype: boolean
901
  @return: whether the startup was successful or not
902

903
  """
904
  running_instances = GetInstanceList([instance.hypervisor])
905

    
906
  if instance.name in running_instances:
907
    return (True, "Already running")
908

    
909
  try:
910
    block_devices = _GatherAndLinkBlockDevs(instance)
911
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
912
    hyper.StartInstance(instance, block_devices)
913
  except errors.BlockDeviceError, err:
914
    _Fail("Block device error: %s", err, exc=True)
915
  except errors.HypervisorError, err:
916
    _RemoveBlockDevLinks(instance.name, instance.disks)
917
    _Fail("Hypervisor error: %s", err, exc=True)
918

    
919
  return (True, "Instance started successfully")
920

    
921

    
922
def InstanceShutdown(instance):
923
  """Shut an instance down.
924

925
  @note: this functions uses polling with a hardcoded timeout.
926

927
  @type instance: L{objects.Instance}
928
  @param instance: the instance object
929
  @rtype: boolean
930
  @return: whether the startup was successful or not
931

932
  """
933
  hv_name = instance.hypervisor
934
  running_instances = GetInstanceList([hv_name])
935

    
936
  if instance.name not in running_instances:
937
    return (True, "Instance already stopped")
938

    
939
  hyper = hypervisor.GetHypervisor(hv_name)
940
  try:
941
    hyper.StopInstance(instance)
942
  except errors.HypervisorError, err:
943
    _Fail("Failed to stop instance %s: %s", instance.name, err)
944

    
945
  # test every 10secs for 2min
946

    
947
  time.sleep(1)
948
  for dummy in range(11):
949
    if instance.name not in GetInstanceList([hv_name]):
950
      break
951
    time.sleep(10)
952
  else:
953
    # the shutdown did not succeed
954
    logging.error("Shutdown of '%s' unsuccessful, using destroy",
955
                  instance.name)
956

    
957
    try:
958
      hyper.StopInstance(instance, force=True)
959
    except errors.HypervisorError, err:
960
      _Fail("Failed to force stop instance %s: %s", instance.name, err)
961

    
962
    time.sleep(1)
963
    if instance.name in GetInstanceList([hv_name]):
964
      _Fail("Could not shutdown instance %s even by destroy", instance.name)
965

    
966
  _RemoveBlockDevLinks(instance.name, instance.disks)
967

    
968
  return (True, "Instance has been shutdown successfully")
969

    
970

    
971
def InstanceReboot(instance, reboot_type):
972
  """Reboot an instance.
973

974
  @type instance: L{objects.Instance}
975
  @param instance: the instance object to reboot
976
  @type reboot_type: str
977
  @param reboot_type: the type of reboot, one the following
978
    constants:
979
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
980
        instance OS, do not recreate the VM
981
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
982
        restart the VM (at the hypervisor level)
983
      - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
984
        is not accepted here, since that mode is handled
985
        differently
986
  @rtype: boolean
987
  @return: the success of the operation
988

989
  """
990
  running_instances = GetInstanceList([instance.hypervisor])
991

    
992
  if instance.name not in running_instances:
993
    _Fail("Cannot reboot instance %s that is not running", instance.name)
994

    
995
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
996
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
997
    try:
998
      hyper.RebootInstance(instance)
999
    except errors.HypervisorError, err:
1000
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1001
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1002
    try:
1003
      stop_result = InstanceShutdown(instance)
1004
      if not stop_result[0]:
1005
        return stop_result
1006
      return StartInstance(instance)
1007
    except errors.HypervisorError, err:
1008
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1009
  else:
1010
    _Fail("Invalid reboot_type received: %s", reboot_type)
1011

    
1012
  return (True, "Reboot successful")
1013

    
1014

    
1015
def MigrationInfo(instance):
1016
  """Gather information about an instance to be migrated.
1017

1018
  @type instance: L{objects.Instance}
1019
  @param instance: the instance definition
1020

1021
  """
1022
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1023
  try:
1024
    info = hyper.MigrationInfo(instance)
1025
  except errors.HypervisorError, err:
1026
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1027
  return (True, info)
1028

    
1029

    
1030
def AcceptInstance(instance, info, target):
1031
  """Prepare the node to accept an instance.
1032

1033
  @type instance: L{objects.Instance}
1034
  @param instance: the instance definition
1035
  @type info: string/data (opaque)
1036
  @param info: migration information, from the source node
1037
  @type target: string
1038
  @param target: target host (usually ip), on this node
1039

1040
  """
1041
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1042
  try:
1043
    hyper.AcceptInstance(instance, info, target)
1044
  except errors.HypervisorError, err:
1045
    _Fail("Failed to accept instance: %s", err, exc=True)
1046
  return (True, "Accept successfull")
1047

    
1048

    
1049
def FinalizeMigration(instance, info, success):
1050
  """Finalize any preparation to accept an instance.
1051

1052
  @type instance: L{objects.Instance}
1053
  @param instance: the instance definition
1054
  @type info: string/data (opaque)
1055
  @param info: migration information, from the source node
1056
  @type success: boolean
1057
  @param success: whether the migration was a success or a failure
1058

1059
  """
1060
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1061
  try:
1062
    hyper.FinalizeMigration(instance, info, success)
1063
  except errors.HypervisorError, err:
1064
    _Fail("Failed to finalize migration: %s", err, exc=True)
1065
  return (True, "Migration Finalized")
1066

    
1067

    
1068
def MigrateInstance(instance, target, live):
1069
  """Migrates an instance to another node.
1070

1071
  @type instance: L{objects.Instance}
1072
  @param instance: the instance definition
1073
  @type target: string
1074
  @param target: the target node name
1075
  @type live: boolean
1076
  @param live: whether the migration should be done live or not (the
1077
      interpretation of this parameter is left to the hypervisor)
1078
  @rtype: tuple
1079
  @return: a tuple of (success, msg) where:
1080
      - succes is a boolean denoting the success/failure of the operation
1081
      - msg is a string with details in case of failure
1082

1083
  """
1084
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1085

    
1086
  try:
1087
    hyper.MigrateInstance(instance.name, target, live)
1088
  except errors.HypervisorError, err:
1089
    _Fail("Failed to migrate instance: %s", err, exc=True)
1090
  return (True, "Migration successfull")
1091

    
1092

    
1093
def BlockdevCreate(disk, size, owner, on_primary, info):
1094
  """Creates a block device for an instance.
1095

1096
  @type disk: L{objects.Disk}
1097
  @param disk: the object describing the disk we should create
1098
  @type size: int
1099
  @param size: the size of the physical underlying device, in MiB
1100
  @type owner: str
1101
  @param owner: the name of the instance for which disk is created,
1102
      used for device cache data
1103
  @type on_primary: boolean
1104
  @param on_primary:  indicates if it is the primary node or not
1105
  @type info: string
1106
  @param info: string that will be sent to the physical device
1107
      creation, used for example to set (LVM) tags on LVs
1108

1109
  @return: the new unique_id of the device (this can sometime be
1110
      computed only after creation), or None. On secondary nodes,
1111
      it's not required to return anything.
1112

1113
  """
1114
  clist = []
1115
  if disk.children:
1116
    for child in disk.children:
1117
      try:
1118
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1119
      except errors.BlockDeviceError, err:
1120
        _Fail("Can't assemble device %s: %s", child, err)
1121
      if on_primary or disk.AssembleOnSecondary():
1122
        # we need the children open in case the device itself has to
1123
        # be assembled
1124
        try:
1125
          crdev.Open()
1126
        except errors.BlockDeviceError, err:
1127
          _Fail("Can't make child '%s' read-write: %s", child, err)
1128
      clist.append(crdev)
1129

    
1130
  try:
1131
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1132
  except errors.BlockDeviceError, err:
1133
    _Fail("Can't create block device: %s", err)
1134

    
1135
  if on_primary or disk.AssembleOnSecondary():
1136
    try:
1137
      device.Assemble()
1138
    except errors.BlockDeviceError, err:
1139
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1140
    device.SetSyncSpeed(constants.SYNC_SPEED)
1141
    if on_primary or disk.OpenOnSecondary():
1142
      try:
1143
        device.Open(force=True)
1144
      except errors.BlockDeviceError, err:
1145
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1146
    DevCacheManager.UpdateCache(device.dev_path, owner,
1147
                                on_primary, disk.iv_name)
1148

    
1149
  device.SetInfo(info)
1150

    
1151
  physical_id = device.unique_id
1152
  return True, physical_id
1153

    
1154

    
1155
def BlockdevRemove(disk):
1156
  """Remove a block device.
1157

1158
  @note: This is intended to be called recursively.
1159

1160
  @type disk: L{objects.Disk}
1161
  @param disk: the disk object we should remove
1162
  @rtype: boolean
1163
  @return: the success of the operation
1164

1165
  """
1166
  msgs = []
1167
  result = True
1168
  try:
1169
    rdev = _RecursiveFindBD(disk)
1170
  except errors.BlockDeviceError, err:
1171
    # probably can't attach
1172
    logging.info("Can't attach to device %s in remove", disk)
1173
    rdev = None
1174
  if rdev is not None:
1175
    r_path = rdev.dev_path
1176
    try:
1177
      rdev.Remove()
1178
    except errors.BlockDeviceError, err:
1179
      msgs.append(str(err))
1180
      result = False
1181
    if result:
1182
      DevCacheManager.RemoveCache(r_path)
1183

    
1184
  if disk.children:
1185
    for child in disk.children:
1186
      c_status, c_msg = BlockdevRemove(child)
1187
      result = result and c_status
1188
      if c_msg: # not an empty message
1189
        msgs.append(c_msg)
1190

    
1191
  return (result, "; ".join(msgs))
1192

    
1193

    
1194
def _RecursiveAssembleBD(disk, owner, as_primary):
1195
  """Activate a block device for an instance.
1196

1197
  This is run on the primary and secondary nodes for an instance.
1198

1199
  @note: this function is called recursively.
1200

1201
  @type disk: L{objects.Disk}
1202
  @param disk: the disk we try to assemble
1203
  @type owner: str
1204
  @param owner: the name of the instance which owns the disk
1205
  @type as_primary: boolean
1206
  @param as_primary: if we should make the block device
1207
      read/write
1208

1209
  @return: the assembled device or None (in case no device
1210
      was assembled)
1211
  @raise errors.BlockDeviceError: in case there is an error
1212
      during the activation of the children or the device
1213
      itself
1214

1215
  """
1216
  children = []
1217
  if disk.children:
1218
    mcn = disk.ChildrenNeeded()
1219
    if mcn == -1:
1220
      mcn = 0 # max number of Nones allowed
1221
    else:
1222
      mcn = len(disk.children) - mcn # max number of Nones
1223
    for chld_disk in disk.children:
1224
      try:
1225
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1226
      except errors.BlockDeviceError, err:
1227
        if children.count(None) >= mcn:
1228
          raise
1229
        cdev = None
1230
        logging.error("Error in child activation (but continuing): %s",
1231
                      str(err))
1232
      children.append(cdev)
1233

    
1234
  if as_primary or disk.AssembleOnSecondary():
1235
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1236
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1237
    result = r_dev
1238
    if as_primary or disk.OpenOnSecondary():
1239
      r_dev.Open()
1240
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1241
                                as_primary, disk.iv_name)
1242

    
1243
  else:
1244
    result = True
1245
  return result
1246

    
1247

    
1248
def BlockdevAssemble(disk, owner, as_primary):
1249
  """Activate a block device for an instance.
1250

1251
  This is a wrapper over _RecursiveAssembleBD.
1252

1253
  @rtype: str or boolean
1254
  @return: a C{/dev/...} path for primary nodes, and
1255
      C{True} for secondary nodes
1256

1257
  """
1258
  status = True
1259
  result = "no error information"
1260
  try:
1261
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1262
    if isinstance(result, bdev.BlockDev):
1263
      result = result.dev_path
1264
  except errors.BlockDeviceError, err:
1265
    result = "Error while assembling disk: %s" % str(err)
1266
    status = False
1267
  return (status, result)
1268

    
1269

    
1270
def BlockdevShutdown(disk):
1271
  """Shut down a block device.
1272

1273
  First, if the device is assembled (Attach() is successfull), then
1274
  the device is shutdown. Then the children of the device are
1275
  shutdown.
1276

1277
  This function is called recursively. Note that we don't cache the
1278
  children or such, as oppossed to assemble, shutdown of different
1279
  devices doesn't require that the upper device was active.
1280

1281
  @type disk: L{objects.Disk}
1282
  @param disk: the description of the disk we should
1283
      shutdown
1284
  @rtype: boolean
1285
  @return: the success of the operation
1286

1287
  """
1288
  msgs = []
1289
  result = True
1290
  r_dev = _RecursiveFindBD(disk)
1291
  if r_dev is not None:
1292
    r_path = r_dev.dev_path
1293
    try:
1294
      r_dev.Shutdown()
1295
      DevCacheManager.RemoveCache(r_path)
1296
    except errors.BlockDeviceError, err:
1297
      msgs.append(str(err))
1298
      result = False
1299

    
1300
  if disk.children:
1301
    for child in disk.children:
1302
      c_status, c_msg = BlockdevShutdown(child)
1303
      result = result and c_status
1304
      if c_msg: # not an empty message
1305
        msgs.append(c_msg)
1306

    
1307
  return (result, "; ".join(msgs))
1308

    
1309

    
1310
def BlockdevAddchildren(parent_cdev, new_cdevs):
1311
  """Extend a mirrored block device.
1312

1313
  @type parent_cdev: L{objects.Disk}
1314
  @param parent_cdev: the disk to which we should add children
1315
  @type new_cdevs: list of L{objects.Disk}
1316
  @param new_cdevs: the list of children which we should add
1317
  @rtype: boolean
1318
  @return: the success of the operation
1319

1320
  """
1321
  parent_bdev = _RecursiveFindBD(parent_cdev)
1322
  if parent_bdev is None:
1323
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1324
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1325
  if new_bdevs.count(None) > 0:
1326
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1327
  parent_bdev.AddChildren(new_bdevs)
1328
  return (True, None)
1329

    
1330

    
1331
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1332
  """Shrink a mirrored block device.
1333

1334
  @type parent_cdev: L{objects.Disk}
1335
  @param parent_cdev: the disk from which we should remove children
1336
  @type new_cdevs: list of L{objects.Disk}
1337
  @param new_cdevs: the list of children which we should remove
1338
  @rtype: boolean
1339
  @return: the success of the operation
1340

1341
  """
1342
  parent_bdev = _RecursiveFindBD(parent_cdev)
1343
  if parent_bdev is None:
1344
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1345
  devs = []
1346
  for disk in new_cdevs:
1347
    rpath = disk.StaticDevPath()
1348
    if rpath is None:
1349
      bd = _RecursiveFindBD(disk)
1350
      if bd is None:
1351
        _Fail("Can't find device %s while removing children", disk)
1352
      else:
1353
        devs.append(bd.dev_path)
1354
    else:
1355
      devs.append(rpath)
1356
  parent_bdev.RemoveChildren(devs)
1357
  return (True, None)
1358

    
1359

    
1360
def BlockdevGetmirrorstatus(disks):
1361
  """Get the mirroring status of a list of devices.
1362

1363
  @type disks: list of L{objects.Disk}
1364
  @param disks: the list of disks which we should query
1365
  @rtype: disk
1366
  @return:
1367
      a list of (mirror_done, estimated_time) tuples, which
1368
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1369
  @raise errors.BlockDeviceError: if any of the disks cannot be
1370
      found
1371

1372
  """
1373
  stats = []
1374
  for dsk in disks:
1375
    rbd = _RecursiveFindBD(dsk)
1376
    if rbd is None:
1377
      _Fail("Can't find device %s", dsk)
1378
    stats.append(rbd.CombinedSyncStatus())
1379
  return True, stats
1380

    
1381

    
1382
def _RecursiveFindBD(disk):
1383
  """Check if a device is activated.
1384

1385
  If so, return informations about the real device.
1386

1387
  @type disk: L{objects.Disk}
1388
  @param disk: the disk object we need to find
1389

1390
  @return: None if the device can't be found,
1391
      otherwise the device instance
1392

1393
  """
1394
  children = []
1395
  if disk.children:
1396
    for chdisk in disk.children:
1397
      children.append(_RecursiveFindBD(chdisk))
1398

    
1399
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1400

    
1401

    
1402
def BlockdevFind(disk):
1403
  """Check if a device is activated.
1404

1405
  If it is, return informations about the real device.
1406

1407
  @type disk: L{objects.Disk}
1408
  @param disk: the disk to find
1409
  @rtype: None or tuple
1410
  @return: None if the disk cannot be found, otherwise a
1411
      tuple (device_path, major, minor, sync_percent,
1412
      estimated_time, is_degraded)
1413

1414
  """
1415
  try:
1416
    rbd = _RecursiveFindBD(disk)
1417
  except errors.BlockDeviceError, err:
1418
    _Fail("Failed to find device: %s", err, exc=True)
1419
  if rbd is None:
1420
    return (True, None)
1421
  return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1422

    
1423

    
1424
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1425
  """Write a file to the filesystem.
1426

1427
  This allows the master to overwrite(!) a file. It will only perform
1428
  the operation if the file belongs to a list of configuration files.
1429

1430
  @type file_name: str
1431
  @param file_name: the target file name
1432
  @type data: str
1433
  @param data: the new contents of the file
1434
  @type mode: int
1435
  @param mode: the mode to give the file (can be None)
1436
  @type uid: int
1437
  @param uid: the owner of the file (can be -1 for default)
1438
  @type gid: int
1439
  @param gid: the group of the file (can be -1 for default)
1440
  @type atime: float
1441
  @param atime: the atime to set on the file (can be None)
1442
  @type mtime: float
1443
  @param mtime: the mtime to set on the file (can be None)
1444
  @rtype: boolean
1445
  @return: the success of the operation; errors are logged
1446
      in the node daemon log
1447

1448
  """
1449
  if not os.path.isabs(file_name):
1450
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1451

    
1452
  allowed_files = set([
1453
    constants.CLUSTER_CONF_FILE,
1454
    constants.ETC_HOSTS,
1455
    constants.SSH_KNOWN_HOSTS_FILE,
1456
    constants.VNC_PASSWORD_FILE,
1457
    constants.RAPI_CERT_FILE,
1458
    constants.RAPI_USERS_FILE,
1459
    ])
1460

    
1461
  for hv_name in constants.HYPER_TYPES:
1462
    hv_class = hypervisor.GetHypervisor(hv_name)
1463
    allowed_files.update(hv_class.GetAncillaryFiles())
1464

    
1465
  if file_name not in allowed_files:
1466
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1467
          file_name)
1468

    
1469
  raw_data = _Decompress(data)
1470

    
1471
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1472
                  atime=atime, mtime=mtime)
1473
  return (True, "success")
1474

    
1475

    
1476
def WriteSsconfFiles(values):
1477
  """Update all ssconf files.
1478

1479
  Wrapper around the SimpleStore.WriteFiles.
1480

1481
  """
1482
  ssconf.SimpleStore().WriteFiles(values)
1483

    
1484

    
1485
def _ErrnoOrStr(err):
1486
  """Format an EnvironmentError exception.
1487

1488
  If the L{err} argument has an errno attribute, it will be looked up
1489
  and converted into a textual C{E...} description. Otherwise the
1490
  string representation of the error will be returned.
1491

1492
  @type err: L{EnvironmentError}
1493
  @param err: the exception to format
1494

1495
  """
1496
  if hasattr(err, 'errno'):
1497
    detail = errno.errorcode[err.errno]
1498
  else:
1499
    detail = str(err)
1500
  return detail
1501

    
1502

    
1503
def _OSOndiskVersion(name, os_dir):
1504
  """Compute and return the API version of a given OS.
1505

1506
  This function will try to read the API version of the OS given by
1507
  the 'name' parameter and residing in the 'os_dir' directory.
1508

1509
  @type name: str
1510
  @param name: the OS name we should look for
1511
  @type os_dir: str
1512
  @param os_dir: the directory inwhich we should look for the OS
1513
  @rtype: int or None
1514
  @return:
1515
      Either an integer denoting the version or None in the
1516
      case when this is not a valid OS name.
1517
  @raise errors.InvalidOS: if the OS cannot be found
1518

1519
  """
1520
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1521

    
1522
  try:
1523
    st = os.stat(api_file)
1524
  except EnvironmentError, err:
1525
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1526
                           " found (%s)" % _ErrnoOrStr(err))
1527

    
1528
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1529
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1530
                           " a regular file")
1531

    
1532
  try:
1533
    f = open(api_file)
1534
    try:
1535
      api_versions = f.readlines()
1536
    finally:
1537
      f.close()
1538
  except EnvironmentError, err:
1539
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1540
                           " API version (%s)" % _ErrnoOrStr(err))
1541

    
1542
  api_versions = [version.strip() for version in api_versions]
1543
  try:
1544
    api_versions = [int(version) for version in api_versions]
1545
  except (TypeError, ValueError), err:
1546
    raise errors.InvalidOS(name, os_dir,
1547
                           "API version is not integer (%s)" % str(err))
1548

    
1549
  return api_versions
1550

    
1551

    
1552
def DiagnoseOS(top_dirs=None):
1553
  """Compute the validity for all OSes.
1554

1555
  @type top_dirs: list
1556
  @param top_dirs: the list of directories in which to
1557
      search (if not given defaults to
1558
      L{constants.OS_SEARCH_PATH})
1559
  @rtype: list of L{objects.OS}
1560
  @return: an OS object for each name in all the given
1561
      directories
1562

1563
  """
1564
  if top_dirs is None:
1565
    top_dirs = constants.OS_SEARCH_PATH
1566

    
1567
  result = []
1568
  for dir_name in top_dirs:
1569
    if os.path.isdir(dir_name):
1570
      try:
1571
        f_names = utils.ListVisibleFiles(dir_name)
1572
      except EnvironmentError, err:
1573
        logging.exception("Can't list the OS directory %s", dir_name)
1574
        break
1575
      for name in f_names:
1576
        try:
1577
          os_inst = OSFromDisk(name, base_dir=dir_name)
1578
          result.append(os_inst)
1579
        except errors.InvalidOS, err:
1580
          result.append(objects.OS.FromInvalidOS(err))
1581

    
1582
  return result
1583

    
1584

    
1585
def OSFromDisk(name, base_dir=None):
1586
  """Create an OS instance from disk.
1587

1588
  This function will return an OS instance if the given name is a
1589
  valid OS name. Otherwise, it will raise an appropriate
1590
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1591

1592
  @type base_dir: string
1593
  @keyword base_dir: Base directory containing OS installations.
1594
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1595
  @rtype: L{objects.OS}
1596
  @return: the OS instance if we find a valid one
1597
  @raise errors.InvalidOS: if we don't find a valid OS
1598

1599
  """
1600
  if base_dir is None:
1601
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1602
    if os_dir is None:
1603
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1604
  else:
1605
    os_dir = os.path.sep.join([base_dir, name])
1606

    
1607
  api_versions = _OSOndiskVersion(name, os_dir)
1608

    
1609
  if constants.OS_API_VERSION not in api_versions:
1610
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1611
                           " (found %s want %s)"
1612
                           % (api_versions, constants.OS_API_VERSION))
1613

    
1614
  # OS Scripts dictionary, we will populate it with the actual script names
1615
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1616

    
1617
  for script in os_scripts:
1618
    os_scripts[script] = os.path.sep.join([os_dir, script])
1619

    
1620
    try:
1621
      st = os.stat(os_scripts[script])
1622
    except EnvironmentError, err:
1623
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1624
                             (script, _ErrnoOrStr(err)))
1625

    
1626
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1627
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1628
                             script)
1629

    
1630
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1631
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1632
                             script)
1633

    
1634

    
1635
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1636
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1637
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1638
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1639
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1640
                    api_versions=api_versions)
1641

    
1642
def OSEnvironment(instance, debug=0):
1643
  """Calculate the environment for an os script.
1644

1645
  @type instance: L{objects.Instance}
1646
  @param instance: target instance for the os script run
1647
  @type debug: integer
1648
  @param debug: debug level (0 or 1, for OS Api 10)
1649
  @rtype: dict
1650
  @return: dict of environment variables
1651
  @raise errors.BlockDeviceError: if the block device
1652
      cannot be found
1653

1654
  """
1655
  result = {}
1656
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1657
  result['INSTANCE_NAME'] = instance.name
1658
  result['INSTANCE_OS'] = instance.os
1659
  result['HYPERVISOR'] = instance.hypervisor
1660
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1661
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1662
  result['DEBUG_LEVEL'] = '%d' % debug
1663
  for idx, disk in enumerate(instance.disks):
1664
    real_disk = _RecursiveFindBD(disk)
1665
    if real_disk is None:
1666
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1667
                                    str(disk))
1668
    real_disk.Open()
1669
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1670
    result['DISK_%d_ACCESS' % idx] = disk.mode
1671
    if constants.HV_DISK_TYPE in instance.hvparams:
1672
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1673
        instance.hvparams[constants.HV_DISK_TYPE]
1674
    if disk.dev_type in constants.LDS_BLOCK:
1675
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1676
    elif disk.dev_type == constants.LD_FILE:
1677
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1678
        'file:%s' % disk.physical_id[0]
1679
  for idx, nic in enumerate(instance.nics):
1680
    result['NIC_%d_MAC' % idx] = nic.mac
1681
    if nic.ip:
1682
      result['NIC_%d_IP' % idx] = nic.ip
1683
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1684
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1685
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1686
    if nic.nicparams[constants.NIC_LINK]:
1687
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1688
    if constants.HV_NIC_TYPE in instance.hvparams:
1689
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1690
        instance.hvparams[constants.HV_NIC_TYPE]
1691

    
1692
  return result
1693

    
1694
def BlockdevGrow(disk, amount):
1695
  """Grow a stack of block devices.
1696

1697
  This function is called recursively, with the childrens being the
1698
  first ones to resize.
1699

1700
  @type disk: L{objects.Disk}
1701
  @param disk: the disk to be grown
1702
  @rtype: (status, result)
1703
  @return: a tuple with the status of the operation
1704
      (True/False), and the errors message if status
1705
      is False
1706

1707
  """
1708
  r_dev = _RecursiveFindBD(disk)
1709
  if r_dev is None:
1710
    return False, "Cannot find block device %s" % (disk,)
1711

    
1712
  try:
1713
    r_dev.Grow(amount)
1714
  except errors.BlockDeviceError, err:
1715
    _Fail("Failed to grow block device: %s", err, exc=True)
1716

    
1717
  return True, None
1718

    
1719

    
1720
def BlockdevSnapshot(disk):
1721
  """Create a snapshot copy of a block device.
1722

1723
  This function is called recursively, and the snapshot is actually created
1724
  just for the leaf lvm backend device.
1725

1726
  @type disk: L{objects.Disk}
1727
  @param disk: the disk to be snapshotted
1728
  @rtype: string
1729
  @return: snapshot disk path
1730

1731
  """
1732
  if disk.children:
1733
    if len(disk.children) == 1:
1734
      # only one child, let's recurse on it
1735
      return BlockdevSnapshot(disk.children[0])
1736
    else:
1737
      # more than one child, choose one that matches
1738
      for child in disk.children:
1739
        if child.size == disk.size:
1740
          # return implies breaking the loop
1741
          return BlockdevSnapshot(child)
1742
  elif disk.dev_type == constants.LD_LV:
1743
    r_dev = _RecursiveFindBD(disk)
1744
    if r_dev is not None:
1745
      # let's stay on the safe side and ask for the full size, for now
1746
      return True, r_dev.Snapshot(disk.size)
1747
    else:
1748
      _Fail("Cannot find block device %s", disk)
1749
  else:
1750
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1751
          disk.unique_id, disk.dev_type)
1752

    
1753

    
1754
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1755
  """Export a block device snapshot to a remote node.
1756

1757
  @type disk: L{objects.Disk}
1758
  @param disk: the description of the disk to export
1759
  @type dest_node: str
1760
  @param dest_node: the destination node to export to
1761
  @type instance: L{objects.Instance}
1762
  @param instance: the instance object to whom the disk belongs
1763
  @type cluster_name: str
1764
  @param cluster_name: the cluster name, needed for SSH hostalias
1765
  @type idx: int
1766
  @param idx: the index of the disk in the instance's disk list,
1767
      used to export to the OS scripts environment
1768
  @rtype: boolean
1769
  @return: the success of the operation
1770

1771
  """
1772
  export_env = OSEnvironment(instance)
1773

    
1774
  inst_os = OSFromDisk(instance.os)
1775
  export_script = inst_os.export_script
1776

    
1777
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1778
                                     instance.name, int(time.time()))
1779
  if not os.path.exists(constants.LOG_OS_DIR):
1780
    os.mkdir(constants.LOG_OS_DIR, 0750)
1781
  real_disk = _RecursiveFindBD(disk)
1782
  if real_disk is None:
1783
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1784
                                  str(disk))
1785
  real_disk.Open()
1786

    
1787
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1788
  export_env['EXPORT_INDEX'] = str(idx)
1789

    
1790
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1791
  destfile = disk.physical_id[1]
1792

    
1793
  # the target command is built out of three individual commands,
1794
  # which are joined by pipes; we check each individual command for
1795
  # valid parameters
1796
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1797
                               export_script, logfile)
1798

    
1799
  comprcmd = "gzip"
1800

    
1801
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1802
                                destdir, destdir, destfile)
1803
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1804
                                                   constants.GANETI_RUNAS,
1805
                                                   destcmd)
1806

    
1807
  # all commands have been checked, so we're safe to combine them
1808
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1809

    
1810
  result = utils.RunCmd(command, env=export_env)
1811

    
1812
  if result.failed:
1813
    logging.error("os snapshot export command '%s' returned error: %s"
1814
                  " output: %s", command, result.fail_reason, result.output)
1815
    return False
1816

    
1817
  return True
1818

    
1819

    
1820
def FinalizeExport(instance, snap_disks):
1821
  """Write out the export configuration information.
1822

1823
  @type instance: L{objects.Instance}
1824
  @param instance: the instance which we export, used for
1825
      saving configuration
1826
  @type snap_disks: list of L{objects.Disk}
1827
  @param snap_disks: list of snapshot block devices, which
1828
      will be used to get the actual name of the dump file
1829

1830
  @rtype: boolean
1831
  @return: the success of the operation
1832

1833
  """
1834
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1835
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1836

    
1837
  config = objects.SerializableConfigParser()
1838

    
1839
  config.add_section(constants.INISECT_EXP)
1840
  config.set(constants.INISECT_EXP, 'version', '0')
1841
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1842
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1843
  config.set(constants.INISECT_EXP, 'os', instance.os)
1844
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
1845

    
1846
  config.add_section(constants.INISECT_INS)
1847
  config.set(constants.INISECT_INS, 'name', instance.name)
1848
  config.set(constants.INISECT_INS, 'memory', '%d' %
1849
             instance.beparams[constants.BE_MEMORY])
1850
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
1851
             instance.beparams[constants.BE_VCPUS])
1852
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1853

    
1854
  nic_total = 0
1855
  for nic_count, nic in enumerate(instance.nics):
1856
    nic_total += 1
1857
    config.set(constants.INISECT_INS, 'nic%d_mac' %
1858
               nic_count, '%s' % nic.mac)
1859
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1860
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1861
               '%s' % nic.bridge)
1862
  # TODO: redundant: on load can read nics until it doesn't exist
1863
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1864

    
1865
  disk_total = 0
1866
  for disk_count, disk in enumerate(snap_disks):
1867
    if disk:
1868
      disk_total += 1
1869
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1870
                 ('%s' % disk.iv_name))
1871
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1872
                 ('%s' % disk.physical_id[1]))
1873
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1874
                 ('%d' % disk.size))
1875

    
1876
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1877

    
1878
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1879
                  data=config.Dumps())
1880
  shutil.rmtree(finaldestdir, True)
1881
  shutil.move(destdir, finaldestdir)
1882

    
1883
  return True
1884

    
1885

    
1886
def ExportInfo(dest):
1887
  """Get export configuration information.
1888

1889
  @type dest: str
1890
  @param dest: directory containing the export
1891

1892
  @rtype: L{objects.SerializableConfigParser}
1893
  @return: a serializable config file containing the
1894
      export info
1895

1896
  """
1897
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1898

    
1899
  config = objects.SerializableConfigParser()
1900
  config.read(cff)
1901

    
1902
  if (not config.has_section(constants.INISECT_EXP) or
1903
      not config.has_section(constants.INISECT_INS)):
1904
    return None
1905

    
1906
  return config
1907

    
1908

    
1909
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1910
  """Import an os image into an instance.
1911

1912
  @type instance: L{objects.Instance}
1913
  @param instance: instance to import the disks into
1914
  @type src_node: string
1915
  @param src_node: source node for the disk images
1916
  @type src_images: list of string
1917
  @param src_images: absolute paths of the disk images
1918
  @rtype: list of boolean
1919
  @return: each boolean represent the success of importing the n-th disk
1920

1921
  """
1922
  import_env = OSEnvironment(instance)
1923
  inst_os = OSFromDisk(instance.os)
1924
  import_script = inst_os.import_script
1925

    
1926
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1927
                                        instance.name, int(time.time()))
1928
  if not os.path.exists(constants.LOG_OS_DIR):
1929
    os.mkdir(constants.LOG_OS_DIR, 0750)
1930

    
1931
  comprcmd = "gunzip"
1932
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1933
                               import_script, logfile)
1934

    
1935
  final_result = []
1936
  for idx, image in enumerate(src_images):
1937
    if image:
1938
      destcmd = utils.BuildShellCmd('cat %s', image)
1939
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1940
                                                       constants.GANETI_RUNAS,
1941
                                                       destcmd)
1942
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1943
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1944
      import_env['IMPORT_INDEX'] = str(idx)
1945
      result = utils.RunCmd(command, env=import_env)
1946
      if result.failed:
1947
        logging.error("Disk import command '%s' returned error: %s"
1948
                      " output: %s", command, result.fail_reason,
1949
                      result.output)
1950
        final_result.append(False)
1951
      else:
1952
        final_result.append(True)
1953
    else:
1954
      final_result.append(True)
1955

    
1956
  return final_result
1957

    
1958

    
1959
def ListExports():
1960
  """Return a list of exports currently available on this machine.
1961

1962
  @rtype: list
1963
  @return: list of the exports
1964

1965
  """
1966
  if os.path.isdir(constants.EXPORT_DIR):
1967
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
1968
  else:
1969
    return []
1970

    
1971

    
1972
def RemoveExport(export):
1973
  """Remove an existing export from the node.
1974

1975
  @type export: str
1976
  @param export: the name of the export to remove
1977
  @rtype: boolean
1978
  @return: the success of the operation
1979

1980
  """
1981
  target = os.path.join(constants.EXPORT_DIR, export)
1982

    
1983
  shutil.rmtree(target)
1984
  # TODO: catch some of the relevant exceptions and provide a pretty
1985
  # error message if rmtree fails.
1986

    
1987
  return True
1988

    
1989

    
1990
def BlockdevRename(devlist):
1991
  """Rename a list of block devices.
1992

1993
  @type devlist: list of tuples
1994
  @param devlist: list of tuples of the form  (disk,
1995
      new_logical_id, new_physical_id); disk is an
1996
      L{objects.Disk} object describing the current disk,
1997
      and new logical_id/physical_id is the name we
1998
      rename it to
1999
  @rtype: boolean
2000
  @return: True if all renames succeeded, False otherwise
2001

2002
  """
2003
  msgs = []
2004
  result = True
2005
  for disk, unique_id in devlist:
2006
    dev = _RecursiveFindBD(disk)
2007
    if dev is None:
2008
      msgs.append("Can't find device %s in rename" % str(disk))
2009
      result = False
2010
      continue
2011
    try:
2012
      old_rpath = dev.dev_path
2013
      dev.Rename(unique_id)
2014
      new_rpath = dev.dev_path
2015
      if old_rpath != new_rpath:
2016
        DevCacheManager.RemoveCache(old_rpath)
2017
        # FIXME: we should add the new cache information here, like:
2018
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2019
        # but we don't have the owner here - maybe parse from existing
2020
        # cache? for now, we only lose lvm data when we rename, which
2021
        # is less critical than DRBD or MD
2022
    except errors.BlockDeviceError, err:
2023
      msgs.append("Can't rename device '%s' to '%s': %s" %
2024
                  (dev, unique_id, err))
2025
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2026
      result = False
2027
  return (result, "; ".join(msgs))
2028

    
2029

    
2030
def _TransformFileStorageDir(file_storage_dir):
2031
  """Checks whether given file_storage_dir is valid.
2032

2033
  Checks wheter the given file_storage_dir is within the cluster-wide
2034
  default file_storage_dir stored in SimpleStore. Only paths under that
2035
  directory are allowed.
2036

2037
  @type file_storage_dir: str
2038
  @param file_storage_dir: the path to check
2039

2040
  @return: the normalized path if valid, None otherwise
2041

2042
  """
2043
  cfg = _GetConfig()
2044
  file_storage_dir = os.path.normpath(file_storage_dir)
2045
  base_file_storage_dir = cfg.GetFileStorageDir()
2046
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2047
      base_file_storage_dir):
2048
    logging.error("file storage directory '%s' is not under base file"
2049
                  " storage directory '%s'",
2050
                  file_storage_dir, base_file_storage_dir)
2051
    return None
2052
  return file_storage_dir
2053

    
2054

    
2055
def CreateFileStorageDir(file_storage_dir):
2056
  """Create file storage directory.
2057

2058
  @type file_storage_dir: str
2059
  @param file_storage_dir: directory to create
2060

2061
  @rtype: tuple
2062
  @return: tuple with first element a boolean indicating wheter dir
2063
      creation was successful or not
2064

2065
  """
2066
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2067
  result = True,
2068
  if not file_storage_dir:
2069
    result = False,
2070
  else:
2071
    if os.path.exists(file_storage_dir):
2072
      if not os.path.isdir(file_storage_dir):
2073
        logging.error("'%s' is not a directory", file_storage_dir)
2074
        result = False,
2075
    else:
2076
      try:
2077
        os.makedirs(file_storage_dir, 0750)
2078
      except OSError, err:
2079
        logging.error("Cannot create file storage directory '%s': %s",
2080
                      file_storage_dir, err)
2081
        result = False,
2082
  return result
2083

    
2084

    
2085
def RemoveFileStorageDir(file_storage_dir):
2086
  """Remove file storage directory.
2087

2088
  Remove it only if it's empty. If not log an error and return.
2089

2090
  @type file_storage_dir: str
2091
  @param file_storage_dir: the directory we should cleanup
2092
  @rtype: tuple (success,)
2093
  @return: tuple of one element, C{success}, denoting
2094
      whether the operation was successfull
2095

2096
  """
2097
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2098
  result = True,
2099
  if not file_storage_dir:
2100
    result = False,
2101
  else:
2102
    if os.path.exists(file_storage_dir):
2103
      if not os.path.isdir(file_storage_dir):
2104
        logging.error("'%s' is not a directory", file_storage_dir)
2105
        result = False,
2106
      # deletes dir only if empty, otherwise we want to return False
2107
      try:
2108
        os.rmdir(file_storage_dir)
2109
      except OSError, err:
2110
        logging.exception("Cannot remove file storage directory '%s'",
2111
                          file_storage_dir)
2112
        result = False,
2113
  return result
2114

    
2115

    
2116
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2117
  """Rename the file storage directory.
2118

2119
  @type old_file_storage_dir: str
2120
  @param old_file_storage_dir: the current path
2121
  @type new_file_storage_dir: str
2122
  @param new_file_storage_dir: the name we should rename to
2123
  @rtype: tuple (success,)
2124
  @return: tuple of one element, C{success}, denoting
2125
      whether the operation was successful
2126

2127
  """
2128
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2129
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2130
  result = True,
2131
  if not old_file_storage_dir or not new_file_storage_dir:
2132
    result = False,
2133
  else:
2134
    if not os.path.exists(new_file_storage_dir):
2135
      if os.path.isdir(old_file_storage_dir):
2136
        try:
2137
          os.rename(old_file_storage_dir, new_file_storage_dir)
2138
        except OSError, err:
2139
          logging.exception("Cannot rename '%s' to '%s'",
2140
                            old_file_storage_dir, new_file_storage_dir)
2141
          result =  False,
2142
      else:
2143
        logging.error("'%s' is not a directory", old_file_storage_dir)
2144
        result = False,
2145
    else:
2146
      if os.path.exists(old_file_storage_dir):
2147
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2148
                      old_file_storage_dir, new_file_storage_dir)
2149
        result = False,
2150
  return result
2151

    
2152

    
2153
def _IsJobQueueFile(file_name):
2154
  """Checks whether the given filename is in the queue directory.
2155

2156
  @type file_name: str
2157
  @param file_name: the file name we should check
2158
  @rtype: boolean
2159
  @return: whether the file is under the queue directory
2160

2161
  """
2162
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2163
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2164

    
2165
  if not result:
2166
    logging.error("'%s' is not a file in the queue directory",
2167
                  file_name)
2168

    
2169
  return result
2170

    
2171

    
2172
def JobQueueUpdate(file_name, content):
2173
  """Updates a file in the queue directory.
2174

2175
  This is just a wrapper over L{utils.WriteFile}, with proper
2176
  checking.
2177

2178
  @type file_name: str
2179
  @param file_name: the job file name
2180
  @type content: str
2181
  @param content: the new job contents
2182
  @rtype: boolean
2183
  @return: the success of the operation
2184

2185
  """
2186
  if not _IsJobQueueFile(file_name):
2187
    return False
2188

    
2189
  # Write and replace the file atomically
2190
  utils.WriteFile(file_name, data=_Decompress(content))
2191

    
2192
  return True
2193

    
2194

    
2195
def JobQueueRename(old, new):
2196
  """Renames a job queue file.
2197

2198
  This is just a wrapper over os.rename with proper checking.
2199

2200
  @type old: str
2201
  @param old: the old (actual) file name
2202
  @type new: str
2203
  @param new: the desired file name
2204
  @rtype: boolean
2205
  @return: the success of the operation
2206

2207
  """
2208
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2209
    return False
2210

    
2211
  utils.RenameFile(old, new, mkdir=True)
2212

    
2213
  return True
2214

    
2215

    
2216
def JobQueueSetDrainFlag(drain_flag):
2217
  """Set the drain flag for the queue.
2218

2219
  This will set or unset the queue drain flag.
2220

2221
  @type drain_flag: boolean
2222
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
2223
  @rtype: boolean
2224
  @return: always True
2225
  @warning: the function always returns True
2226

2227
  """
2228
  if drain_flag:
2229
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2230
  else:
2231
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2232

    
2233
  return True
2234

    
2235

    
2236
def BlockdevClose(instance_name, disks):
2237
  """Closes the given block devices.
2238

2239
  This means they will be switched to secondary mode (in case of
2240
  DRBD).
2241

2242
  @param instance_name: if the argument is not empty, the symlinks
2243
      of this instance will be removed
2244
  @type disks: list of L{objects.Disk}
2245
  @param disks: the list of disks to be closed
2246
  @rtype: tuple (success, message)
2247
  @return: a tuple of success and message, where success
2248
      indicates the succes of the operation, and message
2249
      which will contain the error details in case we
2250
      failed
2251

2252
  """
2253
  bdevs = []
2254
  for cf in disks:
2255
    rd = _RecursiveFindBD(cf)
2256
    if rd is None:
2257
      _Fail("Can't find device %s", cf)
2258
    bdevs.append(rd)
2259

    
2260
  msg = []
2261
  for rd in bdevs:
2262
    try:
2263
      rd.Close()
2264
    except errors.BlockDeviceError, err:
2265
      msg.append(str(err))
2266
  if msg:
2267
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
2268
  else:
2269
    if instance_name:
2270
      _RemoveBlockDevLinks(instance_name, disks)
2271
    return (True, "All devices secondary")
2272

    
2273

    
2274
def ValidateHVParams(hvname, hvparams):
2275
  """Validates the given hypervisor parameters.
2276

2277
  @type hvname: string
2278
  @param hvname: the hypervisor name
2279
  @type hvparams: dict
2280
  @param hvparams: the hypervisor parameters to be validated
2281
  @rtype: tuple (success, message)
2282
  @return: a tuple of success and message, where success
2283
      indicates the succes of the operation, and message
2284
      which will contain the error details in case we
2285
      failed
2286

2287
  """
2288
  try:
2289
    hv_type = hypervisor.GetHypervisor(hvname)
2290
    hv_type.ValidateParameters(hvparams)
2291
    return (True, "Validation passed")
2292
  except errors.HypervisorError, err:
2293
    return (False, str(err))
2294

    
2295

    
2296
def DemoteFromMC():
2297
  """Demotes the current node from master candidate role.
2298

2299
  """
2300
  # try to ensure we're not the master by mistake
2301
  master, myself = ssconf.GetMasterAndMyself()
2302
  if master == myself:
2303
    return (False, "ssconf status shows I'm the master node, will not demote")
2304
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2305
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2306
    return (False, "The master daemon is running, will not demote")
2307
  try:
2308
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2309
  except EnvironmentError, err:
2310
    if err.errno != errno.ENOENT:
2311
      return (False, "Error while backing up cluster file: %s" % str(err))
2312
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2313
  return (True, "Done")
2314

    
2315

    
2316
def _FindDisks(nodes_ip, disks):
2317
  """Sets the physical ID on disks and returns the block devices.
2318

2319
  """
2320
  # set the correct physical ID
2321
  my_name = utils.HostInfo().name
2322
  for cf in disks:
2323
    cf.SetPhysicalID(my_name, nodes_ip)
2324

    
2325
  bdevs = []
2326

    
2327
  for cf in disks:
2328
    rd = _RecursiveFindBD(cf)
2329
    if rd is None:
2330
      return (False, "Can't find device %s" % cf)
2331
    bdevs.append(rd)
2332
  return (True, bdevs)
2333

    
2334

    
2335
def DrbdDisconnectNet(nodes_ip, disks):
2336
  """Disconnects the network on a list of drbd devices.
2337

2338
  """
2339
  status, bdevs = _FindDisks(nodes_ip, disks)
2340
  if not status:
2341
    return status, bdevs
2342

    
2343
  # disconnect disks
2344
  for rd in bdevs:
2345
    try:
2346
      rd.DisconnectNet()
2347
    except errors.BlockDeviceError, err:
2348
      _Fail("Can't change network configuration to standalone mode: %s",
2349
            err, exc=True)
2350
  return (True, "All disks are now disconnected")
2351

    
2352

    
2353
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2354
  """Attaches the network on a list of drbd devices.
2355

2356
  """
2357
  status, bdevs = _FindDisks(nodes_ip, disks)
2358
  if not status:
2359
    return status, bdevs
2360

    
2361
  if multimaster:
2362
    for idx, rd in enumerate(bdevs):
2363
      try:
2364
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2365
      except EnvironmentError, err:
2366
        _Fail("Can't create symlink: %s", err)
2367
  # reconnect disks, switch to new master configuration and if
2368
  # needed primary mode
2369
  for rd in bdevs:
2370
    try:
2371
      rd.AttachNet(multimaster)
2372
    except errors.BlockDeviceError, err:
2373
      _Fail("Can't change network configuration: %s", err)
2374
  # wait until the disks are connected; we need to retry the re-attach
2375
  # if the device becomes standalone, as this might happen if the one
2376
  # node disconnects and reconnects in a different mode before the
2377
  # other node reconnects; in this case, one or both of the nodes will
2378
  # decide it has wrong configuration and switch to standalone
2379
  RECONNECT_TIMEOUT = 2 * 60
2380
  sleep_time = 0.100 # start with 100 miliseconds
2381
  timeout_limit = time.time() + RECONNECT_TIMEOUT
2382
  while time.time() < timeout_limit:
2383
    all_connected = True
2384
    for rd in bdevs:
2385
      stats = rd.GetProcStatus()
2386
      if not (stats.is_connected or stats.is_in_resync):
2387
        all_connected = False
2388
      if stats.is_standalone:
2389
        # peer had different config info and this node became
2390
        # standalone, even though this should not happen with the
2391
        # new staged way of changing disk configs
2392
        try:
2393
          rd.ReAttachNet(multimaster)
2394
        except errors.BlockDeviceError, err:
2395
          _Fail("Can't change network configuration: %s", err)
2396
    if all_connected:
2397
      break
2398
    time.sleep(sleep_time)
2399
    sleep_time = min(5, sleep_time * 1.5)
2400
  if not all_connected:
2401
    return (False, "Timeout in disk reconnecting")
2402
  if multimaster:
2403
    # change to primary mode
2404
    for rd in bdevs:
2405
      try:
2406
        rd.Open()
2407
      except errors.BlockDeviceError, err:
2408
        _Fail("Can't change to primary mode: %s", err)
2409
  if multimaster:
2410
    msg = "multi-master and primary"
2411
  else:
2412
    msg = "single-master"
2413
  return (True, "Disks are now configured as %s" % msg)
2414

    
2415

    
2416
def DrbdWaitSync(nodes_ip, disks):
2417
  """Wait until DRBDs have synchronized.
2418

2419
  """
2420
  status, bdevs = _FindDisks(nodes_ip, disks)
2421
  if not status:
2422
    return status, bdevs
2423

    
2424
  min_resync = 100
2425
  alldone = True
2426
  failure = False
2427
  for rd in bdevs:
2428
    stats = rd.GetProcStatus()
2429
    if not (stats.is_connected or stats.is_in_resync):
2430
      failure = True
2431
      break
2432
    alldone = alldone and (not stats.is_in_resync)
2433
    if stats.sync_percent is not None:
2434
      min_resync = min(min_resync, stats.sync_percent)
2435
  return (not failure, (alldone, min_resync))
2436

    
2437

    
2438
def PowercycleNode(hypervisor_type):
2439
  """Hard-powercycle the node.
2440

2441
  Because we need to return first, and schedule the powercycle in the
2442
  background, we won't be able to report failures nicely.
2443

2444
  """
2445
  hyper = hypervisor.GetHypervisor(hypervisor_type)
2446
  try:
2447
    pid = os.fork()
2448
  except OSError, err:
2449
    # if we can't fork, we'll pretend that we're in the child process
2450
    pid = 0
2451
  if pid > 0:
2452
    return (True, "Reboot scheduled in 5 seconds")
2453
  time.sleep(5)
2454
  hyper.PowercycleNode()
2455

    
2456

    
2457
class HooksRunner(object):
2458
  """Hook runner.
2459

2460
  This class is instantiated on the node side (ganeti-noded) and not
2461
  on the master side.
2462

2463
  """
2464
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2465

    
2466
  def __init__(self, hooks_base_dir=None):
2467
    """Constructor for hooks runner.
2468

2469
    @type hooks_base_dir: str or None
2470
    @param hooks_base_dir: if not None, this overrides the
2471
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2472

2473
    """
2474
    if hooks_base_dir is None:
2475
      hooks_base_dir = constants.HOOKS_BASE_DIR
2476
    self._BASE_DIR = hooks_base_dir
2477

    
2478
  @staticmethod
2479
  def ExecHook(script, env):
2480
    """Exec one hook script.
2481

2482
    @type script: str
2483
    @param script: the full path to the script
2484
    @type env: dict
2485
    @param env: the environment with which to exec the script
2486
    @rtype: tuple (success, message)
2487
    @return: a tuple of success and message, where success
2488
        indicates the succes of the operation, and message
2489
        which will contain the error details in case we
2490
        failed
2491

2492
    """
2493
    # exec the process using subprocess and log the output
2494
    fdstdin = None
2495
    try:
2496
      fdstdin = open("/dev/null", "r")
2497
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2498
                               stderr=subprocess.STDOUT, close_fds=True,
2499
                               shell=False, cwd="/", env=env)
2500
      output = ""
2501
      try:
2502
        output = child.stdout.read(4096)
2503
        child.stdout.close()
2504
      except EnvironmentError, err:
2505
        output += "Hook script error: %s" % str(err)
2506

    
2507
      while True:
2508
        try:
2509
          result = child.wait()
2510
          break
2511
        except EnvironmentError, err:
2512
          if err.errno == errno.EINTR:
2513
            continue
2514
          raise
2515
    finally:
2516
      # try not to leak fds
2517
      for fd in (fdstdin, ):
2518
        if fd is not None:
2519
          try:
2520
            fd.close()
2521
          except EnvironmentError, err:
2522
            # just log the error
2523
            #logging.exception("Error while closing fd %s", fd)
2524
            pass
2525

    
2526
    return result == 0, utils.SafeEncode(output.strip())
2527

    
2528
  def RunHooks(self, hpath, phase, env):
2529
    """Run the scripts in the hooks directory.
2530

2531
    @type hpath: str
2532
    @param hpath: the path to the hooks directory which
2533
        holds the scripts
2534
    @type phase: str
2535
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2536
        L{constants.HOOKS_PHASE_POST}
2537
    @type env: dict
2538
    @param env: dictionary with the environment for the hook
2539
    @rtype: list
2540
    @return: list of 3-element tuples:
2541
      - script path
2542
      - script result, either L{constants.HKR_SUCCESS} or
2543
        L{constants.HKR_FAIL}
2544
      - output of the script
2545

2546
    @raise errors.ProgrammerError: for invalid input
2547
        parameters
2548

2549
    """
2550
    if phase == constants.HOOKS_PHASE_PRE:
2551
      suffix = "pre"
2552
    elif phase == constants.HOOKS_PHASE_POST:
2553
      suffix = "post"
2554
    else:
2555
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2556
    rr = []
2557

    
2558
    subdir = "%s-%s.d" % (hpath, suffix)
2559
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2560
    try:
2561
      dir_contents = utils.ListVisibleFiles(dir_name)
2562
    except OSError, err:
2563
      # FIXME: must log output in case of failures
2564
      return rr
2565

    
2566
    # we use the standard python sort order,
2567
    # so 00name is the recommended naming scheme
2568
    dir_contents.sort()
2569
    for relname in dir_contents:
2570
      fname = os.path.join(dir_name, relname)
2571
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2572
          self.RE_MASK.match(relname) is not None):
2573
        rrval = constants.HKR_SKIP
2574
        output = ""
2575
      else:
2576
        result, output = self.ExecHook(fname, env)
2577
        if not result:
2578
          rrval = constants.HKR_FAIL
2579
        else:
2580
          rrval = constants.HKR_SUCCESS
2581
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2582

    
2583
    return rr
2584

    
2585

    
2586
class IAllocatorRunner(object):
2587
  """IAllocator runner.
2588

2589
  This class is instantiated on the node side (ganeti-noded) and not on
2590
  the master side.
2591

2592
  """
2593
  def Run(self, name, idata):
2594
    """Run an iallocator script.
2595

2596
    @type name: str
2597
    @param name: the iallocator script name
2598
    @type idata: str
2599
    @param idata: the allocator input data
2600

2601
    @rtype: tuple
2602
    @return: four element tuple of:
2603
       - run status (one of the IARUN_ constants)
2604
       - stdout
2605
       - stderr
2606
       - fail reason (as from L{utils.RunResult})
2607

2608
    """
2609
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2610
                                  os.path.isfile)
2611
    if alloc_script is None:
2612
      return (constants.IARUN_NOTFOUND, None, None, None)
2613

    
2614
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2615
    try:
2616
      os.write(fd, idata)
2617
      os.close(fd)
2618
      result = utils.RunCmd([alloc_script, fin_name])
2619
      if result.failed:
2620
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2621
                result.fail_reason)
2622
    finally:
2623
      os.unlink(fin_name)
2624

    
2625
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2626

    
2627

    
2628
class DevCacheManager(object):
2629
  """Simple class for managing a cache of block device information.
2630

2631
  """
2632
  _DEV_PREFIX = "/dev/"
2633
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2634

    
2635
  @classmethod
2636
  def _ConvertPath(cls, dev_path):
2637
    """Converts a /dev/name path to the cache file name.
2638

2639
    This replaces slashes with underscores and strips the /dev
2640
    prefix. It then returns the full path to the cache file.
2641

2642
    @type dev_path: str
2643
    @param dev_path: the C{/dev/} path name
2644
    @rtype: str
2645
    @return: the converted path name
2646

2647
    """
2648
    if dev_path.startswith(cls._DEV_PREFIX):
2649
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2650
    dev_path = dev_path.replace("/", "_")
2651
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2652
    return fpath
2653

    
2654
  @classmethod
2655
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2656
    """Updates the cache information for a given device.
2657

2658
    @type dev_path: str
2659
    @param dev_path: the pathname of the device
2660
    @type owner: str
2661
    @param owner: the owner (instance name) of the device
2662
    @type on_primary: bool
2663
    @param on_primary: whether this is the primary
2664
        node nor not
2665
    @type iv_name: str
2666
    @param iv_name: the instance-visible name of the
2667
        device, as in objects.Disk.iv_name
2668

2669
    @rtype: None
2670

2671
    """
2672
    if dev_path is None:
2673
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2674
      return
2675
    fpath = cls._ConvertPath(dev_path)
2676
    if on_primary:
2677
      state = "primary"
2678
    else:
2679
      state = "secondary"
2680
    if iv_name is None:
2681
      iv_name = "not_visible"
2682
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2683
    try:
2684
      utils.WriteFile(fpath, data=fdata)
2685
    except EnvironmentError, err:
2686
      logging.exception("Can't update bdev cache for %s", dev_path)
2687

    
2688
  @classmethod
2689
  def RemoveCache(cls, dev_path):
2690
    """Remove data for a dev_path.
2691

2692
    This is just a wrapper over L{utils.RemoveFile} with a converted
2693
    path name and logging.
2694

2695
    @type dev_path: str
2696
    @param dev_path: the pathname of the device
2697

2698
    @rtype: None
2699

2700
    """
2701
    if dev_path is None:
2702
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2703
      return
2704
    fpath = cls._ConvertPath(dev_path)
2705
    try:
2706
      utils.RemoveFile(fpath)
2707
    except EnvironmentError, err:
2708
      logging.exception("Can't update bdev cache for %s", dev_path)