Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 2503680f

History | View | Annotate | Download (81.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36
import zlib
37
import base64
38

    
39
from ganeti import errors
40
from ganeti import utils
41
from ganeti import ssh
42
from ganeti import hypervisor
43
from ganeti import constants
44
from ganeti import bdev
45
from ganeti import objects
46
from ganeti import ssconf
47

    
48

    
49
def _GetConfig():
  """Build and return a simple-store accessor.

  @rtype: L{ssconf.SimpleStore}
  @return: a freshly constructed SimpleStore instance

  """
  return ssconf.SimpleStore()
57

    
58

    
59
def _GetSshRunner(cluster_name):
  """Build and return an SSH runner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a freshly constructed SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
70

    
71

    
72
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: a two-element (encoding, payload) sequence sent by the
      RPC client
  @rtype: str
  @return: the decompressed payload

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  # plain payloads pass through untouched
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  # zlib-compressed payloads are additionally base64-armored
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
90

    
91

    
92
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    excluded = []
  else:
    # normalize the excluded paths so they compare equal to the
    # normalized names computed below
    excluded = [os.path.normpath(entry) for entry in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in excluded:
      continue
    # only plain files are removed; symlinks and directories are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
116

    
117

    
118
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # the queue lock file must survive so the queue can still be locked
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126

    
127

    
128
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name) if we have a good
      configuration, otherwise (None, None, None)

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError:
    # the exception is recorded by logging.exception; the previously
    # bound "err" variable was never used
    logging.exception("Cluster configuration incomplete")
    return (None, None, None)
  return (master_netdev, master_ip, master_node)
148

    
149

    
150
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: boolean
  @return: True if all the requested steps succeeded, False otherwise

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    # incomplete cluster configuration, nothing can be done
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # send gratuitous ARPs so peers refresh their caches; the count
    # must be a separate argv element ("-c", "3"), not the single
    # string "-c 3" which only worked by accident of getopt parsing
    result = utils.RunCmd(["arping", "-q", "-U", "-c", "3",
                           "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    daemons_params = {
        'ganeti-masterd': [],
        'ganeti-rapi': [],
        }
    if no_voting:
      daemons_params['ganeti-masterd'].append('--no-voting')
      daemons_params['ganeti-masterd'].append('--yes-do-it')
    for daemon in daemons_params:
      cmd = [daemon]
      cmd.extend(daemons_params[daemon])
      result = utils.RunCmd(cmd)
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
207

    
208

    
209
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: False if the master information could not be computed,
      True otherwise (IP removal failures are only logged)

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    # incomplete cluster configuration, nothing can be done
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      # ReadPidFile returns 0 for missing/invalid pid files, which
      # KillProcess treats as a no-op
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True
238

    
239

    
240
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: tuple
  @return: a (success, message) pair

  """
  # host keys: private keys must not be world-readable, public keys may be
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    # mkdir=True: create the user's .ssh directory if it is missing
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    msg = "Error while processing user ssh files"
    logging.exception(msg)
    return (False, "%s: %s" % (msg, err))

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return (True, "Node added successfully")
287

    
288

    
289
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  # drop our own public key from the authorized_keys file
  pub_fd = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, pub_fd.read(8192))
  finally:
    pub_fd.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
320

    
321

    
322
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  node_info = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
    }

  # merge in the hypervisor-provided memory/cpu data, if any
  hv_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hv_info is not None:
    node_info.update(hv_info)

  # the boot id changes at every reboot, letting callers detect restarts
  boot_id_file = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    node_info["bootid"] = boot_id_file.read(128).rstrip("\n")
  finally:
    boot_id_file.close()

  return node_info
356

    
357

    
358
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster's name, passed to the SSH runner
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  if constants.NV_HYPERVISOR in what:
    # "tmp" aliases the sub-dict just stored into result
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffle in place so each node contacts its peers in a different
    # order, spreading the ssh load across the cluster
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failures are reported back
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own primary/secondary IPs in the submitted node list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP when it differs from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # on error, the error text is returned instead of a minor list
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  return result
458

    
459

    
460
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # raw string: the backslash-escaped separators must reach the regex
  # engine literally; groups are (name, size, attributes)
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr positions: [4] is the state ('-' means inactive), [5] is
    # the device-open flag ('o' means open/online)
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
499

    
500

    
501
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  # the actual work is delegated to the utils module
  return utils.ListVolumeGroups()
510

    
511

    
512
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return []

  def _StripExtents(dev):
    # device fields look like "/dev/sda1(0)"; drop the extent suffix
    if '(' in dev:
      return dev.split('(')[0]
    return dev

  def _FieldsToDict(fields):
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': _StripExtents(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  volumes = []
  for line in result.stdout.splitlines():
    # skip malformed lines that don't carry all four fields
    if line.count('|') >= 3:
      volumes.append(_FieldsToDict(line.split('|')))
  return volumes
555

    
556

    
557
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the names of the bridges to check
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge_name in bridges_list:
    # stop at the first missing bridge
    if not utils.BridgeExists(bridge_name):
      return False

  return True
569

    
570

    
571
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  @raise errors.HypervisorError: re-raised if any hypervisor query fails

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError:
      # fixed typo in the log message ("hypevisor"); the exception
      # details are recorded by logging.exception, so no binding needed
      logging.exception("Error enumerating instances for hypervisor %s", hname)
      raise

  return results
593

    
594

    
595
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    # unknown instance: an empty dict is returned
    return {}

  # iinfo holds (name, id, memory, vcpus, state, time); only a subset
  # is exposed to the caller
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
619

    
620

    
621
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if instance.name not in hyper.ListInstances():
    return (False, 'not running')

  # every disk must have its symlink in place; the links are created
  # at instance start, so a missing one means a pre-1.2.5 start
  for idx in range(len(instance.disks)):
    if not os.path.islink(_GetBlockDevSymlinkPath(instance.name, idx)):
      return (False, 'not restarted since ganeti 1.2.5')

  return (True, '')
643

    
644

    
645
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus
  @raise errors.HypervisorError: if the same instance is reported by
      two hypervisors with different static parameters

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              raise errors.HypervisorError("Instance %s is running twice"
                                           " with different parameters" % name)
        # the entry from the last hypervisor that reports the instance wins
        output[name] = value

  return output
686

    
687

    
688
def InstanceOsAdd(instance):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @rtype: tuple
  @return: a (success, message) pair

  """
  try:
    inst_os = OSFromDisk(instance.os)
  except errors.InvalidOS, err:
    # err.args carries (os_name, os_dir, error_text); os_dir is None
    # when the OS definition could not be found at all
    os_name, os_dir, os_err = err.args
    if os_dir is None:
      return (False, "Can't find OS '%s': %s" % (os_name, os_err))
    else:
      return (False, "Error parsing OS '%s' in directory %s: %s" %
              (os_name, os_dir, os_err))

  create_env = OSEnvironment(instance)

  # per-run log file, made unique by the timestamp suffix
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # include the tail of the log in the returned message for diagnosis
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS create script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Successfully installed")
724

    
725

    
726
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: tuple
  @return: a (success, message) pair

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  # the script needs the previous name to find the data to rename
  rename_env['OLD_INSTANCE_NAME'] = old_name

  # per-run log file, made unique by the timestamp suffix
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # this is the rename script; the message previously said "os create
    # command", which was misleading in the logs
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # include the tail of the log in the returned message for diagnosis
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS rename script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Rename successful")
758

    
759

    
760
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  # default result: all keys present, all values None
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # strip a possible trailing separator before splitting the fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError:
      # the exception is recorded by logging.exception; the previously
      # bound "err" variable was never used
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
798

    
799

    
800
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Compute the path of the symlink for an instance's disk.

  @type instance_name: str
  @param instance_name: the name of the instance
  @type idx: int
  @param idx: the index of the disk
  @rtype: str
  @return: the path under DISK_LINKS_DIR, named "<instance>:<idx>"

  """
  return os.path.join(constants.DISK_LINKS_DIR,
                      "%s:%d" % (instance_name, idx))
803

    
804

    
805
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # a link already exists: keep it only if it already points at the
      # right device, otherwise replace it
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      # any other OS error is fatal
      raise

  return link_name
831

    
832

    
833
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @type instance_name: str
  @param instance_name: the name of the instance whose links are removed
  @type disks: list
  @param disks: the instance's disks; only their count matters, since
      the link names are derived from the instance name and the index

  """
  # only the index is needed, so don't bind the unused disk objects
  for idx in range(len(disks)):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort cleanup: log and continue with the remaining links
        logging.exception("Can't remove symlink '%s'", link_name)
844

    
845

    
846
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)
  @raise errors.BlockDeviceError: if a device cannot be found or its
      symlink cannot be created

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      # convert the OS-level failure to the block-device error callers expect
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices
874

    
875

    
876
def StartInstance(instance):
877
  """Start an instance.
878

879
  @type instance: L{objects.Instance}
880
  @param instance: the instance object
881
  @rtype: boolean
882
  @return: whether the startup was successful or not
883

884
  """
885
  running_instances = GetInstanceList([instance.hypervisor])
886

    
887
  if instance.name in running_instances:
888
    return (True, "Already running")
889

    
890
  try:
891
    block_devices = _GatherAndLinkBlockDevs(instance)
892
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
893
    hyper.StartInstance(instance, block_devices)
894
  except errors.BlockDeviceError, err:
895
    logging.exception("Failed to start instance")
896
    return (False, "Block device error: %s" % str(err))
897
  except errors.HypervisorError, err:
898
    logging.exception("Failed to start instance")
899
    _RemoveBlockDevLinks(instance.name, instance.disks)
900
    return (False, "Hypervisor error: %s" % str(err))
901

    
902
  return (True, "Instance started successfully")
903

    
904

    
905
def InstanceShutdown(instance):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: tuple
  @return: a (status, message) pair; status is a boolean denoting
      whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  if instance.name not in running_instances:
    return (True, "Instance already stopped")

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    # first attempt: a clean (non-forced) stop
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    msg = "Failed to stop instance %s: %s" % (instance.name, err)
    logging.error(msg)
    return (False, msg)

  # test every 10secs for 2min
  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
  else:
    # for/else: we get here only if the loop above never hit 'break',
    # i.e. the instance is still running after the polling window;
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, using destroy",
                  instance.name)

    try:
      # second attempt: forced stop (destroy at the hypervisor level)
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      msg = "Failed to force stop instance %s: %s" % (instance.name, err)
      logging.error(msg)
      return (False, msg)

    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      msg = ("Could not shutdown instance %s even by destroy" %
             instance.name)
      logging.error(msg)
      return (False, msg)

  # the instance is down; drop the /var/run device symlinks
  _RemoveBlockDevLinks(instance.name, instance.disks)

  return (True, "Instance has been shutdown successfully")
959

    
960

    
961
def InstanceReboot(instance, reboot_type):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @rtype: tuple
  @return: a (status, message) pair; status is a boolean denoting
      the success of the operation

  """
  running_instances = GetInstanceList([instance.hypervisor])

  # rebooting a stopped instance is an error, not a start
  if instance.name not in running_instances:
    msg = "Cannot reboot instance %s that is not running" % instance.name
    logging.error(msg)
    return (False, msg)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
      logging.error(msg)
      return (False, msg)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      # a hard reboot is a full stop/start cycle on this node
      stop_result = InstanceShutdown(instance)
      if not stop_result[0]:
        return stop_result
      return StartInstance(instance)
    except errors.HypervisorError, err:
      msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
      logging.error(msg)
      return (False, msg)
  else:
    return (False, "Invalid reboot_type received: %s" % (reboot_type,))

  # only the soft-reboot branch falls through to here
  return (True, "Reboot successful")
1010

    
1011

    
1012
def MigrationInfo(instance):
1013
  """Gather information about an instance to be migrated.
1014

1015
  @type instance: L{objects.Instance}
1016
  @param instance: the instance definition
1017

1018
  """
1019
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1020
  try:
1021
    info = hyper.MigrationInfo(instance)
1022
  except errors.HypervisorError, err:
1023
    msg = "Failed to fetch migration information"
1024
    logging.exception(msg)
1025
    return (False, '%s: %s' % (msg, err))
1026
  return (True, info)
1027

    
1028

    
1029
def AcceptInstance(instance, info, target):
1030
  """Prepare the node to accept an instance.
1031

1032
  @type instance: L{objects.Instance}
1033
  @param instance: the instance definition
1034
  @type info: string/data (opaque)
1035
  @param info: migration information, from the source node
1036
  @type target: string
1037
  @param target: target host (usually ip), on this node
1038

1039
  """
1040
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1041
  try:
1042
    hyper.AcceptInstance(instance, info, target)
1043
  except errors.HypervisorError, err:
1044
    msg = "Failed to accept instance"
1045
    logging.exception(msg)
1046
    return (False, '%s: %s' % (msg, err))
1047
  return (True, "Accept successfull")
1048

    
1049

    
1050
def FinalizeMigration(instance, info, success):
1051
  """Finalize any preparation to accept an instance.
1052

1053
  @type instance: L{objects.Instance}
1054
  @param instance: the instance definition
1055
  @type info: string/data (opaque)
1056
  @param info: migration information, from the source node
1057
  @type success: boolean
1058
  @param success: whether the migration was a success or a failure
1059

1060
  """
1061
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1062
  try:
1063
    hyper.FinalizeMigration(instance, info, success)
1064
  except errors.HypervisorError, err:
1065
    msg = "Failed to finalize migration"
1066
    logging.exception(msg)
1067
    return (False, '%s: %s' % (msg, err))
1068
  return (True, "Migration Finalized")
1069

    
1070

    
1071
def MigrateInstance(instance, target, live):
1072
  """Migrates an instance to another node.
1073

1074
  @type instance: L{objects.Instance}
1075
  @param instance: the instance definition
1076
  @type target: string
1077
  @param target: the target node name
1078
  @type live: boolean
1079
  @param live: whether the migration should be done live or not (the
1080
      interpretation of this parameter is left to the hypervisor)
1081
  @rtype: tuple
1082
  @return: a tuple of (success, msg) where:
1083
      - succes is a boolean denoting the success/failure of the operation
1084
      - msg is a string with details in case of failure
1085

1086
  """
1087
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1088

    
1089
  try:
1090
    hyper.MigrateInstance(instance.name, target, live)
1091
  except errors.HypervisorError, err:
1092
    msg = "Failed to migrate instance"
1093
    logging.exception(msg)
1094
    return (False, "%s: %s" % (msg, err))
1095
  return (True, "Migration successfull")
1096

    
1097

    
1098
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # NOTE(review): the 'size' argument is not used below; disk.size is
  # used instead -- confirm whether the parameter can be dropped
  clist = []
  if disk.children:
    # assemble (and possibly open) all children first, so the parent
    # device can be created on top of them
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        errmsg = "Can't assemble device %s: %s" % (child, err)
        logging.error(errmsg)
        return False, errmsg
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          crdev.Open()
        except errors.BlockDeviceError, err:
          errmsg = "Can't make child '%s' read-write: %s" % (child, err)
          logging.error(errmsg)
          return False, errmsg
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    return False, "Can't create block device: %s" % str(err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      errmsg = ("Can't assemble device after creation, very"
                " unusual event: %s" % str(err))
      logging.error(errmsg)
      return False, errmsg
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        errmsg = ("Can't make device r/w after creation, very"
                  " unusual event: %s" % str(err))
        logging.error(errmsg)
        return False, errmsg
    # record the device in the node-local cache so later find/remove
    # operations can locate it
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return True, physical_id
1168

    
1169

    
1170
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: tuple
  @return: a (status, message) pair; status is True only if this
      device and all its children were removed successfully

  """
  msgs = []
  result = True
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
      result = False
    if result:
      # only drop the cache entry when the removal actually worked
      DevCacheManager.RemoveCache(r_path)

  # remove the children even if the parent removal failed, collecting
  # all error messages along the way
  if disk.children:
    for child in disk.children:
      c_status, c_msg = BlockdevRemove(child)
      result = result and c_status
      if c_msg: # not an empty message
        msgs.append(c_msg)

  return (result, "; ".join(msgs))
1207

    
1208

    
1209
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device when it was assembled, or C{True} for
      disks which are not assembled on this (secondary) node
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # mcn ends up holding the maximum number of children that may fail
    # to assemble (represented as None entries) before we give up
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    # remember the device so later find/shutdown calls can locate it
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
  else:
    result = True
  return result
1261

    
1262

    
1263
def BlockdevAssemble(disk, owner, as_primary):
1264
  """Activate a block device for an instance.
1265

1266
  This is a wrapper over _RecursiveAssembleBD.
1267

1268
  @rtype: str or boolean
1269
  @return: a C{/dev/...} path for primary nodes, and
1270
      C{True} for secondary nodes
1271

1272
  """
1273
  status = True
1274
  result = "no error information"
1275
  try:
1276
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1277
    if isinstance(result, bdev.BlockDev):
1278
      result = result.dev_path
1279
  except errors.BlockDeviceError, err:
1280
    result = "Error while assembling disk: %s" % str(err)
1281
    status = False
1282
  return (status, result)
1283

    
1284

    
1285
def BlockdevShutdown(disk):
1286
  """Shut down a block device.
1287

1288
  First, if the device is assembled (Attach() is successfull), then
1289
  the device is shutdown. Then the children of the device are
1290
  shutdown.
1291

1292
  This function is called recursively. Note that we don't cache the
1293
  children or such, as oppossed to assemble, shutdown of different
1294
  devices doesn't require that the upper device was active.
1295

1296
  @type disk: L{objects.Disk}
1297
  @param disk: the description of the disk we should
1298
      shutdown
1299
  @rtype: boolean
1300
  @return: the success of the operation
1301

1302
  """
1303
  msgs = []
1304
  result = True
1305
  r_dev = _RecursiveFindBD(disk)
1306
  if r_dev is not None:
1307
    r_path = r_dev.dev_path
1308
    try:
1309
      r_dev.Shutdown()
1310
      DevCacheManager.RemoveCache(r_path)
1311
    except errors.BlockDeviceError, err:
1312
      msgs.append(str(err))
1313
      result = False
1314

    
1315
  if disk.children:
1316
    for child in disk.children:
1317
      c_status, c_msg = BlockdevShutdown(child)
1318
      result = result and c_status
1319
      if c_msg: # not an empty message
1320
        msgs.append(c_msg)
1321

    
1322
  return (result, "; ".join(msgs))
1323

    
1324

    
1325
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False

  # resolve every child disk to its assembled block device
  new_bdevs = []
  for cdev in new_cdevs:
    new_bdevs.append(_RecursiveFindBD(cdev))
  if None in new_bdevs:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False

  parent_bdev.AddChildren(new_bdevs)
  return True
1347

    
1348

    
1349
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False

  devs = []
  for disk in new_cdevs:
    # prefer the static path when the disk type has one; otherwise the
    # device must be assembled so we can read its dynamic path
    rpath = disk.StaticDevPath()
    if rpath is not None:
      devs.append(rpath)
      continue
    bd = _RecursiveFindBD(disk)
    if bd is None:
      logging.error("Can't find dynamic device %s while removing children",
                    disk)
      return False
    devs.append(bd.dev_path)

  parent_bdev.RemoveChildren(devs)
  return True
1379

    
1380

    
1381
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  result = []
  for current in disks:
    found = _RecursiveFindBD(current)
    if found is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(current))
    result.append(found.CombinedSyncStatus())
  return result
1401

    
1402

    
1403
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # locate the children first (recursively), since the parent lookup
  # needs them
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1421

    
1422

    
1423
def BlockdevFind(disk):
1424
  """Check if a device is activated.
1425

1426
  If it is, return informations about the real device.
1427

1428
  @type disk: L{objects.Disk}
1429
  @param disk: the disk to find
1430
  @rtype: None or tuple
1431
  @return: None if the disk cannot be found, otherwise a
1432
      tuple (device_path, major, minor, sync_percent,
1433
      estimated_time, is_degraded)
1434

1435
  """
1436
  try:
1437
    rbd = _RecursiveFindBD(disk)
1438
  except errors.BlockDeviceError, err:
1439
    return (False, str(err))
1440
  if rbd is None:
1441
    return (True, None)
1442
  return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1443

    
1444

    
1445
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; errors are logged
      in the node daemon log

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # whitelist of files the master is allowed to overwrite on this node
  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]
  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  # payload comes over the wire compressed; inflate before writing
  raw_data = _Decompress(data)
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
1492

    
1493

    
1494
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
1501

    
1502

    
1503
def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, 'errno'):
    return errno.errorcode[err.errno]
  return str(err)
1519

    
1520

    
1521
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS given by
  the 'name' parameter and residing in the 'os_dir' directory.

  @type name: str
  @param name: the OS name we should look for
  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: list of int
  @return: the list of API versions declared in the OS'
      'ganeti_api_version' file, one integer per line
  @raise errors.InvalidOS: if the version file is missing, is not a
      regular file, cannot be read, or contains non-integer lines

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_versions = f.readlines()
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  # one version per line; strip whitespace first, then convert
  api_versions = [version.strip() for version in api_versions]
  try:
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_versions
1568

    
1569

    
1570
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: an OS object for each name in all the given
      directories

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        # NOTE(review): this 'break' aborts scanning of the *remaining*
        # search directories, not just the failing one -- confirm that
        # 'continue' was not intended here
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          # invalid OSes are still reported, as diagnostic objects
          result.append(objects.OS.FromInvalidOS(err))

  return result
1601

    
1602

    
1603
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise errors.InvalidOS: if we don't find a valid OS

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    # every script must exist...
    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    # ...be executable by its owner...
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    # ...and be a regular file (not a directory, device, etc.)
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)
1659

    
1660
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  # per-disk variables; every disk must be assembled and opened so its
  # device path can be exported
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  # per-NIC variables; NIC_%d_IP is only set when an IP is configured
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # export all backend and hypervisor parameters as well
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
1711

    
1712
def BlockdevGrow(disk, amount):
1713
  """Grow a stack of block devices.
1714

1715
  This function is called recursively, with the childrens being the
1716
  first ones to resize.
1717

1718
  @type disk: L{objects.Disk}
1719
  @param disk: the disk to be grown
1720
  @rtype: (status, result)
1721
  @return: a tuple with the status of the operation
1722
      (True/False), and the errors message if status
1723
      is False
1724

1725
  """
1726
  r_dev = _RecursiveFindBD(disk)
1727
  if r_dev is None:
1728
    return False, "Cannot find block device %s" % (disk,)
1729

    
1730
  try:
1731
    r_dev.Grow(amount)
1732
  except errors.BlockDeviceError, err:
1733
    return False, str(err)
1734

    
1735
  return True, None
1736

    
1737

    
1738
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path
  @raise errors.ProgrammerError: if the leaf device is not LVM-based

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return BlockdevSnapshot(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return BlockdevSnapshot(child)
      # NOTE(review): when no child matches the parent's size, the
      # function falls through and implicitly returns None -- confirm
      # that callers handle this case
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
1771

    
1772

    
1773
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: boolean
  @return: the success of the operation

  """
  export_env = OSEnvironment(instance)

  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  # per-run log file capturing the export script's stderr
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  # data lands on the destination in the '<instance>.new' staging
  # directory (see FinalizeExport, which moves it into place)
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
                               export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command, env=export_env)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
1837

    
1838

    
1839
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: boolean
  @return: the success of the operation

  """
  # the export is staged in '<instance>.new' and only moved to its
  # final name once the config file has been fully written
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      # false entries keep their index but are not counted in disk_count
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # ignore_errors=True: the final dir may simply not exist yet
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
1903

    
1904

    
1905
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info, or None if the config is missing a
      mandatory section

  """
  config = objects.SerializableConfigParser()
  config.read(os.path.join(dest, constants.EXPORT_CONF_FILE))

  # both the export and the instance section must be present
  for section in (constants.INISECT_EXP, constants.INISECT_INS):
    if not config.has_section(section):
      return None

  return config
1926

    
1927

    
1928
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
  """Import an os image into an instance.

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: list of boolean
  @return: each boolean represent the success of importing the n-th disk

  """
  import_env = OSEnvironment(instance)
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  # per-run log file; the import script's stdout/stderr go here
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if image:
      # pipeline: ssh to the source node and cat the dump | gunzip |
      # OS import script (which reads the image from its stdin)
      destcmd = utils.BuildShellCmd('cat %s', image)
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                       constants.GANETI_RUNAS,
                                                       destcmd)
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
      import_env['IMPORT_INDEX'] = str(idx)
      result = utils.RunCmd(command, env=import_env)
      if result.failed:
        logging.error("Disk import command '%s' returned error: %s"
                      " output: %s", command, result.fail_reason,
                      result.output)
        final_result.append(False)
      else:
        final_result.append(True)
    else:
      # no source image for this disk index, nothing to import
      final_result.append(True)

  return final_result
1976

    
1977

    
1978
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    # no export directory means no exports
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
1989

    
1990

    
1991
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: boolean
  @return: the success of the operation

  """
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
  shutil.rmtree(os.path.join(constants.EXPORT_DIR, export))
  return True
2007

    
2008

    
2009
def BlockdevRename(devlist):
2010
  """Rename a list of block devices.
2011

2012
  @type devlist: list of tuples
2013
  @param devlist: list of tuples of the form  (disk,
2014
      new_logical_id, new_physical_id); disk is an
2015
      L{objects.Disk} object describing the current disk,
2016
      and new logical_id/physical_id is the name we
2017
      rename it to
2018
  @rtype: boolean
2019
  @return: True if all renames succeeded, False otherwise
2020

2021
  """
2022
  result = True
2023
  for disk, unique_id in devlist:
2024
    dev = _RecursiveFindBD(disk)
2025
    if dev is None:
2026
      result = False
2027
      continue
2028
    try:
2029
      old_rpath = dev.dev_path
2030
      dev.Rename(unique_id)
2031
      new_rpath = dev.dev_path
2032
      if old_rpath != new_rpath:
2033
        DevCacheManager.RemoveCache(old_rpath)
2034
        # FIXME: we should add the new cache information here, like:
2035
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2036
        # but we don't have the owner here - maybe parse from existing
2037
        # cache? for now, we only lose lvm data when we rename, which
2038
        # is less critical than DRBD or MD
2039
    except errors.BlockDeviceError, err:
2040
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2041
      result = False
2042
  return result
2043

    
2044

    
2045
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = os.path.normpath(cfg.GetFileStorageDir())
  # note: os.path.commonprefix works character-by-character, so it would
  # wrongly accept a sibling such as '<base>-evil' as being under <base>;
  # we therefore require the base dir itself or a real path-component
  # continuation (base + separator)
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
2068

    
2069

    
2070
def CreateFileStorageDir(file_storage_dir):
2071
  """Create file storage directory.
2072

2073
  @type file_storage_dir: str
2074
  @param file_storage_dir: directory to create
2075

2076
  @rtype: tuple
2077
  @return: tuple with first element a boolean indicating wheter dir
2078
      creation was successful or not
2079

2080
  """
2081
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2082
  result = True,
2083
  if not file_storage_dir:
2084
    result = False,
2085
  else:
2086
    if os.path.exists(file_storage_dir):
2087
      if not os.path.isdir(file_storage_dir):
2088
        logging.error("'%s' is not a directory", file_storage_dir)
2089
        result = False,
2090
    else:
2091
      try:
2092
        os.makedirs(file_storage_dir, 0750)
2093
      except OSError, err:
2094
        logging.error("Cannot create file storage directory '%s': %s",
2095
                      file_storage_dir, err)
2096
        result = False,
2097
  return result
2098

    
2099

    
2100
def RemoveFileStorageDir(file_storage_dir):
2101
  """Remove file storage directory.
2102

2103
  Remove it only if it's empty. If not log an error and return.
2104

2105
  @type file_storage_dir: str
2106
  @param file_storage_dir: the directory we should cleanup
2107
  @rtype: tuple (success,)
2108
  @return: tuple of one element, C{success}, denoting
2109
      whether the operation was successfull
2110

2111
  """
2112
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2113
  result = True,
2114
  if not file_storage_dir:
2115
    result = False,
2116
  else:
2117
    if os.path.exists(file_storage_dir):
2118
      if not os.path.isdir(file_storage_dir):
2119
        logging.error("'%s' is not a directory", file_storage_dir)
2120
        result = False,
2121
      # deletes dir only if empty, otherwise we want to return False
2122
      try:
2123
        os.rmdir(file_storage_dir)
2124
      except OSError, err:
2125
        logging.exception("Cannot remove file storage directory '%s'",
2126
                          file_storage_dir)
2127
        result = False,
2128
  return result
2129

    
2130

    
2131
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2132
  """Rename the file storage directory.
2133

2134
  @type old_file_storage_dir: str
2135
  @param old_file_storage_dir: the current path
2136
  @type new_file_storage_dir: str
2137
  @param new_file_storage_dir: the name we should rename to
2138
  @rtype: tuple (success,)
2139
  @return: tuple of one element, C{success}, denoting
2140
      whether the operation was successful
2141

2142
  """
2143
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2144
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2145
  result = True,
2146
  if not old_file_storage_dir or not new_file_storage_dir:
2147
    result = False,
2148
  else:
2149
    if not os.path.exists(new_file_storage_dir):
2150
      if os.path.isdir(old_file_storage_dir):
2151
        try:
2152
          os.rename(old_file_storage_dir, new_file_storage_dir)
2153
        except OSError, err:
2154
          logging.exception("Cannot rename '%s' to '%s'",
2155
                            old_file_storage_dir, new_file_storage_dir)
2156
          result =  False,
2157
      else:
2158
        logging.error("'%s' is not a directory", old_file_storage_dir)
2159
        result = False,
2160
    else:
2161
      if os.path.exists(old_file_storage_dir):
2162
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2163
                      old_file_storage_dir, new_file_storage_dir)
2164
        result = False,
2165
  return result
2166

    
2167

    
2168
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  file_name = os.path.normpath(file_name)
  # note: os.path.commonprefix compares character-by-character and would
  # also accept siblings such as '<queue_dir>-x'; require a real path
  # component match instead
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.sep))

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
2185

    
2186

    
2187
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  if _IsJobQueueFile(file_name):
    # write and replace the file atomically
    utils.WriteFile(file_name, data=_Decompress(content))
    return True
  return False
2208

    
2209

    
2210
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  # both ends of the rename must be inside the queue directory
  if not _IsJobQueueFile(old):
    return False
  if not _IsJobQueueFile(new):
    return False

  utils.RenameFile(old, new, mkdir=True)
  return True
2229

    
2230

    
2231
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    # the mere presence of the (empty) file marks the queue as drained
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  return True
2249

    
2250

    
2251
def BlockdevClose(instance_name, disks):
2252
  """Closes the given block devices.
2253

2254
  This means they will be switched to secondary mode (in case of
2255
  DRBD).
2256

2257
  @param instance_name: if the argument is not empty, the symlinks
2258
      of this instance will be removed
2259
  @type disks: list of L{objects.Disk}
2260
  @param disks: the list of disks to be closed
2261
  @rtype: tuple (success, message)
2262
  @return: a tuple of success and message, where success
2263
      indicates the succes of the operation, and message
2264
      which will contain the error details in case we
2265
      failed
2266

2267
  """
2268
  bdevs = []
2269
  for cf in disks:
2270
    rd = _RecursiveFindBD(cf)
2271
    if rd is None:
2272
      return (False, "Can't find device %s" % cf)
2273
    bdevs.append(rd)
2274

    
2275
  msg = []
2276
  for rd in bdevs:
2277
    try:
2278
      rd.Close()
2279
    except errors.BlockDeviceError, err:
2280
      msg.append(str(err))
2281
  if msg:
2282
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
2283
  else:
2284
    if instance_name:
2285
      _RemoveBlockDevLinks(instance_name, disks)
2286
    return (True, "All devices secondary")
2287

    
2288

    
2289
def ValidateHVParams(hvname, hvparams):
2290
  """Validates the given hypervisor parameters.
2291

2292
  @type hvname: string
2293
  @param hvname: the hypervisor name
2294
  @type hvparams: dict
2295
  @param hvparams: the hypervisor parameters to be validated
2296
  @rtype: tuple (success, message)
2297
  @return: a tuple of success and message, where success
2298
      indicates the succes of the operation, and message
2299
      which will contain the error details in case we
2300
      failed
2301

2302
  """
2303
  try:
2304
    hv_type = hypervisor.GetHypervisor(hvname)
2305
    hv_type.ValidateParameters(hvparams)
2306
    return (True, "Validation passed")
2307
  except errors.HypervisorError, err:
2308
    return (False, str(err))
2309

    
2310

    
2311
def DemoteFromMC():
2312
  """Demotes the current node from master candidate role.
2313

2314
  """
2315
  # try to ensure we're not the master by mistake
2316
  master, myself = ssconf.GetMasterAndMyself()
2317
  if master == myself:
2318
    return (False, "ssconf status shows I'm the master node, will not demote")
2319
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2320
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2321
    return (False, "The master daemon is running, will not demote")
2322
  try:
2323
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2324
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2325
  except EnvironmentError, err:
2326
    if err.errno != errno.ENOENT:
2327
      return (False, "Error while backing up cluster file: %s" % str(err))
2328
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2329
  return (True, "Done")
2330

    
2331

    
2332
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @return: (True, bdevs) on success, (False, error message) otherwise

  """
  # first pass: set the correct physical ID on every disk
  my_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  # second pass: resolve the disks to actual block devices
  bdevs = []
  for disk in disks:
    rdev = _RecursiveFindBD(disk)
    if rdev is None:
      return (False, "Can't find device %s" % disk)
    bdevs.append(rdev)
  return (True, bdevs)
2349

    
2350

    
2351
def DrbdDisconnectNet(nodes_ip, disks):
2352
  """Disconnects the network on a list of drbd devices.
2353

2354
  """
2355
  status, bdevs = _FindDisks(nodes_ip, disks)
2356
  if not status:
2357
    return status, bdevs
2358

    
2359
  # disconnect disks
2360
  for rd in bdevs:
2361
    try:
2362
      rd.DisconnectNet()
2363
    except errors.BlockDeviceError, err:
2364
      logging.exception("Failed to go into standalone mode")
2365
      return (False, "Can't change network configuration: %s" % str(err))
2366
  return (True, "All disks are now disconnected")
2367

    
2368

    
2369
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @param nodes_ip: node-to-IP information, handed to L{_FindDisks} to
      compute the disks' physical IDs
  @type disks: list of L{objects.Disk}
  @param disks: the disks whose network configuration should be attached
  @type instance_name: str
  @param instance_name: the instance name, used for the block device
      symlinks created in multimaster mode
  @type multimaster: boolean
  @param multimaster: whether to also switch the devices to
      multi-master (primary) mode after reconnecting
  @rtype: tuple (success, message)
  @return: success flag plus a status/error message

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        return (False, "Can't create symlink: %s" % str(err))
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      return (False, "Can't change network configuration: %s" % str(err))
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 miliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          return (False, "Can't change network configuration: %s" % str(err))
    if all_connected:
      break
    time.sleep(sleep_time)
    # back off exponentially, capped at five seconds between polls
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    return (False, "Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        return (False, "Can't change to primary mode: %s" % str(err))
  if multimaster:
    msg = "multi-master and primary"
  else:
    msg = "single-master"
  return (True, "Disks are now configured as %s" % msg)
2430

    
2431

    
2432
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @return: (status, (alldone, min_resync)); status is False if any
      device is neither connected nor resyncing

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  min_resync = 100
  alldone = True
  for rdev in bdevs:
    stats = rdev.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      # a device that is neither connected nor syncing is a failure;
      # report the progress gathered so far
      return (False, (alldone, min_resync))
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)
  return (True, (alldone, min_resync))
2452

    
2453

    
2454
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  # only scripts whose (relative) names match this mask are executed;
  # anything else, e.g. editor backup files, is reported as skipped
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the succes of the operation, and message
        which will contain the error details in case we
        failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # only the first 4 KB of combined stdout/stderr are captured
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            # wait() was interrupted by a signal, retry it
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, utils.SafeEncode(output.strip())

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # FIXME: must log output in case of failures
      # a missing hooks directory simply means there are no hooks to run
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # non-files, non-executables and badly-named entries are skipped,
      # not failed
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
2581

    
2582

    
2583
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the input data is handed over via a temporary file passed as the
    # script's single argument; it is always cleaned up afterwards
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
    finally:
      os.unlink(fin_name)

    if result.failed:
      return (constants.IARUN_FAILURE, result.stdout, result.stderr,
              result.fail_reason)
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2623

    
2624

    
2625
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  The cache lives as one small text file per device under
  L{constants.BDEV_CACHE_DIR}; all operations are best-effort and
  never raise on I/O problems (errors are only logged).

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      # best-effort cache: log the full traceback but don't propagate
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # was a copy-paste of the UpdateCache message; this is a removal
      logging.exception("Can't remove bdev cache for %s", dev_path)