Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 0623d351

History | View | Annotate | Download (81.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36
import zlib
37
import base64
38

    
39
from ganeti import errors
40
from ganeti import utils
41
from ganeti import ssh
42
from ganeti import hypervisor
43
from ganeti import constants
44
from ganeti import bdev
45
from ganeti import objects
46
from ganeti import ssconf
47

    
48

    
49
class RPCFail(Exception):
50
  """Class denoting RPC failure.
51

52
  Its argument is the error message.
53

54
  """
55

    
56
def _Fail(msg, *args, **kwargs):
57
  """Log an error and the raise an RPCFail exception.
58

59
  This exception is then handled specially in the ganeti daemon and
60
  turned into a 'failed' return type. As such, this function is a
61
  useful shortcut for logging the error and returning it to the master
62
  daemon.
63

64
  @type msg: string
65
  @param msg: the text of the exception
66
  @raise RPCFail
67

68
  """
69
  if args:
70
    msg = msg % args
71
  if "exc" in kwargs and kwargs["exc"]:
72
    logging.exception(msg)
73
  else:
74
    logging.error(msg)
75
  raise RPCFail(msg)
76

    
77

    
78
def _GetConfig():
79
  """Simple wrapper to return a SimpleStore.
80

81
  @rtype: L{ssconf.SimpleStore}
82
  @return: a SimpleStore instance
83

84
  """
85
  return ssconf.SimpleStore()
86

    
87

    
88
def _GetSshRunner(cluster_name):
89
  """Simple wrapper to return an SshRunner.
90

91
  @type cluster_name: str
92
  @param cluster_name: the cluster name, which is needed
93
      by the SshRunner constructor
94
  @rtype: L{ssh.SshRunner}
95
  @return: an SshRunner instance
96

97
  """
98
  return ssh.SshRunner(cluster_name)
99

    
100

    
101
def _Decompress(data):
102
  """Unpacks data compressed by the RPC client.
103

104
  @type data: list or tuple
105
  @param data: Data sent by RPC client
106
  @rtype: str
107
  @return: Decompressed data
108

109
  """
110
  assert isinstance(data, (list, tuple))
111
  assert len(data) == 2
112
  (encoding, content) = data
113
  if encoding == constants.RPC_ENCODING_NONE:
114
    return content
115
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
116
    return zlib.decompress(base64.b64decode(content))
117
  else:
118
    raise AssertionError("Unknown data encoding")
119

    
120

    
121
def _CleanDirectory(path, exclude=None):
122
  """Removes all regular files in a directory.
123

124
  @type path: str
125
  @param path: the directory to clean
126
  @type exclude: list
127
  @param exclude: list of files to be excluded, defaults
128
      to the empty list
129

130
  """
131
  if not os.path.isdir(path):
132
    return
133
  if exclude is None:
134
    exclude = []
135
  else:
136
    # Normalize excluded paths
137
    exclude = [os.path.normpath(i) for i in exclude]
138

    
139
  for rel_name in utils.ListVisibleFiles(path):
140
    full_name = os.path.normpath(os.path.join(path, rel_name))
141
    if full_name in exclude:
142
      continue
143
    if os.path.isfile(full_name) and not os.path.islink(full_name):
144
      utils.RemoveFile(full_name)
145

    
146

    
147
def JobQueuePurge():
148
  """Removes job queue files and archived jobs.
149

150
  @rtype: None
151

152
  """
153
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
154
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
155

    
156

    
157
def GetMasterInfo():
158
  """Returns master information.
159

160
  This is an utility function to compute master information, either
161
  for consumption here or from the node daemon.
162

163
  @rtype: tuple
164
  @return: (master_netdev, master_ip, master_name) if we have a good
165
      configuration, otherwise (None, None, None)
166

167
  """
168
  try:
169
    cfg = _GetConfig()
170
    master_netdev = cfg.GetMasterNetdev()
171
    master_ip = cfg.GetMasterIP()
172
    master_node = cfg.GetMasterNode()
173
  except errors.ConfigurationError, err:
174
    logging.exception("Cluster configuration incomplete")
175
    return (None, None, None)
176
  return (master_netdev, master_ip, master_node)
177

    
178

    
179
def StartMaster(start_daemons):
180
  """Activate local node as master node.
181

182
  The function will always try activate the IP address of the master
183
  (unless someone else has it). It will also start the master daemons,
184
  based on the start_daemons parameter.
185

186
  @type start_daemons: boolean
187
  @param start_daemons: whther to also start the master
188
      daemons (ganeti-masterd and ganeti-rapi)
189
  @rtype: None
190

191
  """
192
  master_netdev, master_ip, _ = GetMasterInfo()
193
  if not master_netdev:
194
    return False, "Cluster configuration incomplete, cannot read ssconf files"
195

    
196
  payload = []
197
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
198
    if utils.OwnIpAddress(master_ip):
199
      # we already have the ip:
200
      logging.debug("Master IP already configured, doing nothing")
201
    else:
202
      msg = "Someone else has the master ip, not activating"
203
      logging.error(msg)
204
      payload.append(msg)
205
  else:
206
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
207
                           "dev", master_netdev, "label",
208
                           "%s:0" % master_netdev])
209
    if result.failed:
210
      msg = "Can't activate master IP: %s" % result.output
211
      logging.error(msg)
212
      payload.append(msg)
213

    
214
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
215
                           "-s", master_ip, master_ip])
216
    # we'll ignore the exit code of arping
217

    
218
  # and now start the master and rapi daemons
219
  if start_daemons:
220
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
221
      result = utils.RunCmd([daemon])
222
      if result.failed:
223
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
224
        logging.error(msg)
225
        payload.append(msg)
226

    
227
  return not payload, "; ".join(payload)
228

    
229

    
230
def StopMaster(stop_daemons):
231
  """Deactivate this node as master.
232

233
  The function will always try to deactivate the IP address of the
234
  master. It will also stop the master daemons depending on the
235
  stop_daemons parameter.
236

237
  @type stop_daemons: boolean
238
  @param stop_daemons: whether to also stop the master daemons
239
      (ganeti-masterd and ganeti-rapi)
240
  @rtype: None
241

242
  """
243
  # TODO: log and report back to the caller the error failures; we
244
  # need to decide in which case we fail the RPC for this
245
  master_netdev, master_ip, _ = GetMasterInfo()
246
  if not master_netdev:
247
    return False, "Cluster configuration incomplete, cannot read ssconf files"
248

    
249
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
250
                         "dev", master_netdev])
251
  if result.failed:
252
    logging.error("Can't remove the master IP, error: %s", result.output)
253
    # but otherwise ignore the failure
254

    
255
  if stop_daemons:
256
    # stop/kill the rapi and the master daemon
257
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
258
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
259

    
260
  return True, None
261

    
262

    
263
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
264
  """Joins this node to the cluster.
265

266
  This does the following:
267
      - updates the hostkeys of the machine (rsa and dsa)
268
      - adds the ssh private key to the user
269
      - adds the ssh public key to the users' authorized_keys file
270

271
  @type dsa: str
272
  @param dsa: the DSA private key to write
273
  @type dsapub: str
274
  @param dsapub: the DSA public key to write
275
  @type rsa: str
276
  @param rsa: the RSA private key to write
277
  @type rsapub: str
278
  @param rsapub: the RSA public key to write
279
  @type sshkey: str
280
  @param sshkey: the SSH private key to write
281
  @type sshpub: str
282
  @param sshpub: the SSH public key to write
283
  @rtype: boolean
284
  @return: the success of the operation
285

286
  """
287
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
288
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
289
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
290
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
291
  for name, content, mode in sshd_keys:
292
    utils.WriteFile(name, data=content, mode=mode)
293

    
294
  try:
295
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
296
                                                    mkdir=True)
297
  except errors.OpExecError, err:
298
    _Fail("Error while processing user ssh files: %s", err, exc=True)
299

    
300
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
301
    utils.WriteFile(name, data=content, mode=0600)
302

    
303
  utils.AddAuthorizedKey(auth_keys, sshpub)
304

    
305
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
306

    
307
  return (True, "Node added successfully")
308

    
309

    
310
def LeaveCluster():
311
  """Cleans up and remove the current node.
312

313
  This function cleans up and prepares the current node to be removed
314
  from the cluster.
315

316
  If processing is successful, then it raises an
317
  L{errors.QuitGanetiException} which is used as a special case to
318
  shutdown the node daemon.
319

320
  """
321
  _CleanDirectory(constants.DATA_DIR)
322
  JobQueuePurge()
323

    
324
  try:
325
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
326

    
327
    f = open(pub_key, 'r')
328
    try:
329
      utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
330
    finally:
331
      f.close()
332

    
333
    utils.RemoveFile(priv_key)
334
    utils.RemoveFile(pub_key)
335
  except errors.OpExecError:
336
    logging.exception("Error while processing ssh files")
337

    
338
  # Raise a custom exception (handled in ganeti-noded)
339
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
340

    
341

    
342
def GetNodeInfo(vgname, hypervisor_type):
343
  """Gives back a hash with different informations about the node.
344

345
  @type vgname: C{string}
346
  @param vgname: the name of the volume group to ask for disk space information
347
  @type hypervisor_type: C{str}
348
  @param hypervisor_type: the name of the hypervisor to ask for
349
      memory information
350
  @rtype: C{dict}
351
  @return: dictionary with the following keys:
352
      - vg_size is the size of the configured volume group in MiB
353
      - vg_free is the free size of the volume group in MiB
354
      - memory_dom0 is the memory allocated for domain0 in MiB
355
      - memory_free is the currently available (free) ram in MiB
356
      - memory_total is the total number of ram in MiB
357

358
  """
359
  outputarray = {}
360
  vginfo = _GetVGInfo(vgname)
361
  outputarray['vg_size'] = vginfo['vg_size']
362
  outputarray['vg_free'] = vginfo['vg_free']
363

    
364
  hyper = hypervisor.GetHypervisor(hypervisor_type)
365
  hyp_info = hyper.GetNodeInfo()
366
  if hyp_info is not None:
367
    outputarray.update(hyp_info)
368

    
369
  f = open("/proc/sys/kernel/random/boot_id", 'r')
370
  try:
371
    outputarray["bootid"] = f.read(128).rstrip("\n")
372
  finally:
373
    f.close()
374

    
375
  return True, outputarray
376

    
377

    
378
def VerifyNode(what, cluster_name):
379
  """Verify the status of the local node.
380

381
  Based on the input L{what} parameter, various checks are done on the
382
  local node.
383

384
  If the I{filelist} key is present, this list of
385
  files is checksummed and the file/checksum pairs are returned.
386

387
  If the I{nodelist} key is present, we check that we have
388
  connectivity via ssh with the target nodes (and check the hostname
389
  report).
390

391
  If the I{node-net-test} key is present, we check that we have
392
  connectivity to the given nodes via both primary IP and, if
393
  applicable, secondary IPs.
394

395
  @type what: C{dict}
396
  @param what: a dictionary of things to check:
397
      - filelist: list of files for which to compute checksums
398
      - nodelist: list of nodes we should check ssh communication with
399
      - node-net-test: list of nodes we should check node daemon port
400
        connectivity with
401
      - hypervisor: list with hypervisors to run the verify for
402
  @rtype: dict
403
  @return: a dictionary with the same keys as the input dict, and
404
      values representing the result of the checks
405

406
  """
407
  result = {}
408

    
409
  if constants.NV_HYPERVISOR in what:
410
    result[constants.NV_HYPERVISOR] = tmp = {}
411
    for hv_name in what[constants.NV_HYPERVISOR]:
412
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
413

    
414
  if constants.NV_FILELIST in what:
415
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
416
      what[constants.NV_FILELIST])
417

    
418
  if constants.NV_NODELIST in what:
419
    result[constants.NV_NODELIST] = tmp = {}
420
    random.shuffle(what[constants.NV_NODELIST])
421
    for node in what[constants.NV_NODELIST]:
422
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
423
      if not success:
424
        tmp[node] = message
425

    
426
  if constants.NV_NODENETTEST in what:
427
    result[constants.NV_NODENETTEST] = tmp = {}
428
    my_name = utils.HostInfo().name
429
    my_pip = my_sip = None
430
    for name, pip, sip in what[constants.NV_NODENETTEST]:
431
      if name == my_name:
432
        my_pip = pip
433
        my_sip = sip
434
        break
435
    if not my_pip:
436
      tmp[my_name] = ("Can't find my own primary/secondary IP"
437
                      " in the node list")
438
    else:
439
      port = utils.GetNodeDaemonPort()
440
      for name, pip, sip in what[constants.NV_NODENETTEST]:
441
        fail = []
442
        if not utils.TcpPing(pip, port, source=my_pip):
443
          fail.append("primary")
444
        if sip != pip:
445
          if not utils.TcpPing(sip, port, source=my_sip):
446
            fail.append("secondary")
447
        if fail:
448
          tmp[name] = ("failure using the %s interface(s)" %
449
                       " and ".join(fail))
450

    
451
  if constants.NV_LVLIST in what:
452
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
453

    
454
  if constants.NV_INSTANCELIST in what:
455
    result[constants.NV_INSTANCELIST] = GetInstanceList(
456
      what[constants.NV_INSTANCELIST])
457

    
458
  if constants.NV_VGLIST in what:
459
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
460

    
461
  if constants.NV_VERSION in what:
462
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
463
                                    constants.RELEASE_VERSION)
464

    
465
  if constants.NV_HVINFO in what:
466
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
467
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
468

    
469
  if constants.NV_DRBDLIST in what:
470
    try:
471
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
472
    except errors.BlockDeviceError, err:
473
      logging.warning("Can't get used minors list", exc_info=True)
474
      used_minors = str(err)
475
    result[constants.NV_DRBDLIST] = used_minors
476

    
477
  return True, result
478

    
479

    
480
def GetVolumeList(vg_name):
481
  """Compute list of logical volumes and their size.
482

483
  @type vg_name: str
484
  @param vg_name: the volume group whose LVs we should list
485
  @rtype: dict
486
  @return:
487
      dictionary of all partions (key) with value being a tuple of
488
      their size (in MiB), inactive and online status::
489

490
        {'test1': ('20.06', True, True)}
491

492
      in case of errors, a string is returned with the error
493
      details.
494

495
  """
496
  lvs = {}
497
  sep = '|'
498
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
499
                         "--separator=%s" % sep,
500
                         "-olv_name,lv_size,lv_attr", vg_name])
501
  if result.failed:
502
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
503

    
504
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
505
  for line in result.stdout.splitlines():
506
    line = line.strip()
507
    match = valid_line_re.match(line)
508
    if not match:
509
      logging.error("Invalid line returned from lvs output: '%s'", line)
510
      continue
511
    name, size, attr = match.groups()
512
    inactive = attr[4] == '-'
513
    online = attr[5] == 'o'
514
    lvs[name] = (size, inactive, online)
515

    
516
  return lvs
517

    
518

    
519
def ListVolumeGroups():
520
  """List the volume groups and their size.
521

522
  @rtype: dict
523
  @return: dictionary with keys volume name and values the
524
      size of the volume
525

526
  """
527
  return True, utils.ListVolumeGroups()
528

    
529

    
530
def NodeVolumes():
531
  """List all volumes on this node.
532

533
  @rtype: list
534
  @return:
535
    A list of dictionaries, each having four keys:
536
      - name: the logical volume name,
537
      - size: the size of the logical volume
538
      - dev: the physical device on which the LV lives
539
      - vg: the volume group to which it belongs
540

541
    In case of errors, we return an empty list and log the
542
    error.
543

544
    Note that since a logical volume can live on multiple physical
545
    volumes, the resulting list might include a logical volume
546
    multiple times.
547

548
  """
549
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
550
                         "--separator=|",
551
                         "--options=lv_name,lv_size,devices,vg_name"])
552
  if result.failed:
553
    logging.error("Failed to list logical volumes, lvs output: %s",
554
                  result.output)
555
    return []
556

    
557
  def parse_dev(dev):
558
    if '(' in dev:
559
      return dev.split('(')[0]
560
    else:
561
      return dev
562

    
563
  def map_line(line):
564
    return {
565
      'name': line[0].strip(),
566
      'size': line[1].strip(),
567
      'dev': parse_dev(line[2].strip()),
568
      'vg': line[3].strip(),
569
    }
570

    
571
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
572
          if line.count('|') >= 3]
573

    
574

    
575
def BridgesExist(bridges_list):
576
  """Check if a list of bridges exist on the current node.
577

578
  @rtype: boolean
579
  @return: C{True} if all of them exist, C{False} otherwise
580

581
  """
582
  missing = []
583
  for bridge in bridges_list:
584
    if not utils.BridgeExists(bridge):
585
      missing.append(bridge)
586

    
587
  if missing:
588
    return False, "Missing bridges %s" % (", ".join(missing),)
589

    
590
  return True, None
591

    
592

    
593
def GetInstanceList(hypervisor_list):
594
  """Provides a list of instances.
595

596
  @type hypervisor_list: list
597
  @param hypervisor_list: the list of hypervisors to query information
598

599
  @rtype: list
600
  @return: a list of all running instances on the current node
601
    - instance1.example.com
602
    - instance2.example.com
603

604
  """
605
  results = []
606
  for hname in hypervisor_list:
607
    try:
608
      names = hypervisor.GetHypervisor(hname).ListInstances()
609
      results.extend(names)
610
    except errors.HypervisorError, err:
611
      _Fail("Error enumerating instances (hypervisor %s): %s",
612
            hname, err, exc=True)
613

    
614
  return results
615

    
616

    
617
def GetInstanceInfo(instance, hname):
618
  """Gives back the informations about an instance as a dictionary.
619

620
  @type instance: string
621
  @param instance: the instance name
622
  @type hname: string
623
  @param hname: the hypervisor type of the instance
624

625
  @rtype: dict
626
  @return: dictionary with the following keys:
627
      - memory: memory size of instance (int)
628
      - state: xen state of instance (string)
629
      - time: cpu time of instance (float)
630

631
  """
632
  output = {}
633

    
634
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
635
  if iinfo is not None:
636
    output['memory'] = iinfo[2]
637
    output['state'] = iinfo[4]
638
    output['time'] = iinfo[5]
639

    
640
  return True, output
641

    
642

    
643
def GetInstanceMigratable(instance):
644
  """Gives whether an instance can be migrated.
645

646
  @type instance: L{objects.Instance}
647
  @param instance: object representing the instance to be checked.
648

649
  @rtype: tuple
650
  @return: tuple of (result, description) where:
651
      - result: whether the instance can be migrated or not
652
      - description: a description of the issue, if relevant
653

654
  """
655
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
656
  if instance.name not in hyper.ListInstances():
657
    return (False, 'not running')
658

    
659
  for idx in range(len(instance.disks)):
660
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
661
    if not os.path.islink(link_name):
662
      return (False, 'not restarted since ganeti 1.2.5')
663

    
664
  return (True, '')
665

    
666

    
667
def GetAllInstancesInfo(hypervisor_list):
668
  """Gather data about all instances.
669

670
  This is the equivalent of L{GetInstanceInfo}, except that it
671
  computes data for all instances at once, thus being faster if one
672
  needs data about more than one instance.
673

674
  @type hypervisor_list: list
675
  @param hypervisor_list: list of hypervisors to query for instance data
676

677
  @rtype: dict
678
  @return: dictionary of instance: data, with data having the following keys:
679
      - memory: memory size of instance (int)
680
      - state: xen state of instance (string)
681
      - time: cpu time of instance (float)
682
      - vcpus: the number of vcpus
683

684
  """
685
  output = {}
686

    
687
  for hname in hypervisor_list:
688
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
689
    if iinfo:
690
      for name, inst_id, memory, vcpus, state, times in iinfo:
691
        value = {
692
          'memory': memory,
693
          'vcpus': vcpus,
694
          'state': state,
695
          'time': times,
696
          }
697
        if name in output:
698
          # we only check static parameters, like memory and vcpus,
699
          # and not state and time which can change between the
700
          # invocations of the different hypervisors
701
          for key in 'memory', 'vcpus':
702
            if value[key] != output[name][key]:
703
              _Fail("Instance %s is running twice"
704
                    " with different parameters", name)
705
        output[name] = value
706

    
707
  return True, output
708

    
709

    
710
def InstanceOsAdd(instance, reinstall):
711
  """Add an OS to an instance.
712

713
  @type instance: L{objects.Instance}
714
  @param instance: Instance whose OS is to be installed
715
  @type reinstall: boolean
716
  @param reinstall: whether this is an instance reinstall
717
  @rtype: boolean
718
  @return: the success of the operation
719

720
  """
721
  try:
722
    inst_os = OSFromDisk(instance.os)
723
  except errors.InvalidOS, err:
724
    os_name, os_dir, os_err = err.args
725
    if os_dir is None:
726
      return (False, "Can't find OS '%s': %s" % (os_name, os_err))
727
    else:
728
      return (False, "Error parsing OS '%s' in directory %s: %s" %
729
              (os_name, os_dir, os_err))
730

    
731
  create_env = OSEnvironment(instance)
732
  if reinstall:
733
    create_env['INSTANCE_REINSTALL'] = "1"
734

    
735
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
736
                                     instance.name, int(time.time()))
737

    
738
  result = utils.RunCmd([inst_os.create_script], env=create_env,
739
                        cwd=inst_os.path, output=logfile,)
740
  if result.failed:
741
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
742
                  " output: %s", result.cmd, result.fail_reason, logfile,
743
                  result.output)
744
    lines = [utils.SafeEncode(val)
745
             for val in utils.TailFile(logfile, lines=20)]
746
    return (False, "OS create script failed (%s), last lines in the"
747
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
748

    
749
  return (True, "Successfully installed")
750

    
751

    
752
def RunRenameInstance(instance, old_name):
753
  """Run the OS rename script for an instance.
754

755
  @type instance: L{objects.Instance}
756
  @param instance: Instance whose OS is to be installed
757
  @type old_name: string
758
  @param old_name: previous instance name
759
  @rtype: boolean
760
  @return: the success of the operation
761

762
  """
763
  inst_os = OSFromDisk(instance.os)
764

    
765
  rename_env = OSEnvironment(instance)
766
  rename_env['OLD_INSTANCE_NAME'] = old_name
767

    
768
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
769
                                           old_name,
770
                                           instance.name, int(time.time()))
771

    
772
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
773
                        cwd=inst_os.path, output=logfile)
774

    
775
  if result.failed:
776
    logging.error("os create command '%s' returned error: %s output: %s",
777
                  result.cmd, result.fail_reason, result.output)
778
    lines = [utils.SafeEncode(val)
779
             for val in utils.TailFile(logfile, lines=20)]
780
    return (False, "OS rename script failed (%s), last lines in the"
781
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
782

    
783
  return (True, "Rename successful")
784

    
785

    
786
def _GetVGInfo(vg_name):
787
  """Get informations about the volume group.
788

789
  @type vg_name: str
790
  @param vg_name: the volume group which we query
791
  @rtype: dict
792
  @return:
793
    A dictionary with the following keys:
794
      - C{vg_size} is the total size of the volume group in MiB
795
      - C{vg_free} is the free size of the volume group in MiB
796
      - C{pv_count} are the number of physical disks in that VG
797

798
    If an error occurs during gathering of data, we return the same dict
799
    with keys all set to None.
800

801
  """
802
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
803

    
804
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
805
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
806

    
807
  if retval.failed:
808
    logging.error("volume group %s not present", vg_name)
809
    return retdic
810
  valarr = retval.stdout.strip().rstrip(':').split(':')
811
  if len(valarr) == 3:
812
    try:
813
      retdic = {
814
        "vg_size": int(round(float(valarr[0]), 0)),
815
        "vg_free": int(round(float(valarr[1]), 0)),
816
        "pv_count": int(valarr[2]),
817
        }
818
    except ValueError, err:
819
      logging.exception("Fail to parse vgs output")
820
  else:
821
    logging.error("vgs output has the wrong number of fields (expected"
822
                  " three): %s", str(valarr))
823
  return retdic
824

    
825

    
826
def _GetBlockDevSymlinkPath(instance_name, idx):
827
  return os.path.join(constants.DISK_LINKS_DIR,
828
                      "%s:%d" % (instance_name, idx))
829

    
830

    
831
def _SymlinkBlockDev(instance_name, device_path, idx):
832
  """Set up symlinks to a instance's block device.
833

834
  This is an auxiliary function run when an instance is start (on the primary
835
  node) or when an instance is migrated (on the target node).
836

837

838
  @param instance_name: the name of the target instance
839
  @param device_path: path of the physical block device, on the node
840
  @param idx: the disk index
841
  @return: absolute path to the disk's symlink
842

843
  """
844
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
845
  try:
846
    os.symlink(device_path, link_name)
847
  except OSError, err:
848
    if err.errno == errno.EEXIST:
849
      if (not os.path.islink(link_name) or
850
          os.readlink(link_name) != device_path):
851
        os.remove(link_name)
852
        os.symlink(device_path, link_name)
853
    else:
854
      raise
855

    
856
  return link_name
857

    
858

    
859
def _RemoveBlockDevLinks(instance_name, disks):
860
  """Remove the block device symlinks belonging to the given instance.
861

862
  """
863
  for idx, disk in enumerate(disks):
864
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
865
    if os.path.islink(link_name):
866
      try:
867
        os.remove(link_name)
868
      except OSError:
869
        logging.exception("Can't remove symlink '%s'", link_name)
870

    
871

    
872
def _GatherAndLinkBlockDevs(instance):
873
  """Set up an instance's block device(s).
874

875
  This is run on the primary node at instance startup. The block
876
  devices must be already assembled.
877

878
  @type instance: L{objects.Instance}
879
  @param instance: the instance whose disks we shoul assemble
880
  @rtype: list
881
  @return: list of (disk_object, device_path)
882

883
  """
884
  block_devices = []
885
  for idx, disk in enumerate(instance.disks):
886
    device = _RecursiveFindBD(disk)
887
    if device is None:
888
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
889
                                    str(disk))
890
    device.Open()
891
    try:
892
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
893
    except OSError, e:
894
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
895
                                    e.strerror)
896

    
897
    block_devices.append((disk, link_name))
898

    
899
  return block_devices
900

    
901

    
902
def StartInstance(instance):
903
  """Start an instance.
904

905
  @type instance: L{objects.Instance}
906
  @param instance: the instance object
907
  @rtype: boolean
908
  @return: whether the startup was successful or not
909

910
  """
911
  running_instances = GetInstanceList([instance.hypervisor])
912

    
913
  if instance.name in running_instances:
914
    return (True, "Already running")
915

    
916
  try:
917
    block_devices = _GatherAndLinkBlockDevs(instance)
918
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
919
    hyper.StartInstance(instance, block_devices)
920
  except errors.BlockDeviceError, err:
921
    _Fail("Block device error: %s", err, exc=True)
922
  except errors.HypervisorError, err:
923
    _RemoveBlockDevLinks(instance.name, instance.disks)
924
    _Fail("Hypervisor error: %s", err, exc=True)
925

    
926
  return (True, "Instance started successfully")
927

    
928

    
929
def InstanceShutdown(instance):
930
  """Shut an instance down.
931

932
  @note: this functions uses polling with a hardcoded timeout.
933

934
  @type instance: L{objects.Instance}
935
  @param instance: the instance object
936
  @rtype: boolean
937
  @return: whether the startup was successful or not
938

939
  """
940
  hv_name = instance.hypervisor
941
  running_instances = GetInstanceList([hv_name])
942

    
943
  if instance.name not in running_instances:
944
    return (True, "Instance already stopped")
945

    
946
  hyper = hypervisor.GetHypervisor(hv_name)
947
  try:
948
    hyper.StopInstance(instance)
949
  except errors.HypervisorError, err:
950
    _Fail("Failed to stop instance %s: %s", instance.name, err)
951

    
952
  # test every 10secs for 2min
953

    
954
  time.sleep(1)
955
  for dummy in range(11):
956
    if instance.name not in GetInstanceList([hv_name]):
957
      break
958
    time.sleep(10)
959
  else:
960
    # the shutdown did not succeed
961
    logging.error("Shutdown of '%s' unsuccessful, using destroy",
962
                  instance.name)
963

    
964
    try:
965
      hyper.StopInstance(instance, force=True)
966
    except errors.HypervisorError, err:
967
      _Fail("Failed to force stop instance %s: %s", instance.name, err)
968

    
969
    time.sleep(1)
970
    if instance.name in GetInstanceList([hv_name]):
971
      _Fail("Could not shutdown instance %s even by destroy", instance.name)
972

    
973
  _RemoveBlockDevLinks(instance.name, instance.disks)
974

    
975
  return (True, "Instance has been shutdown successfully")
976

    
977

    
978
def InstanceReboot(instance, reboot_type):
979
  """Reboot an instance.
980

981
  @type instance: L{objects.Instance}
982
  @param instance: the instance object to reboot
983
  @type reboot_type: str
984
  @param reboot_type: the type of reboot, one the following
985
    constants:
986
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
987
        instance OS, do not recreate the VM
988
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
989
        restart the VM (at the hypervisor level)
990
      - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
991
        is not accepted here, since that mode is handled
992
        differently
993
  @rtype: boolean
994
  @return: the success of the operation
995

996
  """
997
  running_instances = GetInstanceList([instance.hypervisor])
998

    
999
  if instance.name not in running_instances:
1000
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1001

    
1002
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1003
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1004
    try:
1005
      hyper.RebootInstance(instance)
1006
    except errors.HypervisorError, err:
1007
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1008
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1009
    try:
1010
      stop_result = InstanceShutdown(instance)
1011
      if not stop_result[0]:
1012
        return stop_result
1013
      return StartInstance(instance)
1014
    except errors.HypervisorError, err:
1015
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1016
  else:
1017
    _Fail("Invalid reboot_type received: %s", reboot_type)
1018

    
1019
  return (True, "Reboot successful")
1020

    
1021

    
1022
def MigrationInfo(instance):
1023
  """Gather information about an instance to be migrated.
1024

1025
  @type instance: L{objects.Instance}
1026
  @param instance: the instance definition
1027

1028
  """
1029
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1030
  try:
1031
    info = hyper.MigrationInfo(instance)
1032
  except errors.HypervisorError, err:
1033
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1034
  return (True, info)
1035

    
1036

    
1037
def AcceptInstance(instance, info, target):
1038
  """Prepare the node to accept an instance.
1039

1040
  @type instance: L{objects.Instance}
1041
  @param instance: the instance definition
1042
  @type info: string/data (opaque)
1043
  @param info: migration information, from the source node
1044
  @type target: string
1045
  @param target: target host (usually ip), on this node
1046

1047
  """
1048
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1049
  try:
1050
    hyper.AcceptInstance(instance, info, target)
1051
  except errors.HypervisorError, err:
1052
    _Fail("Failed to accept instance: %s", err, exc=True)
1053
  return (True, "Accept successfull")
1054

    
1055

    
1056
def FinalizeMigration(instance, info, success):
1057
  """Finalize any preparation to accept an instance.
1058

1059
  @type instance: L{objects.Instance}
1060
  @param instance: the instance definition
1061
  @type info: string/data (opaque)
1062
  @param info: migration information, from the source node
1063
  @type success: boolean
1064
  @param success: whether the migration was a success or a failure
1065

1066
  """
1067
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1068
  try:
1069
    hyper.FinalizeMigration(instance, info, success)
1070
  except errors.HypervisorError, err:
1071
    _Fail("Failed to finalize migration: %s", err, exc=True)
1072
  return (True, "Migration Finalized")
1073

    
1074

    
1075
def MigrateInstance(instance, target, live):
1076
  """Migrates an instance to another node.
1077

1078
  @type instance: L{objects.Instance}
1079
  @param instance: the instance definition
1080
  @type target: string
1081
  @param target: the target node name
1082
  @type live: boolean
1083
  @param live: whether the migration should be done live or not (the
1084
      interpretation of this parameter is left to the hypervisor)
1085
  @rtype: tuple
1086
  @return: a tuple of (success, msg) where:
1087
      - succes is a boolean denoting the success/failure of the operation
1088
      - msg is a string with details in case of failure
1089

1090
  """
1091
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1092

    
1093
  try:
1094
    hyper.MigrateInstance(instance.name, target, live)
1095
  except errors.HypervisorError, err:
1096
    _Fail("Failed to migrate instance: %s", err, exc=True)
1097
  return (True, "Migration successfull")
1098

    
1099

    
1100
def BlockdevCreate(disk, size, owner, on_primary, info):
1101
  """Creates a block device for an instance.
1102

1103
  @type disk: L{objects.Disk}
1104
  @param disk: the object describing the disk we should create
1105
  @type size: int
1106
  @param size: the size of the physical underlying device, in MiB
1107
  @type owner: str
1108
  @param owner: the name of the instance for which disk is created,
1109
      used for device cache data
1110
  @type on_primary: boolean
1111
  @param on_primary:  indicates if it is the primary node or not
1112
  @type info: string
1113
  @param info: string that will be sent to the physical device
1114
      creation, used for example to set (LVM) tags on LVs
1115

1116
  @return: the new unique_id of the device (this can sometime be
1117
      computed only after creation), or None. On secondary nodes,
1118
      it's not required to return anything.
1119

1120
  """
1121
  clist = []
1122
  if disk.children:
1123
    for child in disk.children:
1124
      try:
1125
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1126
      except errors.BlockDeviceError, err:
1127
        _Fail("Can't assemble device %s: %s", child, err)
1128
      if on_primary or disk.AssembleOnSecondary():
1129
        # we need the children open in case the device itself has to
1130
        # be assembled
1131
        try:
1132
          crdev.Open()
1133
        except errors.BlockDeviceError, err:
1134
          _Fail("Can't make child '%s' read-write: %s", child, err)
1135
      clist.append(crdev)
1136

    
1137
  try:
1138
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1139
  except errors.BlockDeviceError, err:
1140
    _Fail("Can't create block device: %s", err)
1141

    
1142
  if on_primary or disk.AssembleOnSecondary():
1143
    try:
1144
      device.Assemble()
1145
    except errors.BlockDeviceError, err:
1146
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1147
    device.SetSyncSpeed(constants.SYNC_SPEED)
1148
    if on_primary or disk.OpenOnSecondary():
1149
      try:
1150
        device.Open(force=True)
1151
      except errors.BlockDeviceError, err:
1152
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1153
    DevCacheManager.UpdateCache(device.dev_path, owner,
1154
                                on_primary, disk.iv_name)
1155

    
1156
  device.SetInfo(info)
1157

    
1158
  physical_id = device.unique_id
1159
  return True, physical_id
1160

    
1161

    
1162
def BlockdevRemove(disk):
1163
  """Remove a block device.
1164

1165
  @note: This is intended to be called recursively.
1166

1167
  @type disk: L{objects.Disk}
1168
  @param disk: the disk object we should remove
1169
  @rtype: boolean
1170
  @return: the success of the operation
1171

1172
  """
1173
  msgs = []
1174
  result = True
1175
  try:
1176
    rdev = _RecursiveFindBD(disk)
1177
  except errors.BlockDeviceError, err:
1178
    # probably can't attach
1179
    logging.info("Can't attach to device %s in remove", disk)
1180
    rdev = None
1181
  if rdev is not None:
1182
    r_path = rdev.dev_path
1183
    try:
1184
      rdev.Remove()
1185
    except errors.BlockDeviceError, err:
1186
      msgs.append(str(err))
1187
      result = False
1188
    if result:
1189
      DevCacheManager.RemoveCache(r_path)
1190

    
1191
  if disk.children:
1192
    for child in disk.children:
1193
      c_status, c_msg = BlockdevRemove(child)
1194
      result = result and c_status
1195
      if c_msg: # not an empty message
1196
        msgs.append(c_msg)
1197

    
1198
  return (result, "; ".join(msgs))
1199

    
1200

    
1201
def _RecursiveAssembleBD(disk, owner, as_primary):
1202
  """Activate a block device for an instance.
1203

1204
  This is run on the primary and secondary nodes for an instance.
1205

1206
  @note: this function is called recursively.
1207

1208
  @type disk: L{objects.Disk}
1209
  @param disk: the disk we try to assemble
1210
  @type owner: str
1211
  @param owner: the name of the instance which owns the disk
1212
  @type as_primary: boolean
1213
  @param as_primary: if we should make the block device
1214
      read/write
1215

1216
  @return: the assembled device or None (in case no device
1217
      was assembled)
1218
  @raise errors.BlockDeviceError: in case there is an error
1219
      during the activation of the children or the device
1220
      itself
1221

1222
  """
1223
  children = []
1224
  if disk.children:
1225
    mcn = disk.ChildrenNeeded()
1226
    if mcn == -1:
1227
      mcn = 0 # max number of Nones allowed
1228
    else:
1229
      mcn = len(disk.children) - mcn # max number of Nones
1230
    for chld_disk in disk.children:
1231
      try:
1232
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1233
      except errors.BlockDeviceError, err:
1234
        if children.count(None) >= mcn:
1235
          raise
1236
        cdev = None
1237
        logging.error("Error in child activation (but continuing): %s",
1238
                      str(err))
1239
      children.append(cdev)
1240

    
1241
  if as_primary or disk.AssembleOnSecondary():
1242
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1243
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1244
    result = r_dev
1245
    if as_primary or disk.OpenOnSecondary():
1246
      r_dev.Open()
1247
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1248
                                as_primary, disk.iv_name)
1249

    
1250
  else:
1251
    result = True
1252
  return result
1253

    
1254

    
1255
def BlockdevAssemble(disk, owner, as_primary):
1256
  """Activate a block device for an instance.
1257

1258
  This is a wrapper over _RecursiveAssembleBD.
1259

1260
  @rtype: str or boolean
1261
  @return: a C{/dev/...} path for primary nodes, and
1262
      C{True} for secondary nodes
1263

1264
  """
1265
  status = True
1266
  result = "no error information"
1267
  try:
1268
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1269
    if isinstance(result, bdev.BlockDev):
1270
      result = result.dev_path
1271
  except errors.BlockDeviceError, err:
1272
    result = "Error while assembling disk: %s" % str(err)
1273
    status = False
1274
  return (status, result)
1275

    
1276

    
1277
def BlockdevShutdown(disk):
1278
  """Shut down a block device.
1279

1280
  First, if the device is assembled (Attach() is successfull), then
1281
  the device is shutdown. Then the children of the device are
1282
  shutdown.
1283

1284
  This function is called recursively. Note that we don't cache the
1285
  children or such, as oppossed to assemble, shutdown of different
1286
  devices doesn't require that the upper device was active.
1287

1288
  @type disk: L{objects.Disk}
1289
  @param disk: the description of the disk we should
1290
      shutdown
1291
  @rtype: boolean
1292
  @return: the success of the operation
1293

1294
  """
1295
  msgs = []
1296
  result = True
1297
  r_dev = _RecursiveFindBD(disk)
1298
  if r_dev is not None:
1299
    r_path = r_dev.dev_path
1300
    try:
1301
      r_dev.Shutdown()
1302
      DevCacheManager.RemoveCache(r_path)
1303
    except errors.BlockDeviceError, err:
1304
      msgs.append(str(err))
1305
      result = False
1306

    
1307
  if disk.children:
1308
    for child in disk.children:
1309
      c_status, c_msg = BlockdevShutdown(child)
1310
      result = result and c_status
1311
      if c_msg: # not an empty message
1312
        msgs.append(c_msg)
1313

    
1314
  return (result, "; ".join(msgs))
1315

    
1316

    
1317
def BlockdevAddchildren(parent_cdev, new_cdevs):
1318
  """Extend a mirrored block device.
1319

1320
  @type parent_cdev: L{objects.Disk}
1321
  @param parent_cdev: the disk to which we should add children
1322
  @type new_cdevs: list of L{objects.Disk}
1323
  @param new_cdevs: the list of children which we should add
1324
  @rtype: boolean
1325
  @return: the success of the operation
1326

1327
  """
1328
  parent_bdev = _RecursiveFindBD(parent_cdev)
1329
  if parent_bdev is None:
1330
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1331
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1332
  if new_bdevs.count(None) > 0:
1333
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1334
  parent_bdev.AddChildren(new_bdevs)
1335
  return (True, None)
1336

    
1337

    
1338
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1339
  """Shrink a mirrored block device.
1340

1341
  @type parent_cdev: L{objects.Disk}
1342
  @param parent_cdev: the disk from which we should remove children
1343
  @type new_cdevs: list of L{objects.Disk}
1344
  @param new_cdevs: the list of children which we should remove
1345
  @rtype: boolean
1346
  @return: the success of the operation
1347

1348
  """
1349
  parent_bdev = _RecursiveFindBD(parent_cdev)
1350
  if parent_bdev is None:
1351
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1352
  devs = []
1353
  for disk in new_cdevs:
1354
    rpath = disk.StaticDevPath()
1355
    if rpath is None:
1356
      bd = _RecursiveFindBD(disk)
1357
      if bd is None:
1358
        _Fail("Can't find device %s while removing children", disk)
1359
      else:
1360
        devs.append(bd.dev_path)
1361
    else:
1362
      devs.append(rpath)
1363
  parent_bdev.RemoveChildren(devs)
1364
  return (True, None)
1365

    
1366

    
1367
def BlockdevGetmirrorstatus(disks):
1368
  """Get the mirroring status of a list of devices.
1369

1370
  @type disks: list of L{objects.Disk}
1371
  @param disks: the list of disks which we should query
1372
  @rtype: disk
1373
  @return:
1374
      a list of (mirror_done, estimated_time) tuples, which
1375
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1376
  @raise errors.BlockDeviceError: if any of the disks cannot be
1377
      found
1378

1379
  """
1380
  stats = []
1381
  for dsk in disks:
1382
    rbd = _RecursiveFindBD(dsk)
1383
    if rbd is None:
1384
      _Fail("Can't find device %s", dsk)
1385
    stats.append(rbd.CombinedSyncStatus())
1386
  return True, stats
1387

    
1388

    
1389
def _RecursiveFindBD(disk):
1390
  """Check if a device is activated.
1391

1392
  If so, return informations about the real device.
1393

1394
  @type disk: L{objects.Disk}
1395
  @param disk: the disk object we need to find
1396

1397
  @return: None if the device can't be found,
1398
      otherwise the device instance
1399

1400
  """
1401
  children = []
1402
  if disk.children:
1403
    for chdisk in disk.children:
1404
      children.append(_RecursiveFindBD(chdisk))
1405

    
1406
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1407

    
1408

    
1409
def BlockdevFind(disk):
1410
  """Check if a device is activated.
1411

1412
  If it is, return informations about the real device.
1413

1414
  @type disk: L{objects.Disk}
1415
  @param disk: the disk to find
1416
  @rtype: None or tuple
1417
  @return: None if the disk cannot be found, otherwise a
1418
      tuple (device_path, major, minor, sync_percent,
1419
      estimated_time, is_degraded)
1420

1421
  """
1422
  try:
1423
    rbd = _RecursiveFindBD(disk)
1424
  except errors.BlockDeviceError, err:
1425
    _Fail("Failed to find device: %s", err, exc=True)
1426
  if rbd is None:
1427
    return (True, None)
1428
  return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1429

    
1430

    
1431
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1432
  """Write a file to the filesystem.
1433

1434
  This allows the master to overwrite(!) a file. It will only perform
1435
  the operation if the file belongs to a list of configuration files.
1436

1437
  @type file_name: str
1438
  @param file_name: the target file name
1439
  @type data: str
1440
  @param data: the new contents of the file
1441
  @type mode: int
1442
  @param mode: the mode to give the file (can be None)
1443
  @type uid: int
1444
  @param uid: the owner of the file (can be -1 for default)
1445
  @type gid: int
1446
  @param gid: the group of the file (can be -1 for default)
1447
  @type atime: float
1448
  @param atime: the atime to set on the file (can be None)
1449
  @type mtime: float
1450
  @param mtime: the mtime to set on the file (can be None)
1451
  @rtype: boolean
1452
  @return: the success of the operation; errors are logged
1453
      in the node daemon log
1454

1455
  """
1456
  if not os.path.isabs(file_name):
1457
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1458

    
1459
  allowed_files = set([
1460
    constants.CLUSTER_CONF_FILE,
1461
    constants.ETC_HOSTS,
1462
    constants.SSH_KNOWN_HOSTS_FILE,
1463
    constants.VNC_PASSWORD_FILE,
1464
    constants.RAPI_CERT_FILE,
1465
    constants.RAPI_USERS_FILE,
1466
    ])
1467

    
1468
  for hv_name in constants.HYPER_TYPES:
1469
    hv_class = hypervisor.GetHypervisor(hv_name)
1470
    allowed_files.update(hv_class.GetAncillaryFiles())
1471

    
1472
  if file_name not in allowed_files:
1473
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1474
          file_name)
1475

    
1476
  raw_data = _Decompress(data)
1477

    
1478
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1479
                  atime=atime, mtime=mtime)
1480
  return (True, "success")
1481

    
1482

    
1483
def WriteSsconfFiles(values):
1484
  """Update all ssconf files.
1485

1486
  Wrapper around the SimpleStore.WriteFiles.
1487

1488
  """
1489
  ssconf.SimpleStore().WriteFiles(values)
1490

    
1491

    
1492
def _ErrnoOrStr(err):
1493
  """Format an EnvironmentError exception.
1494

1495
  If the L{err} argument has an errno attribute, it will be looked up
1496
  and converted into a textual C{E...} description. Otherwise the
1497
  string representation of the error will be returned.
1498

1499
  @type err: L{EnvironmentError}
1500
  @param err: the exception to format
1501

1502
  """
1503
  if hasattr(err, 'errno'):
1504
    detail = errno.errorcode[err.errno]
1505
  else:
1506
    detail = str(err)
1507
  return detail
1508

    
1509

    
1510
def _OSOndiskVersion(name, os_dir):
1511
  """Compute and return the API version of a given OS.
1512

1513
  This function will try to read the API version of the OS given by
1514
  the 'name' parameter and residing in the 'os_dir' directory.
1515

1516
  @type name: str
1517
  @param name: the OS name we should look for
1518
  @type os_dir: str
1519
  @param os_dir: the directory inwhich we should look for the OS
1520
  @rtype: int or None
1521
  @return:
1522
      Either an integer denoting the version or None in the
1523
      case when this is not a valid OS name.
1524
  @raise errors.InvalidOS: if the OS cannot be found
1525

1526
  """
1527
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1528

    
1529
  try:
1530
    st = os.stat(api_file)
1531
  except EnvironmentError, err:
1532
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1533
                           " found (%s)" % _ErrnoOrStr(err))
1534

    
1535
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1536
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1537
                           " a regular file")
1538

    
1539
  try:
1540
    f = open(api_file)
1541
    try:
1542
      api_versions = f.readlines()
1543
    finally:
1544
      f.close()
1545
  except EnvironmentError, err:
1546
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1547
                           " API version (%s)" % _ErrnoOrStr(err))
1548

    
1549
  api_versions = [version.strip() for version in api_versions]
1550
  try:
1551
    api_versions = [int(version) for version in api_versions]
1552
  except (TypeError, ValueError), err:
1553
    raise errors.InvalidOS(name, os_dir,
1554
                           "API version is not integer (%s)" % str(err))
1555

    
1556
  return api_versions
1557

    
1558

    
1559
def DiagnoseOS(top_dirs=None):
1560
  """Compute the validity for all OSes.
1561

1562
  @type top_dirs: list
1563
  @param top_dirs: the list of directories in which to
1564
      search (if not given defaults to
1565
      L{constants.OS_SEARCH_PATH})
1566
  @rtype: list of L{objects.OS}
1567
  @return: an OS object for each name in all the given
1568
      directories
1569

1570
  """
1571
  if top_dirs is None:
1572
    top_dirs = constants.OS_SEARCH_PATH
1573

    
1574
  result = []
1575
  for dir_name in top_dirs:
1576
    if os.path.isdir(dir_name):
1577
      try:
1578
        f_names = utils.ListVisibleFiles(dir_name)
1579
      except EnvironmentError, err:
1580
        logging.exception("Can't list the OS directory %s", dir_name)
1581
        break
1582
      for name in f_names:
1583
        try:
1584
          os_inst = OSFromDisk(name, base_dir=dir_name)
1585
          result.append(os_inst)
1586
        except errors.InvalidOS, err:
1587
          result.append(objects.OS.FromInvalidOS(err))
1588

    
1589
  return result
1590

    
1591

    
1592
def OSFromDisk(name, base_dir=None):
1593
  """Create an OS instance from disk.
1594

1595
  This function will return an OS instance if the given name is a
1596
  valid OS name. Otherwise, it will raise an appropriate
1597
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1598

1599
  @type base_dir: string
1600
  @keyword base_dir: Base directory containing OS installations.
1601
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1602
  @rtype: L{objects.OS}
1603
  @return: the OS instance if we find a valid one
1604
  @raise errors.InvalidOS: if we don't find a valid OS
1605

1606
  """
1607
  if base_dir is None:
1608
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1609
    if os_dir is None:
1610
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1611
  else:
1612
    os_dir = os.path.sep.join([base_dir, name])
1613

    
1614
  api_versions = _OSOndiskVersion(name, os_dir)
1615

    
1616
  if constants.OS_API_VERSION not in api_versions:
1617
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1618
                           " (found %s want %s)"
1619
                           % (api_versions, constants.OS_API_VERSION))
1620

    
1621
  # OS Scripts dictionary, we will populate it with the actual script names
1622
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1623

    
1624
  for script in os_scripts:
1625
    os_scripts[script] = os.path.sep.join([os_dir, script])
1626

    
1627
    try:
1628
      st = os.stat(os_scripts[script])
1629
    except EnvironmentError, err:
1630
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1631
                             (script, _ErrnoOrStr(err)))
1632

    
1633
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1634
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1635
                             script)
1636

    
1637
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1638
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1639
                             script)
1640

    
1641

    
1642
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1643
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1644
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1645
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1646
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1647
                    api_versions=api_versions)
1648

    
1649
def OSEnvironment(instance, debug=0):
1650
  """Calculate the environment for an os script.
1651

1652
  @type instance: L{objects.Instance}
1653
  @param instance: target instance for the os script run
1654
  @type debug: integer
1655
  @param debug: debug level (0 or 1, for OS Api 10)
1656
  @rtype: dict
1657
  @return: dict of environment variables
1658
  @raise errors.BlockDeviceError: if the block device
1659
      cannot be found
1660

1661
  """
1662
  result = {}
1663
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1664
  result['INSTANCE_NAME'] = instance.name
1665
  result['INSTANCE_OS'] = instance.os
1666
  result['HYPERVISOR'] = instance.hypervisor
1667
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1668
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1669
  result['DEBUG_LEVEL'] = '%d' % debug
1670
  for idx, disk in enumerate(instance.disks):
1671
    real_disk = _RecursiveFindBD(disk)
1672
    if real_disk is None:
1673
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1674
                                    str(disk))
1675
    real_disk.Open()
1676
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1677
    result['DISK_%d_ACCESS' % idx] = disk.mode
1678
    if constants.HV_DISK_TYPE in instance.hvparams:
1679
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1680
        instance.hvparams[constants.HV_DISK_TYPE]
1681
    if disk.dev_type in constants.LDS_BLOCK:
1682
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1683
    elif disk.dev_type == constants.LD_FILE:
1684
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1685
        'file:%s' % disk.physical_id[0]
1686
  for idx, nic in enumerate(instance.nics):
1687
    result['NIC_%d_MAC' % idx] = nic.mac
1688
    if nic.ip:
1689
      result['NIC_%d_IP' % idx] = nic.ip
1690
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1691
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1692
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1693
    if nic.nicparams[constants.NIC_LINK]:
1694
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1695
    if constants.HV_NIC_TYPE in instance.hvparams:
1696
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1697
        instance.hvparams[constants.HV_NIC_TYPE]
1698

    
1699
  return result
1700

    
1701
def BlockdevGrow(disk, amount):
1702
  """Grow a stack of block devices.
1703

1704
  This function is called recursively, with the childrens being the
1705
  first ones to resize.
1706

1707
  @type disk: L{objects.Disk}
1708
  @param disk: the disk to be grown
1709
  @rtype: (status, result)
1710
  @return: a tuple with the status of the operation
1711
      (True/False), and the errors message if status
1712
      is False
1713

1714
  """
1715
  r_dev = _RecursiveFindBD(disk)
1716
  if r_dev is None:
1717
    return False, "Cannot find block device %s" % (disk,)
1718

    
1719
  try:
1720
    r_dev.Grow(amount)
1721
  except errors.BlockDeviceError, err:
1722
    _Fail("Failed to grow block device: %s", err, exc=True)
1723

    
1724
  return True, None
1725

    
1726

    
1727
def BlockdevSnapshot(disk):
1728
  """Create a snapshot copy of a block device.
1729

1730
  This function is called recursively, and the snapshot is actually created
1731
  just for the leaf lvm backend device.
1732

1733
  @type disk: L{objects.Disk}
1734
  @param disk: the disk to be snapshotted
1735
  @rtype: string
1736
  @return: snapshot disk path
1737

1738
  """
1739
  if disk.children:
1740
    if len(disk.children) == 1:
1741
      # only one child, let's recurse on it
1742
      return BlockdevSnapshot(disk.children[0])
1743
    else:
1744
      # more than one child, choose one that matches
1745
      for child in disk.children:
1746
        if child.size == disk.size:
1747
          # return implies breaking the loop
1748
          return BlockdevSnapshot(child)
1749
  elif disk.dev_type == constants.LD_LV:
1750
    r_dev = _RecursiveFindBD(disk)
1751
    if r_dev is not None:
1752
      # let's stay on the safe side and ask for the full size, for now
1753
      return True, r_dev.Snapshot(disk.size)
1754
    else:
1755
      _Fail("Cannot find block device %s", disk)
1756
  else:
1757
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1758
          disk.unique_id, disk.dev_type)
1759

    
1760

    
1761
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1762
  """Export a block device snapshot to a remote node.
1763

1764
  @type disk: L{objects.Disk}
1765
  @param disk: the description of the disk to export
1766
  @type dest_node: str
1767
  @param dest_node: the destination node to export to
1768
  @type instance: L{objects.Instance}
1769
  @param instance: the instance object to whom the disk belongs
1770
  @type cluster_name: str
1771
  @param cluster_name: the cluster name, needed for SSH hostalias
1772
  @type idx: int
1773
  @param idx: the index of the disk in the instance's disk list,
1774
      used to export to the OS scripts environment
1775
  @rtype: boolean
1776
  @return: the success of the operation
1777

1778
  """
1779
  export_env = OSEnvironment(instance)
1780

    
1781
  inst_os = OSFromDisk(instance.os)
1782
  export_script = inst_os.export_script
1783

    
1784
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1785
                                     instance.name, int(time.time()))
1786
  if not os.path.exists(constants.LOG_OS_DIR):
1787
    os.mkdir(constants.LOG_OS_DIR, 0750)
1788
  real_disk = _RecursiveFindBD(disk)
1789
  if real_disk is None:
1790
    _Fail("Block device '%s' is not set up", disk)
1791

    
1792
  real_disk.Open()
1793

    
1794
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1795
  export_env['EXPORT_INDEX'] = str(idx)
1796

    
1797
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1798
  destfile = disk.physical_id[1]
1799

    
1800
  # the target command is built out of three individual commands,
1801
  # which are joined by pipes; we check each individual command for
1802
  # valid parameters
1803
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1804
                               export_script, logfile)
1805

    
1806
  comprcmd = "gzip"
1807

    
1808
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1809
                                destdir, destdir, destfile)
1810
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1811
                                                   constants.GANETI_RUNAS,
1812
                                                   destcmd)
1813

    
1814
  # all commands have been checked, so we're safe to combine them
1815
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1816

    
1817
  result = utils.RunCmd(command, env=export_env)
1818

    
1819
  if result.failed:
1820
    _Fail("OS snapshot export command '%s' returned error: %s"
1821
          " output: %s", command, result.fail_reason, result.output)
1822

    
1823
  return (True, None)
1824

    
1825

    
1826
def FinalizeExport(instance, snap_disks):
1827
  """Write out the export configuration information.
1828

1829
  @type instance: L{objects.Instance}
1830
  @param instance: the instance which we export, used for
1831
      saving configuration
1832
  @type snap_disks: list of L{objects.Disk}
1833
  @param snap_disks: list of snapshot block devices, which
1834
      will be used to get the actual name of the dump file
1835

1836
  @rtype: boolean
1837
  @return: the success of the operation
1838

1839
  """
1840
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1841
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1842

    
1843
  config = objects.SerializableConfigParser()
1844

    
1845
  config.add_section(constants.INISECT_EXP)
1846
  config.set(constants.INISECT_EXP, 'version', '0')
1847
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1848
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1849
  config.set(constants.INISECT_EXP, 'os', instance.os)
1850
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
1851

    
1852
  config.add_section(constants.INISECT_INS)
1853
  config.set(constants.INISECT_INS, 'name', instance.name)
1854
  config.set(constants.INISECT_INS, 'memory', '%d' %
1855
             instance.beparams[constants.BE_MEMORY])
1856
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
1857
             instance.beparams[constants.BE_VCPUS])
1858
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1859

    
1860
  nic_total = 0
1861
  for nic_count, nic in enumerate(instance.nics):
1862
    nic_total += 1
1863
    config.set(constants.INISECT_INS, 'nic%d_mac' %
1864
               nic_count, '%s' % nic.mac)
1865
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1866
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1867
               '%s' % nic.bridge)
1868
  # TODO: redundant: on load can read nics until it doesn't exist
1869
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1870

    
1871
  disk_total = 0
1872
  for disk_count, disk in enumerate(snap_disks):
1873
    if disk:
1874
      disk_total += 1
1875
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1876
                 ('%s' % disk.iv_name))
1877
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1878
                 ('%s' % disk.physical_id[1]))
1879
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1880
                 ('%d' % disk.size))
1881

    
1882
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1883

    
1884
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1885
                  data=config.Dumps())
1886
  shutil.rmtree(finaldestdir, True)
1887
  shutil.move(destdir, finaldestdir)
1888

    
1889
  return True, None
1890

    
1891

    
1892
def ExportInfo(dest):
1893
  """Get export configuration information.
1894

1895
  @type dest: str
1896
  @param dest: directory containing the export
1897

1898
  @rtype: L{objects.SerializableConfigParser}
1899
  @return: a serializable config file containing the
1900
      export info
1901

1902
  """
1903
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1904

    
1905
  config = objects.SerializableConfigParser()
1906
  config.read(cff)
1907

    
1908
  if (not config.has_section(constants.INISECT_EXP) or
1909
      not config.has_section(constants.INISECT_INS)):
1910
    _Fail("Export info file doesn't have the required fields")
1911

    
1912
  return True, config.Dumps()
1913

    
1914

    
1915
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1916
  """Import an os image into an instance.
1917

1918
  @type instance: L{objects.Instance}
1919
  @param instance: instance to import the disks into
1920
  @type src_node: string
1921
  @param src_node: source node for the disk images
1922
  @type src_images: list of string
1923
  @param src_images: absolute paths of the disk images
1924
  @rtype: list of boolean
1925
  @return: each boolean represent the success of importing the n-th disk
1926

1927
  """
1928
  import_env = OSEnvironment(instance)
1929
  inst_os = OSFromDisk(instance.os)
1930
  import_script = inst_os.import_script
1931

    
1932
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1933
                                        instance.name, int(time.time()))
1934
  if not os.path.exists(constants.LOG_OS_DIR):
1935
    os.mkdir(constants.LOG_OS_DIR, 0750)
1936

    
1937
  comprcmd = "gunzip"
1938
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1939
                               import_script, logfile)
1940

    
1941
  final_result = []
1942
  for idx, image in enumerate(src_images):
1943
    if image:
1944
      destcmd = utils.BuildShellCmd('cat %s', image)
1945
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1946
                                                       constants.GANETI_RUNAS,
1947
                                                       destcmd)
1948
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1949
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1950
      import_env['IMPORT_INDEX'] = str(idx)
1951
      result = utils.RunCmd(command, env=import_env)
1952
      if result.failed:
1953
        logging.error("Disk import command '%s' returned error: %s"
1954
                      " output: %s", command, result.fail_reason,
1955
                      result.output)
1956
        final_result.append("error importing disk %d: %s, %s" %
1957
                            (idx, result.fail_reason, result.output[-100]))
1958

    
1959
  if final_result:
1960
    return False, "; ".join(final_result)
1961
  return True, None
1962

    
1963

    
1964
def ListExports():
1965
  """Return a list of exports currently available on this machine.
1966

1967
  @rtype: list
1968
  @return: list of the exports
1969

1970
  """
1971
  if os.path.isdir(constants.EXPORT_DIR):
1972
    return True, utils.ListVisibleFiles(constants.EXPORT_DIR)
1973
  else:
1974
    return False, "No exports directory"
1975

    
1976

    
1977
def RemoveExport(export):
1978
  """Remove an existing export from the node.
1979

1980
  @type export: str
1981
  @param export: the name of the export to remove
1982
  @rtype: boolean
1983
  @return: the success of the operation
1984

1985
  """
1986
  target = os.path.join(constants.EXPORT_DIR, export)
1987

    
1988
  try:
1989
    shutil.rmtree(target)
1990
  except EnvironmentError, err:
1991
    _Fail("Error while removing the export: %s", err, exc=True)
1992

    
1993
  return True, None
1994

    
1995

    
1996
def BlockdevRename(devlist):
1997
  """Rename a list of block devices.
1998

1999
  @type devlist: list of tuples
2000
  @param devlist: list of tuples of the form  (disk,
2001
      new_logical_id, new_physical_id); disk is an
2002
      L{objects.Disk} object describing the current disk,
2003
      and new logical_id/physical_id is the name we
2004
      rename it to
2005
  @rtype: boolean
2006
  @return: True if all renames succeeded, False otherwise
2007

2008
  """
2009
  msgs = []
2010
  result = True
2011
  for disk, unique_id in devlist:
2012
    dev = _RecursiveFindBD(disk)
2013
    if dev is None:
2014
      msgs.append("Can't find device %s in rename" % str(disk))
2015
      result = False
2016
      continue
2017
    try:
2018
      old_rpath = dev.dev_path
2019
      dev.Rename(unique_id)
2020
      new_rpath = dev.dev_path
2021
      if old_rpath != new_rpath:
2022
        DevCacheManager.RemoveCache(old_rpath)
2023
        # FIXME: we should add the new cache information here, like:
2024
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2025
        # but we don't have the owner here - maybe parse from existing
2026
        # cache? for now, we only lose lvm data when we rename, which
2027
        # is less critical than DRBD or MD
2028
    except errors.BlockDeviceError, err:
2029
      msgs.append("Can't rename device '%s' to '%s': %s" %
2030
                  (dev, unique_id, err))
2031
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2032
      result = False
2033
  return (result, "; ".join(msgs))
2034

    
2035

    
2036
def _TransformFileStorageDir(file_storage_dir):
2037
  """Checks whether given file_storage_dir is valid.
2038

2039
  Checks wheter the given file_storage_dir is within the cluster-wide
2040
  default file_storage_dir stored in SimpleStore. Only paths under that
2041
  directory are allowed.
2042

2043
  @type file_storage_dir: str
2044
  @param file_storage_dir: the path to check
2045

2046
  @return: the normalized path if valid, None otherwise
2047

2048
  """
2049
  cfg = _GetConfig()
2050
  file_storage_dir = os.path.normpath(file_storage_dir)
2051
  base_file_storage_dir = cfg.GetFileStorageDir()
2052
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2053
      base_file_storage_dir):
2054
    logging.error("file storage directory '%s' is not under base file"
2055
                  " storage directory '%s'",
2056
                  file_storage_dir, base_file_storage_dir)
2057
    return None
2058
  return file_storage_dir
2059

    
2060

    
2061
def CreateFileStorageDir(file_storage_dir):
2062
  """Create file storage directory.
2063

2064
  @type file_storage_dir: str
2065
  @param file_storage_dir: directory to create
2066

2067
  @rtype: tuple
2068
  @return: tuple with first element a boolean indicating wheter dir
2069
      creation was successful or not
2070

2071
  """
2072
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2073
  result = True,
2074
  if not file_storage_dir:
2075
    result = False,
2076
  else:
2077
    if os.path.exists(file_storage_dir):
2078
      if not os.path.isdir(file_storage_dir):
2079
        logging.error("'%s' is not a directory", file_storage_dir)
2080
        result = False,
2081
    else:
2082
      try:
2083
        os.makedirs(file_storage_dir, 0750)
2084
      except OSError, err:
2085
        logging.error("Cannot create file storage directory '%s': %s",
2086
                      file_storage_dir, err)
2087
        result = False,
2088
  return result
2089

    
2090

    
2091
def RemoveFileStorageDir(file_storage_dir):
2092
  """Remove file storage directory.
2093

2094
  Remove it only if it's empty. If not log an error and return.
2095

2096
  @type file_storage_dir: str
2097
  @param file_storage_dir: the directory we should cleanup
2098
  @rtype: tuple (success,)
2099
  @return: tuple of one element, C{success}, denoting
2100
      whether the operation was successfull
2101

2102
  """
2103
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2104
  result = True,
2105
  if not file_storage_dir:
2106
    result = False,
2107
  else:
2108
    if os.path.exists(file_storage_dir):
2109
      if not os.path.isdir(file_storage_dir):
2110
        logging.error("'%s' is not a directory", file_storage_dir)
2111
        result = False,
2112
      # deletes dir only if empty, otherwise we want to return False
2113
      try:
2114
        os.rmdir(file_storage_dir)
2115
      except OSError, err:
2116
        logging.exception("Cannot remove file storage directory '%s'",
2117
                          file_storage_dir)
2118
        result = False,
2119
  return result
2120

    
2121

    
2122
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2123
  """Rename the file storage directory.
2124

2125
  @type old_file_storage_dir: str
2126
  @param old_file_storage_dir: the current path
2127
  @type new_file_storage_dir: str
2128
  @param new_file_storage_dir: the name we should rename to
2129
  @rtype: tuple (success,)
2130
  @return: tuple of one element, C{success}, denoting
2131
      whether the operation was successful
2132

2133
  """
2134
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2135
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2136
  result = True,
2137
  if not old_file_storage_dir or not new_file_storage_dir:
2138
    result = False,
2139
  else:
2140
    if not os.path.exists(new_file_storage_dir):
2141
      if os.path.isdir(old_file_storage_dir):
2142
        try:
2143
          os.rename(old_file_storage_dir, new_file_storage_dir)
2144
        except OSError, err:
2145
          logging.exception("Cannot rename '%s' to '%s'",
2146
                            old_file_storage_dir, new_file_storage_dir)
2147
          result =  False,
2148
      else:
2149
        logging.error("'%s' is not a directory", old_file_storage_dir)
2150
        result = False,
2151
    else:
2152
      if os.path.exists(old_file_storage_dir):
2153
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2154
                      old_file_storage_dir, new_file_storage_dir)
2155
        result = False,
2156
  return result
2157

    
2158

    
2159
def _IsJobQueueFile(file_name):
2160
  """Checks whether the given filename is in the queue directory.
2161

2162
  @type file_name: str
2163
  @param file_name: the file name we should check
2164
  @rtype: boolean
2165
  @return: whether the file is under the queue directory
2166

2167
  """
2168
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2169
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2170

    
2171
  if not result:
2172
    logging.error("'%s' is not a file in the queue directory",
2173
                  file_name)
2174

    
2175
  return result
2176

    
2177

    
2178
def JobQueueUpdate(file_name, content):
2179
  """Updates a file in the queue directory.
2180

2181
  This is just a wrapper over L{utils.WriteFile}, with proper
2182
  checking.
2183

2184
  @type file_name: str
2185
  @param file_name: the job file name
2186
  @type content: str
2187
  @param content: the new job contents
2188
  @rtype: boolean
2189
  @return: the success of the operation
2190

2191
  """
2192
  if not _IsJobQueueFile(file_name):
2193
    return False
2194

    
2195
  # Write and replace the file atomically
2196
  utils.WriteFile(file_name, data=_Decompress(content))
2197

    
2198
  return True
2199

    
2200

    
2201
def JobQueueRename(old, new):
2202
  """Renames a job queue file.
2203

2204
  This is just a wrapper over os.rename with proper checking.
2205

2206
  @type old: str
2207
  @param old: the old (actual) file name
2208
  @type new: str
2209
  @param new: the desired file name
2210
  @rtype: boolean
2211
  @return: the success of the operation
2212

2213
  """
2214
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2215
    return False
2216

    
2217
  utils.RenameFile(old, new, mkdir=True)
2218

    
2219
  return True
2220

    
2221

    
2222
def JobQueueSetDrainFlag(drain_flag):
2223
  """Set the drain flag for the queue.
2224

2225
  This will set or unset the queue drain flag.
2226

2227
  @type drain_flag: boolean
2228
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
2229
  @rtype: boolean
2230
  @return: always True
2231
  @warning: the function always returns True
2232

2233
  """
2234
  if drain_flag:
2235
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2236
  else:
2237
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2238

    
2239
  return True
2240

    
2241

    
2242
def BlockdevClose(instance_name, disks):
2243
  """Closes the given block devices.
2244

2245
  This means they will be switched to secondary mode (in case of
2246
  DRBD).
2247

2248
  @param instance_name: if the argument is not empty, the symlinks
2249
      of this instance will be removed
2250
  @type disks: list of L{objects.Disk}
2251
  @param disks: the list of disks to be closed
2252
  @rtype: tuple (success, message)
2253
  @return: a tuple of success and message, where success
2254
      indicates the succes of the operation, and message
2255
      which will contain the error details in case we
2256
      failed
2257

2258
  """
2259
  bdevs = []
2260
  for cf in disks:
2261
    rd = _RecursiveFindBD(cf)
2262
    if rd is None:
2263
      _Fail("Can't find device %s", cf)
2264
    bdevs.append(rd)
2265

    
2266
  msg = []
2267
  for rd in bdevs:
2268
    try:
2269
      rd.Close()
2270
    except errors.BlockDeviceError, err:
2271
      msg.append(str(err))
2272
  if msg:
2273
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
2274
  else:
2275
    if instance_name:
2276
      _RemoveBlockDevLinks(instance_name, disks)
2277
    return (True, "All devices secondary")
2278

    
2279

    
2280
def ValidateHVParams(hvname, hvparams):
2281
  """Validates the given hypervisor parameters.
2282

2283
  @type hvname: string
2284
  @param hvname: the hypervisor name
2285
  @type hvparams: dict
2286
  @param hvparams: the hypervisor parameters to be validated
2287
  @rtype: tuple (success, message)
2288
  @return: a tuple of success and message, where success
2289
      indicates the succes of the operation, and message
2290
      which will contain the error details in case we
2291
      failed
2292

2293
  """
2294
  try:
2295
    hv_type = hypervisor.GetHypervisor(hvname)
2296
    hv_type.ValidateParameters(hvparams)
2297
    return (True, "Validation passed")
2298
  except errors.HypervisorError, err:
2299
    return (False, str(err))
2300

    
2301

    
2302
def DemoteFromMC():
2303
  """Demotes the current node from master candidate role.
2304

2305
  """
2306
  # try to ensure we're not the master by mistake
2307
  master, myself = ssconf.GetMasterAndMyself()
2308
  if master == myself:
2309
    return (False, "ssconf status shows I'm the master node, will not demote")
2310
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2311
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2312
    return (False, "The master daemon is running, will not demote")
2313
  try:
2314
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2315
  except EnvironmentError, err:
2316
    if err.errno != errno.ENOENT:
2317
      return (False, "Error while backing up cluster file: %s" % str(err))
2318
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2319
  return (True, "Done")
2320

    
2321

    
2322
def _FindDisks(nodes_ip, disks):
2323
  """Sets the physical ID on disks and returns the block devices.
2324

2325
  """
2326
  # set the correct physical ID
2327
  my_name = utils.HostInfo().name
2328
  for cf in disks:
2329
    cf.SetPhysicalID(my_name, nodes_ip)
2330

    
2331
  bdevs = []
2332

    
2333
  for cf in disks:
2334
    rd = _RecursiveFindBD(cf)
2335
    if rd is None:
2336
      return (False, "Can't find device %s" % cf)
2337
    bdevs.append(rd)
2338
  return (True, bdevs)
2339

    
2340

    
2341
def DrbdDisconnectNet(nodes_ip, disks):
2342
  """Disconnects the network on a list of drbd devices.
2343

2344
  """
2345
  status, bdevs = _FindDisks(nodes_ip, disks)
2346
  if not status:
2347
    return status, bdevs
2348

    
2349
  # disconnect disks
2350
  for rd in bdevs:
2351
    try:
2352
      rd.DisconnectNet()
2353
    except errors.BlockDeviceError, err:
2354
      _Fail("Can't change network configuration to standalone mode: %s",
2355
            err, exc=True)
2356
  return (True, "All disks are now disconnected")
2357

    
2358

    
2359
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2360
  """Attaches the network on a list of drbd devices.
2361

2362
  """
2363
  status, bdevs = _FindDisks(nodes_ip, disks)
2364
  if not status:
2365
    return status, bdevs
2366

    
2367
  if multimaster:
2368
    for idx, rd in enumerate(bdevs):
2369
      try:
2370
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2371
      except EnvironmentError, err:
2372
        _Fail("Can't create symlink: %s", err)
2373
  # reconnect disks, switch to new master configuration and if
2374
  # needed primary mode
2375
  for rd in bdevs:
2376
    try:
2377
      rd.AttachNet(multimaster)
2378
    except errors.BlockDeviceError, err:
2379
      _Fail("Can't change network configuration: %s", err)
2380
  # wait until the disks are connected; we need to retry the re-attach
2381
  # if the device becomes standalone, as this might happen if the one
2382
  # node disconnects and reconnects in a different mode before the
2383
  # other node reconnects; in this case, one or both of the nodes will
2384
  # decide it has wrong configuration and switch to standalone
2385
  RECONNECT_TIMEOUT = 2 * 60
2386
  sleep_time = 0.100 # start with 100 miliseconds
2387
  timeout_limit = time.time() + RECONNECT_TIMEOUT
2388
  while time.time() < timeout_limit:
2389
    all_connected = True
2390
    for rd in bdevs:
2391
      stats = rd.GetProcStatus()
2392
      if not (stats.is_connected or stats.is_in_resync):
2393
        all_connected = False
2394
      if stats.is_standalone:
2395
        # peer had different config info and this node became
2396
        # standalone, even though this should not happen with the
2397
        # new staged way of changing disk configs
2398
        try:
2399
          rd.ReAttachNet(multimaster)
2400
        except errors.BlockDeviceError, err:
2401
          _Fail("Can't change network configuration: %s", err)
2402
    if all_connected:
2403
      break
2404
    time.sleep(sleep_time)
2405
    sleep_time = min(5, sleep_time * 1.5)
2406
  if not all_connected:
2407
    return (False, "Timeout in disk reconnecting")
2408
  if multimaster:
2409
    # change to primary mode
2410
    for rd in bdevs:
2411
      try:
2412
        rd.Open()
2413
      except errors.BlockDeviceError, err:
2414
        _Fail("Can't change to primary mode: %s", err)
2415
  if multimaster:
2416
    msg = "multi-master and primary"
2417
  else:
2418
    msg = "single-master"
2419
  return (True, "Disks are now configured as %s" % msg)
2420

    
2421

    
2422
def DrbdWaitSync(nodes_ip, disks):
2423
  """Wait until DRBDs have synchronized.
2424

2425
  """
2426
  status, bdevs = _FindDisks(nodes_ip, disks)
2427
  if not status:
2428
    return status, bdevs
2429

    
2430
  min_resync = 100
2431
  alldone = True
2432
  failure = False
2433
  for rd in bdevs:
2434
    stats = rd.GetProcStatus()
2435
    if not (stats.is_connected or stats.is_in_resync):
2436
      failure = True
2437
      break
2438
    alldone = alldone and (not stats.is_in_resync)
2439
    if stats.sync_percent is not None:
2440
      min_resync = min(min_resync, stats.sync_percent)
2441
  return (not failure, (alldone, min_resync))
2442

    
2443

    
2444
def PowercycleNode(hypervisor_type):
2445
  """Hard-powercycle the node.
2446

2447
  Because we need to return first, and schedule the powercycle in the
2448
  background, we won't be able to report failures nicely.
2449

2450
  """
2451
  hyper = hypervisor.GetHypervisor(hypervisor_type)
2452
  try:
2453
    pid = os.fork()
2454
  except OSError, err:
2455
    # if we can't fork, we'll pretend that we're in the child process
2456
    pid = 0
2457
  if pid > 0:
2458
    return (True, "Reboot scheduled in 5 seconds")
2459
  time.sleep(5)
2460
  hyper.PowercycleNode()
2461

    
2462

    
2463
class HooksRunner(object):
2464
  """Hook runner.
2465

2466
  This class is instantiated on the node side (ganeti-noded) and not
2467
  on the master side.
2468

2469
  """
2470
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2471

    
2472
  def __init__(self, hooks_base_dir=None):
2473
    """Constructor for hooks runner.
2474

2475
    @type hooks_base_dir: str or None
2476
    @param hooks_base_dir: if not None, this overrides the
2477
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2478

2479
    """
2480
    if hooks_base_dir is None:
2481
      hooks_base_dir = constants.HOOKS_BASE_DIR
2482
    self._BASE_DIR = hooks_base_dir
2483

    
2484
  @staticmethod
2485
  def ExecHook(script, env):
2486
    """Exec one hook script.
2487

2488
    @type script: str
2489
    @param script: the full path to the script
2490
    @type env: dict
2491
    @param env: the environment with which to exec the script
2492
    @rtype: tuple (success, message)
2493
    @return: a tuple of success and message, where success
2494
        indicates the succes of the operation, and message
2495
        which will contain the error details in case we
2496
        failed
2497

2498
    """
2499
    # exec the process using subprocess and log the output
2500
    fdstdin = None
2501
    try:
2502
      fdstdin = open("/dev/null", "r")
2503
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2504
                               stderr=subprocess.STDOUT, close_fds=True,
2505
                               shell=False, cwd="/", env=env)
2506
      output = ""
2507
      try:
2508
        output = child.stdout.read(4096)
2509
        child.stdout.close()
2510
      except EnvironmentError, err:
2511
        output += "Hook script error: %s" % str(err)
2512

    
2513
      while True:
2514
        try:
2515
          result = child.wait()
2516
          break
2517
        except EnvironmentError, err:
2518
          if err.errno == errno.EINTR:
2519
            continue
2520
          raise
2521
    finally:
2522
      # try not to leak fds
2523
      for fd in (fdstdin, ):
2524
        if fd is not None:
2525
          try:
2526
            fd.close()
2527
          except EnvironmentError, err:
2528
            # just log the error
2529
            #logging.exception("Error while closing fd %s", fd)
2530
            pass
2531

    
2532
    return result == 0, utils.SafeEncode(output.strip())
2533

    
2534
  def RunHooks(self, hpath, phase, env):
2535
    """Run the scripts in the hooks directory.
2536

2537
    @type hpath: str
2538
    @param hpath: the path to the hooks directory which
2539
        holds the scripts
2540
    @type phase: str
2541
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2542
        L{constants.HOOKS_PHASE_POST}
2543
    @type env: dict
2544
    @param env: dictionary with the environment for the hook
2545
    @rtype: list
2546
    @return: list of 3-element tuples:
2547
      - script path
2548
      - script result, either L{constants.HKR_SUCCESS} or
2549
        L{constants.HKR_FAIL}
2550
      - output of the script
2551

2552
    @raise errors.ProgrammerError: for invalid input
2553
        parameters
2554

2555
    """
2556
    if phase == constants.HOOKS_PHASE_PRE:
2557
      suffix = "pre"
2558
    elif phase == constants.HOOKS_PHASE_POST:
2559
      suffix = "post"
2560
    else:
2561
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2562
    rr = []
2563

    
2564
    subdir = "%s-%s.d" % (hpath, suffix)
2565
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2566
    try:
2567
      dir_contents = utils.ListVisibleFiles(dir_name)
2568
    except OSError, err:
2569
      # FIXME: must log output in case of failures
2570
      return rr
2571

    
2572
    # we use the standard python sort order,
2573
    # so 00name is the recommended naming scheme
2574
    dir_contents.sort()
2575
    for relname in dir_contents:
2576
      fname = os.path.join(dir_name, relname)
2577
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2578
          self.RE_MASK.match(relname) is not None):
2579
        rrval = constants.HKR_SKIP
2580
        output = ""
2581
      else:
2582
        result, output = self.ExecHook(fname, env)
2583
        if not result:
2584
          rrval = constants.HKR_FAIL
2585
        else:
2586
          rrval = constants.HKR_SUCCESS
2587
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2588

    
2589
    return rr
2590

    
2591

    
2592
class IAllocatorRunner(object):
2593
  """IAllocator runner.
2594

2595
  This class is instantiated on the node side (ganeti-noded) and not on
2596
  the master side.
2597

2598
  """
2599
  def Run(self, name, idata):
2600
    """Run an iallocator script.
2601

2602
    @type name: str
2603
    @param name: the iallocator script name
2604
    @type idata: str
2605
    @param idata: the allocator input data
2606

2607
    @rtype: tuple
2608
    @return: four element tuple of:
2609
       - run status (one of the IARUN_ constants)
2610
       - stdout
2611
       - stderr
2612
       - fail reason (as from L{utils.RunResult})
2613

2614
    """
2615
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2616
                                  os.path.isfile)
2617
    if alloc_script is None:
2618
      return (constants.IARUN_NOTFOUND, None, None, None)
2619

    
2620
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2621
    try:
2622
      os.write(fd, idata)
2623
      os.close(fd)
2624
      result = utils.RunCmd([alloc_script, fin_name])
2625
      if result.failed:
2626
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2627
                result.fail_reason)
2628
    finally:
2629
      os.unlink(fin_name)
2630

    
2631
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2632

    
2633

    
2634
class DevCacheManager(object):
2635
  """Simple class for managing a cache of block device information.
2636

2637
  """
2638
  _DEV_PREFIX = "/dev/"
2639
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2640

    
2641
  @classmethod
2642
  def _ConvertPath(cls, dev_path):
2643
    """Converts a /dev/name path to the cache file name.
2644

2645
    This replaces slashes with underscores and strips the /dev
2646
    prefix. It then returns the full path to the cache file.
2647

2648
    @type dev_path: str
2649
    @param dev_path: the C{/dev/} path name
2650
    @rtype: str
2651
    @return: the converted path name
2652

2653
    """
2654
    if dev_path.startswith(cls._DEV_PREFIX):
2655
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2656
    dev_path = dev_path.replace("/", "_")
2657
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2658
    return fpath
2659

    
2660
  @classmethod
2661
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2662
    """Updates the cache information for a given device.
2663

2664
    @type dev_path: str
2665
    @param dev_path: the pathname of the device
2666
    @type owner: str
2667
    @param owner: the owner (instance name) of the device
2668
    @type on_primary: bool
2669
    @param on_primary: whether this is the primary
2670
        node nor not
2671
    @type iv_name: str
2672
    @param iv_name: the instance-visible name of the
2673
        device, as in objects.Disk.iv_name
2674

2675
    @rtype: None
2676

2677
    """
2678
    if dev_path is None:
2679
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2680
      return
2681
    fpath = cls._ConvertPath(dev_path)
2682
    if on_primary:
2683
      state = "primary"
2684
    else:
2685
      state = "secondary"
2686
    if iv_name is None:
2687
      iv_name = "not_visible"
2688
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2689
    try:
2690
      utils.WriteFile(fpath, data=fdata)
2691
    except EnvironmentError, err:
2692
      logging.exception("Can't update bdev cache for %s", dev_path)
2693

    
2694
  @classmethod
2695
  def RemoveCache(cls, dev_path):
2696
    """Remove data for a dev_path.
2697

2698
    This is just a wrapper over L{utils.RemoveFile} with a converted
2699
    path name and logging.
2700

2701
    @type dev_path: str
2702
    @param dev_path: the pathname of the device
2703

2704
    @rtype: None
2705

2706
    """
2707
    if dev_path is None:
2708
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2709
      return
2710
    fpath = cls._ConvertPath(dev_path)
2711
    try:
2712
      utils.RemoveFile(fpath)
2713
    except EnvironmentError, err:
2714
      logging.exception("Can't update bdev cache for %s", dev_path)