Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 4fe80ef2

History | View | Annotate | Download (82 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
def _GetConfig():
  """Build and return a simple configuration store.

  @rtype: L{ssconf.SimpleStore}
  @return: a freshly constructed SimpleStore instance

  """
  store = ssconf.SimpleStore()
  return store
62

    
63

    
64
def _GetSshRunner(cluster_name):
  """Build an SshRunner bound to the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, passed through to the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a freshly constructed SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
75

    
76

    
77
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: two-element sequence (encoding, content) sent by the
      RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  # content may be plain or zlib+base64 encoded; anything else is a
  # programming error on the client side
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  raise AssertionError("Unknown data encoding")
95

    
96

    
97
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Directories and symlinks inside C{path} are left alone; only plain
  files are removed.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = frozenset()
  else:
    # Normalize excluded paths; a frozenset gives O(1) membership
    # tests in the loop below instead of a linear scan per file
    exclude = frozenset([os.path.normpath(i) for i in exclude])

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # skip anything that is not a regular file (or is a symlink)
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
121

    
122

    
123
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed = [
      constants.CLUSTER_CONF_FILE,
      constants.ETC_HOSTS,
      constants.SSH_KNOWN_HOSTS_FILE,
      constants.VNC_PASSWORD_FILE,
      ]
  return frozenset(allowed)
135

    
136

    
137
# Set of files accepted by UploadFile; computed once at import time
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
138

    
139

    
140
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  The queue lock file is excluded from removal so that locking is not
  disturbed.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
148

    
149

    
150
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name) if we have a good
      configuration, otherwise (None, None, None)

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError:
    # an incomplete configuration is not fatal here; callers must
    # check for the None values
    logging.exception("Cluster configuration incomplete")
    return (None, None, None)
  return (master_netdev, master_ip, master_node)
170

    
171

    
172
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try to activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: boolean
  @return: whether all of the requested activation steps succeeded

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    # the master IP is not reachable, so claim it on our interface
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # gratuitous ARP so neighbours update their caches for the new owner
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    daemons_params = {
        'ganeti-masterd': [],
        'ganeti-rapi': [],
        }
    if no_voting:
      daemons_params['ganeti-masterd'].append('--no-voting')
      daemons_params['ganeti-masterd'].append('--yes-do-it')
    for daemon in daemons_params:
      cmd = [daemon]
      cmd.extend(daemons_params[daemon])
      result = utils.RunCmd(cmd)
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
229

    
230

    
231
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: C{False} if the master information could not be retrieved,
      C{True} otherwise (IP removal failures are logged but ignored)

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True
260

    
261

    
262
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: tuple
  @return: (success, message), where success is a boolean and message
      describes the result of the operation

  """
  # host keys: private keys must not be world-readable
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    msg = "Error while processing user ssh files"
    logging.exception(msg)
    return (False, "%s: %s" % (msg, err))

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return (True, "Node added successfully")
309

    
310

    
311
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  # remove our own public key from authorized_keys before deleting the
  # key files themselves
  f = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
342

    
343

    
344
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total amount of ram in MiB
      - bootid is the kernel-provided boot id of this node

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # the boot id changes on every reboot, so it can be used to detect
  # node restarts
  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray
378

    
379

    
380
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster's name, used for the ssh checks
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffle so that not all nodes hammer the same targets in the
    # same order
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own entry, to use as the source of the pings
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP if it's distinct from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # report the error as the result value instead of failing the
      # whole verify
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  return result
480

    
481

    
482
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # raw string: '\|' is a regex escape, not a Python string escape
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # positions 4 and 5 of lv_attr hold the state and open flags
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
521

    
522

    
523
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin wrapper over L{utils.ListVolumeGroups}, kept as an RPC entry
  point.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
532

    
533

    
534
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return []

  def _StripDevice(dev):
    # lvs reports devices as "/dev/xxx(start)"; keep only the path part
    paren = dev.find('(')
    if paren >= 0:
      return dev[:paren]
    return dev

  volumes = []
  for line in result.stdout.splitlines():
    # a valid data line has at least four separator-delimited fields
    if line.count('|') < 3:
      continue
    fields = line.split('|')
    volumes.append({
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': _StripDevice(fields[2].strip()),
      'vg': fields[3].strip(),
      })

  return volumes
577

    
578

    
579
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the list of bridge names to check
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True
591

    
592

    
593
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  @raise errors.HypervisorError: if listing the instances of one of
      the hypervisors fails (re-raised after logging)

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError:
      # fixed typo in the log message ("hypevisor")
      logging.exception("Error enumerating instances for hypervisor %s", hname)
      raise

  return results
615

    
616

    
617
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys (empty dict if the
      hypervisor has no information about the instance):
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo is a hypervisor-provided tuple; we only export a subset
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
641

    
642

    
643
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if instance.name not in hyper.ListInstances():
    return (False, 'not running')

  # migration requires the disk symlinks, which only exist for
  # instances started by a sufficiently recent ganeti
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
    if not os.path.islink(link_name):
      return (False, 'not restarted since ganeti 1.2.5')

  return (True, '')
665

    
666

    
667
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  @raise errors.HypervisorError: if an instance is reported by two
      hypervisors with differing static parameters

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              raise errors.HypervisorError("Instance %s is running twice"
                                           " with different parameters" % name)
        output[name] = value

  return output
708

    
709

    
710
def InstanceOsAdd(instance):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @rtype: tuple
  @return: (success, message), where success is a boolean and message
      describes the result of the operation

  """
  try:
    inst_os = OSFromDisk(instance.os)
  except errors.InvalidOS, err:
    os_name, os_dir, os_err = err.args
    if os_dir is None:
      return (False, "Can't find OS '%s': %s" % (os_name, os_err))
    else:
      return (False, "Error parsing OS '%s' in directory %s: %s" %
              (os_name, os_dir, os_err))

  create_env = OSEnvironment(instance)

  # per-run log file, named after the OS, instance and timestamp
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # include the tail of the log in the returned message to help
    # debugging from the master
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS create script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Successfully installed")
746

    
747

    
748
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: tuple
  @return: (success, message), where success is a boolean and message
      describes the result of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  # per-run log file, named after the OS and both the old and new names
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # fixed copy-paste error: this logs the rename, not the create, script
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS rename script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Rename successful")
780

    
781

    
782
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # strip the trailing separator that vgs appends, then split the fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      # keep the all-None dict on parse errors
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
820

    
821

    
822
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for the given instance disk.

  @type instance_name: str
  @param instance_name: the name of the owning instance
  @type idx: int
  @param idx: the index of the disk

  """
  return os.path.join(constants.DISK_LINKS_DIR,
                      "%s:%d" % (instance_name, idx))
825

    
826

    
827
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # if a link already exists but points elsewhere (or is not a
      # link at all), replace it with a correct one
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
853

    
854

    
855
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Removal is best-effort: failures are logged and the remaining links
  are still processed.

  @type instance_name: str
  @param instance_name: the name of the owning instance
  @type disks: list
  @param disks: the instance's disks; only their count is used here

  """
  # the disk objects themselves are not needed, only their indices
  for idx in range(len(disks)):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)
866

    
867

    
868
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)
  @raise errors.BlockDeviceError: if a device cannot be found or its
      symlink cannot be created

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      # normalize OS-level failures into a block device error
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices
896

    
897

    
898
def StartInstance(instance):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: tuple
  @return: (success, message), where success is a boolean and message
      describes the result of the startup

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    return (True, "Already running")

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError, err:
    logging.exception("Failed to start instance")
    return (False, "Block device error: %s" % str(err))
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
    # the disk links were already created, clean them up again
    _RemoveBlockDevLinks(instance.name, instance.disks)
    return (False, "Hypervisor error: %s" % str(err))

  return (True, "Instance started successfully")
925

    
926

    
927
def InstanceShutdown(instance):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: tuple
  @return: a (success, message) pair describing whether the shutdown
      was successful

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  # shutting down an already-stopped instance is a no-op
  if instance.name not in running_instances:
    return (True, "Instance already stopped")

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    # first, attempt a clean shutdown
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    msg = "Failed to stop instance %s: %s" % (instance.name, err)
    logging.error(msg)
    return (False, msg)

  # test every 10secs for 2min

  time.sleep(1)
  for _ in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
  else:
    # the clean shutdown did not succeed within the polling window, so
    # forcibly destroy the instance at the hypervisor level
    logging.error("Shutdown of '%s' unsuccessful, using destroy",
                  instance.name)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      msg = "Failed to force stop instance %s: %s" % (instance.name, err)
      logging.error(msg)
      return (False, msg)

    # give the hypervisor a moment, then re-check the instance list
    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      msg = ("Could not shutdown instance %s even by destroy" %
             instance.name)
      logging.error(msg)
      return (False, msg)

  # the instance is down; clean up its block device symlinks
  _RemoveBlockDevLinks(instance.name, instance.disks)

  return (True, "Instance has been shutdown successfully")
981

    
982

    
983
def InstanceReboot(instance, reboot_type):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @rtype: tuple
  @return: a (success, message) pair describing the outcome

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    msg = "Cannot reboot instance %s that is not running" % instance.name
    logging.error(msg)
    return (False, msg)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
      logging.error(msg)
      return (False, msg)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      # a hard reboot is implemented as a full stop/start cycle
      stop_result = InstanceShutdown(instance)
      if not stop_result[0]:
        return stop_result
      return StartInstance(instance)
    except errors.HypervisorError, err:
      msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
      logging.error(msg)
      return (False, msg)
  else:
    return (False, "Invalid reboot_type received: %s" % (reboot_type,))

  # only the successful soft-reboot branch falls through to here
  return (True, "Reboot successful")
1032

    
1033

    
1034
def MigrationInfo(instance):
1035
  """Gather information about an instance to be migrated.
1036

1037
  @type instance: L{objects.Instance}
1038
  @param instance: the instance definition
1039

1040
  """
1041
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1042
  try:
1043
    info = hyper.MigrationInfo(instance)
1044
  except errors.HypervisorError, err:
1045
    msg = "Failed to fetch migration information"
1046
    logging.exception(msg)
1047
    return (False, '%s: %s' % (msg, err))
1048
  return (True, info)
1049

    
1050

    
1051
def AcceptInstance(instance, info, target):
1052
  """Prepare the node to accept an instance.
1053

1054
  @type instance: L{objects.Instance}
1055
  @param instance: the instance definition
1056
  @type info: string/data (opaque)
1057
  @param info: migration information, from the source node
1058
  @type target: string
1059
  @param target: target host (usually ip), on this node
1060

1061
  """
1062
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1063
  try:
1064
    hyper.AcceptInstance(instance, info, target)
1065
  except errors.HypervisorError, err:
1066
    msg = "Failed to accept instance"
1067
    logging.exception(msg)
1068
    return (False, '%s: %s' % (msg, err))
1069
  return (True, "Accept successful")
1070

    
1071

    
1072
def FinalizeMigration(instance, info, success):
1073
  """Finalize any preparation to accept an instance.
1074

1075
  @type instance: L{objects.Instance}
1076
  @param instance: the instance definition
1077
  @type info: string/data (opaque)
1078
  @param info: migration information, from the source node
1079
  @type success: boolean
1080
  @param success: whether the migration was a success or a failure
1081

1082
  """
1083
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1084
  try:
1085
    hyper.FinalizeMigration(instance, info, success)
1086
  except errors.HypervisorError, err:
1087
    msg = "Failed to finalize migration"
1088
    logging.exception(msg)
1089
    return (False, '%s: %s' % (msg, err))
1090
  return (True, "Migration Finalized")
1091

    
1092

    
1093
def MigrateInstance(instance, target, live):
1094
  """Migrates an instance to another node.
1095

1096
  @type instance: L{objects.Instance}
1097
  @param instance: the instance definition
1098
  @type target: string
1099
  @param target: the target node name
1100
  @type live: boolean
1101
  @param live: whether the migration should be done live or not (the
1102
      interpretation of this parameter is left to the hypervisor)
1103
  @rtype: tuple
1104
  @return: a tuple of (success, msg) where:
1105
      - succes is a boolean denoting the success/failure of the operation
1106
      - msg is a string with details in case of failure
1107

1108
  """
1109
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1110

    
1111
  try:
1112
    hyper.MigrateInstance(instance.name, target, live)
1113
  except errors.HypervisorError, err:
1114
    msg = "Failed to migrate instance"
1115
    logging.exception(msg)
1116
    return (False, "%s: %s" % (msg, err))
1117
  return (True, "Migration successful")
1118

    
1119

    
1120
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: a (status, data) pair; on success data is the new unique_id
      of the device (this can sometime be computed only after
      creation), on failure it is an error message

  """
  # NOTE(review): the 'size' parameter is not used below; disk.size is
  # used instead -- confirm with callers
  clist = []
  if disk.children:
    # assemble all children first, so the device itself can be created
    # on top of them
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        errmsg = "Can't assemble device %s: %s" % (child, err)
        logging.error(errmsg)
        return False, errmsg
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          crdev.Open()
        except errors.BlockDeviceError, err:
          errmsg = "Can't make child '%s' read-write: %s" % (child, err)
          logging.error(errmsg)
          return False, errmsg
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    return False, "Can't create block device: %s" % str(err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      errmsg = ("Can't assemble device after creation, very"
                " unusual event: %s" % str(err))
      logging.error(errmsg)
      return False, errmsg
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        errmsg = ("Can't make device r/w after creation, very"
                  " unusual event: %s" % str(err))
        logging.error(errmsg)
        return False, errmsg
    # the cache is only updated when the device was assembled here;
    # secondary nodes that don't assemble skip it
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return True, physical_id
1190

    
1191

    
1192
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: tuple
  @return: a (success, error messages) pair; success is True only if
      the device and all of its children were removed

  """
  msgs = []
  result = True
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
      result = False
    if result:
      # only drop the cache entry if the removal actually succeeded
      DevCacheManager.RemoveCache(r_path)

  # recurse into the children regardless of the parent's outcome
  if disk.children:
    for child in disk.children:
      c_status, c_msg = BlockdevRemove(child)
      result = result and c_status
      if c_msg: # not an empty message
        msgs.append(c_msg)

  return (result, "; ".join(msgs))
1229

    
1230

    
1231
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device, or C{True} when no device needed to
      be assembled on this node
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # mcn is the maximum number of children allowed to fail assembly
    # (tracked as None entries) before we give up on the parent
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          # too many children already failed; propagate the error
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
1283

    
1284

    
1285
def BlockdevAssemble(disk, owner, as_primary):
1286
  """Activate a block device for an instance.
1287

1288
  This is a wrapper over _RecursiveAssembleBD.
1289

1290
  @rtype: str or boolean
1291
  @return: a C{/dev/...} path for primary nodes, and
1292
      C{True} for secondary nodes
1293

1294
  """
1295
  status = True
1296
  result = "no error information"
1297
  try:
1298
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1299
    if isinstance(result, bdev.BlockDev):
1300
      result = result.dev_path
1301
  except errors.BlockDeviceError, err:
1302
    result = "Error while assembling disk: %s" % str(err)
1303
    status = False
1304
  return (status, result)
1305

    
1306

    
1307
def BlockdevShutdown(disk):
1308
  """Shut down a block device.
1309

1310
  First, if the device is assembled (Attach() is successful), then
1311
  the device is shutdown. Then the children of the device are
1312
  shutdown.
1313

1314
  This function is called recursively. Note that we don't cache the
1315
  children or such, as oppossed to assemble, shutdown of different
1316
  devices doesn't require that the upper device was active.
1317

1318
  @type disk: L{objects.Disk}
1319
  @param disk: the description of the disk we should
1320
      shutdown
1321
  @rtype: boolean
1322
  @return: the success of the operation
1323

1324
  """
1325
  msgs = []
1326
  result = True
1327
  r_dev = _RecursiveFindBD(disk)
1328
  if r_dev is not None:
1329
    r_path = r_dev.dev_path
1330
    try:
1331
      r_dev.Shutdown()
1332
      DevCacheManager.RemoveCache(r_path)
1333
    except errors.BlockDeviceError, err:
1334
      msgs.append(str(err))
1335
      result = False
1336

    
1337
  if disk.children:
1338
    for child in disk.children:
1339
      c_status, c_msg = BlockdevShutdown(child)
1340
      result = result and c_status
1341
      if c_msg: # not an empty message
1342
        msgs.append(c_msg)
1343

    
1344
  return (result, "; ".join(msgs))
1345

    
1346

    
1347
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False

  # every new child must already be attachable
  new_bdevs = [_RecursiveFindBD(cdev) for cdev in new_cdevs]
  if None in new_bdevs:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False

  parent_bdev.AddChildren(new_bdevs)
  return True
1369

    
1370

    
1371
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False

  devs = []
  for cdev in new_cdevs:
    rpath = cdev.StaticDevPath()
    if rpath is not None:
      # the device has a well-known path; no lookup needed
      devs.append(rpath)
      continue
    bd = _RecursiveFindBD(cdev)
    if bd is None:
      logging.error("Can't find dynamic device %s while removing children",
                    cdev)
      return False
    devs.append(bd.dev_path)

  parent_bdev.RemoveChildren(devs)
  return True
1401

    
1402

    
1403
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  status = []
  for disk_obj in disks:
    found = _RecursiveFindBD(disk_obj)
    if found is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(disk_obj))
    status.append(found.CombinedSyncStatus())
  return status
1423

    
1424

    
1425
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    # resolve the whole child tree first
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1443

    
1444

    
1445
def BlockdevFind(disk):
1446
  """Check if a device is activated.
1447

1448
  If it is, return information about the real device.
1449

1450
  @type disk: L{objects.Disk}
1451
  @param disk: the disk to find
1452
  @rtype: None or tuple
1453
  @return: None if the disk cannot be found, otherwise a
1454
      tuple (device_path, major, minor, sync_percent,
1455
      estimated_time, is_degraded)
1456

1457
  """
1458
  try:
1459
    rbd = _RecursiveFindBD(disk)
1460
  except errors.BlockDeviceError, err:
1461
    return (False, str(err))
1462
  if rbd is None:
1463
    return (True, None)
1464
  return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1465

    
1466

    
1467
def BlockdevGetsize(disks):
1468
  """Computes the size of the given disks.
1469

1470
  If a disk is not found, returns None instead.
1471

1472
  @type disks: list of L{objects.Disk}
1473
  @param disks: the list of disk to compute the size for
1474
  @rtype: list
1475
  @return: list with elements None if the disk cannot be found,
1476
      otherwise the size
1477

1478
  """
1479
  result = []
1480
  for cf in disks:
1481
    try:
1482
      rbd = _RecursiveFindBD(cf)
1483
    except errors.BlockDeviceError, err:
1484
      result.append(None)
1485
      continue
1486
    if rbd is None:
1487
      result.append(None)
1488
    else:
1489
      result.append(rbd.GetActualSize())
1490
  return result
1491

    
1492

    
1493
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file (possibly compressed)
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; errors are logged
      in the node daemon log

  """
  # refuse relative paths outright
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # only whitelisted configuration files may be overwritten
  if file_name not in _ALLOWED_UPLOAD_FILES:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=_Decompress(data), mode=mode, uid=uid,
                  gid=gid, atime=atime, mtime=mtime)
  return True
1533

    
1534

    
1535
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  sstore = ssconf.SimpleStore()
  sstore.WriteFiles(values)
1542

    
1543

    
1544
def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute that maps to a known
  error code, it will be converted into a textual C{E...} description.
  Otherwise the string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format
  @rtype: str
  @return: the symbolic error name (e.g. C{ENOENT}) or C{str(err)}

  """
  # EnvironmentError instances always carry an 'errno' attribute, but it
  # can be None (or an unmapped value) when the exception was raised
  # without one; a plain hasattr() check would let the errorcode lookup
  # die with a KeyError, so guard on the actual value instead
  if getattr(err, 'errno', None) in errno.errorcode:
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail
1560

    
1561

    
1562
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API versions of a given OS.

  This function will try to read the API versions of the OS given by
  the 'name' parameter and residing in the 'os_dir' directory.

  @type name: str
  @param name: the OS name we should look for
  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: list of int
  @return: the list of API versions declared by the OS
  @raise errors.InvalidOS: if the api version file is missing, not a
      regular file, unreadable, or does not contain only integers

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_versions = f.readlines()
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  # one version per line; strip whitespace before the int conversion
  api_versions = [version.strip() for version in api_versions]
  try:
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_versions
1609

    
1610

    
1611
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: an OS object for each name in all the given
      directories

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        # NOTE(review): 'break' aborts scanning the *remaining* search
        # directories too; 'continue' may be the intended behaviour --
        # confirm before changing
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          # invalid OSes are still reported, flagged via FromInvalidOS
          result.append(objects.OS.FromInvalidOS(err))

  return result
1642

    
1643

    
1644
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.

  @type name: str
  @param name: the OS name to load from disk
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise errors.InvalidOS: if we don't find a valid OS

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    # each script must exist, be executable and be a regular file
    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)


  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)
1700

    
1701
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # global instance parameters
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  # per-disk variables; the disks must already be assembled on this node
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  # per-NIC variables; the IP is only exported when set
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # backend and hypervisor parameters, exported verbatim as strings
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
1752

    
1753
def BlockdevGrow(disk, amount):
1754
  """Grow a stack of block devices.
1755

1756
  This function is called recursively, with the childrens being the
1757
  first ones to resize.
1758

1759
  @type disk: L{objects.Disk}
1760
  @param disk: the disk to be grown
1761
  @rtype: (status, result)
1762
  @return: a tuple with the status of the operation
1763
      (True/False), and the errors message if status
1764
      is False
1765

1766
  """
1767
  r_dev = _RecursiveFindBD(disk)
1768
  if r_dev is None:
1769
    return False, "Cannot find block device %s" % (disk,)
1770

    
1771
  try:
1772
    r_dev.Grow(amount)
1773
  except errors.BlockDeviceError, err:
1774
    return False, str(err)
1775

    
1776
  return True, None
1777

    
1778

    
1779
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # a single child: recurse straight into it
      return BlockdevSnapshot(disk.children[0])
    # several children: pick the first whose size matches the parent
    for child in disk.children:
      if child.size == disk.size:
        return BlockdevSnapshot(child)
    # no matching child: fall through and return None implicitly
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is None:
      return None
    # let's stay on the safe side and ask for the full size, for now
    return r_dev.Snapshot(disk.size)
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
1812

    
1813

    
1814
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  The snapshot contents are produced by the OS export script,
  compressed with gzip and streamed over SSH into a staging file in
  the destination node's export directory.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: boolean
  @return: the success of the operation

  """
  export_env = OSEnvironment(instance)

  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  # per-run log file capturing the export script's stderr
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_disk.Open()

  # tell the export script which device (and disk index) to dump
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  # the export is staged into a ".new" directory on the target node
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
                               inst_os.path, export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  # bash is required for the "pipefail" option used in expcmd
  result = utils.RunCmd(["bash", "-c", command], env=export_env)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: boolean
  @return: the success of the operation

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  for idx, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' % idx, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % idx, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % idx,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % len(instance.nics))

  disk_total = 0
  for idx, disk in enumerate(snap_disks):
    if not disk:
      # no snapshot for this disk; skip it but keep the index
      continue
    disk_total += 1
    config.set(constants.INISECT_INS, 'disk%d_ivname' % idx,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % idx,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % idx,
               ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  # write the config into the staging dir and atomically swap it into
  # its final place
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
def ExportInfo(dest):
  """Get export configuration information.

  Reads the export config file from the given directory and checks
  that both required sections are present.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info, or None if the config is incomplete

  """
  config = objects.SerializableConfigParser()
  config.read(os.path.join(dest, constants.EXPORT_CONF_FILE))

  for section in (constants.INISECT_EXP, constants.INISECT_INS):
    if not config.has_section(section):
      return None

  return config
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
  """Import an os image into an instance.

  Each disk image is streamed from the source node over SSH,
  decompressed with gunzip and fed into the OS import script.

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: list of boolean
  @return: each boolean represent the success of importing the n-th disk

  """
  import_env = OSEnvironment(instance)
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  # per-run log file capturing the import script's output
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if image:
      # stream the image from the source node, decompress it locally
      # and pipe it into the import script
      destcmd = utils.BuildShellCmd('cat %s', image)
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                       constants.GANETI_RUNAS,
                                                       destcmd)
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
      # tell the import script which device (and disk index) to fill
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
      import_env['IMPORT_INDEX'] = str(idx)
      result = utils.RunCmd(command, env=import_env)
      if result.failed:
        logging.error("Disk import command '%s' returned error: %s"
                      " output: %s", command, result.fail_reason,
                      result.output)
        final_result.append(False)
      else:
        final_result.append(True)
    else:
      # no image for this disk: nothing to import, count as a success
      final_result.append(True)

  return final_result
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  # a missing export directory simply means there are no exports
  if not os.path.isdir(constants.EXPORT_DIR):
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: boolean
  @return: the success of the operation

  """
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
  shutil.rmtree(os.path.join(constants.EXPORT_DIR, export))

  return True
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form  (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  success = True
  for disk, unique_id in devlist:
    bdev = _RecursiveFindBD(disk)
    if bdev is None:
      success = False
      continue
    try:
      old_path = bdev.dev_path
      bdev.Rename(unique_id)
      if old_path != bdev.dev_path:
        # the device path changed: drop the stale cache entry
        DevCacheManager.RemoveCache(old_path)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError:
      logging.exception("Can't rename device '%s' to '%s'", bdev, unique_id)
      success = False
  return success
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # note: os.path.commonprefix compares character by character, not by
  # path component, so it would wrongly accept e.g. "/srv/storage-evil"
  # when the base is "/srv/storage"; require the base itself or a real
  # path-component prefix instead
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
2112
  """Create file storage directory.
2113

2114
  @type file_storage_dir: str
2115
  @param file_storage_dir: directory to create
2116

2117
  @rtype: tuple
2118
  @return: tuple with first element a boolean indicating wheter dir
2119
      creation was successful or not
2120

2121
  """
2122
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2123
  result = True,
2124
  if not file_storage_dir:
2125
    result = False,
2126
  else:
2127
    if os.path.exists(file_storage_dir):
2128
      if not os.path.isdir(file_storage_dir):
2129
        logging.error("'%s' is not a directory", file_storage_dir)
2130
        result = False,
2131
    else:
2132
      try:
2133
        os.makedirs(file_storage_dir, 0750)
2134
      except OSError, err:
2135
        logging.error("Cannot create file storage directory '%s': %s",
2136
                      file_storage_dir, err)
2137
        result = False,
2138
  return result
2139

    
2140

    
2141
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if not file_storage_dir:
    # path is outside the allowed base directory
    return False,
  if not os.path.exists(file_storage_dir):
    # nothing to remove counts as a success
    return True,
  if not os.path.isdir(file_storage_dir):
    # don't try to rmdir() something that is not a directory; the old
    # code fell through here and logged the same failure a second time
    logging.error("'%s' is not a directory", file_storage_dir)
    return False,
  # deletes dir only if empty, otherwise we want to return False
  try:
    os.rmdir(file_storage_dir)
  except OSError:
    logging.exception("Cannot remove file storage directory '%s'",
                      file_storage_dir)
    return False,
  return True,
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not old_file_storage_dir or not new_file_storage_dir:
    # one of the paths is outside the allowed base directory
    return False,
  if os.path.exists(new_file_storage_dir):
    if os.path.exists(old_file_storage_dir):
      logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                    old_file_storage_dir, new_file_storage_dir)
      return False,
    # target exists and source is gone: nothing left to do
    return True,
  if not os.path.isdir(old_file_storage_dir):
    logging.error("'%s' is not a directory", old_file_storage_dir)
    return False,
  try:
    os.rename(old_file_storage_dir, new_file_storage_dir)
  except OSError:
    logging.exception("Cannot rename '%s' to '%s'",
                      old_file_storage_dir, new_file_storage_dir)
    return False,
  return True,
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  file_name = os.path.normpath(file_name)
  # note: os.path.commonprefix compares character by character, not by
  # path component, so it would wrongly accept e.g. ".../queue-evil/x"
  # for a queue directory of ".../queue"; require a real
  # path-component prefix instead
  result = file_name.startswith(queue_dir + os.sep)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  if _IsJobQueueFile(file_name):
    # Write and replace the file atomically
    utils.WriteFile(file_name, data=_Decompress(content))
    return True
  return False
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  # both the source and the destination must live in the queue dir
  if not _IsJobQueueFile(old):
    return False
  if not _IsJobQueueFile(new):
    return False

  utils.RenameFile(old, new, mkdir=True)
  return True
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    # the presence of the (empty) file is what marks the queue drained
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)

  return True
def BlockdevClose(instance_name, disks):
2293
  """Closes the given block devices.
2294

2295
  This means they will be switched to secondary mode (in case of
2296
  DRBD).
2297

2298
  @param instance_name: if the argument is not empty, the symlinks
2299
      of this instance will be removed
2300
  @type disks: list of L{objects.Disk}
2301
  @param disks: the list of disks to be closed
2302
  @rtype: tuple (success, message)
2303
  @return: a tuple of success and message, where success
2304
      indicates the succes of the operation, and message
2305
      which will contain the error details in case we
2306
      failed
2307

2308
  """
2309
  bdevs = []
2310
  for cf in disks:
2311
    rd = _RecursiveFindBD(cf)
2312
    if rd is None:
2313
      return (False, "Can't find device %s" % cf)
2314
    bdevs.append(rd)
2315

    
2316
  msg = []
2317
  for rd in bdevs:
2318
    try:
2319
      rd.Close()
2320
    except errors.BlockDeviceError, err:
2321
      msg.append(str(err))
2322
  if msg:
2323
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
2324
  else:
2325
    if instance_name:
2326
      _RemoveBlockDevLinks(instance_name, disks)
2327
    return (True, "All devices secondary")
2328

    
2329

    
2330
def ValidateHVParams(hvname, hvparams):
2331
  """Validates the given hypervisor parameters.
2332

2333
  @type hvname: string
2334
  @param hvname: the hypervisor name
2335
  @type hvparams: dict
2336
  @param hvparams: the hypervisor parameters to be validated
2337
  @rtype: tuple (success, message)
2338
  @return: a tuple of success and message, where success
2339
      indicates the succes of the operation, and message
2340
      which will contain the error details in case we
2341
      failed
2342

2343
  """
2344
  try:
2345
    hv_type = hypervisor.GetHypervisor(hvname)
2346
    hv_type.ValidateParameters(hvparams)
2347
    return (True, "Validation passed")
2348
  except errors.HypervisorError, err:
2349
    return (False, str(err))
2350

    
2351

    
2352
def DemoteFromMC():
2353
  """Demotes the current node from master candidate role.
2354

2355
  """
2356
  # try to ensure we're not the master by mistake
2357
  master, myself = ssconf.GetMasterAndMyself()
2358
  if master == myself:
2359
    return (False, "ssconf status shows I'm the master node, will not demote")
2360
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2361
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2362
    return (False, "The master daemon is running, will not demote")
2363
  try:
2364
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2365
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2366
  except EnvironmentError, err:
2367
    if err.errno != errno.ENOENT:
2368
      return (False, "Error while backing up cluster file: %s" % str(err))
2369
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2370
  return (True, "Done")
2371

    
2372

    
2373
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID on every disk first
  my_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  # then resolve each disk to its block device, aborting on a miss
  bdevs = []
  for disk in disks:
    bdev = _RecursiveFindBD(disk)
    if bdev is None:
      return (False, "Can't find device %s" % disk)
    bdevs.append(bdev)
  return (True, bdevs)
def DrbdDisconnectNet(nodes_ip, disks):
2393
  """Disconnects the network on a list of drbd devices.
2394

2395
  """
2396
  status, bdevs = _FindDisks(nodes_ip, disks)
2397
  if not status:
2398
    return status, bdevs
2399

    
2400
  # disconnect disks
2401
  for rd in bdevs:
2402
    try:
2403
      rd.DisconnectNet()
2404
    except errors.BlockDeviceError, err:
2405
      logging.exception("Failed to go into standalone mode")
2406
      return (False, "Can't change network configuration: %s" % str(err))
2407
  return (True, "All disks are now disconnected")
2408

    
2409

    
2410
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @type nodes_ip: list
  @param nodes_ip: node IP data, passed on to L{_FindDisks} to set the
      disks' physical IDs
  @type disks: list of L{objects.Disk}
  @param disks: the DRBD disks to reconnect
  @type instance_name: str
  @param instance_name: the instance name, used for the block device
      symlinks created in multi-master mode
  @type multimaster: boolean
  @param multimaster: if True, reconnect in multi-master mode and
      switch the devices to primary afterwards

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  if multimaster:
    # the devices will be opened as primary later, so the instance
    # symlinks must exist beforehand
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        return (False, "Can't create symlink: %s" % str(err))
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      return (False, "Can't change network configuration: %s" % str(err))
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 miliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          return (False, "Can't change network configuration: %s" % str(err))
    if all_connected:
      break
    time.sleep(sleep_time)
    # back off exponentially, capped at five seconds between polls
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    return (False, "Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        return (False, "Can't change to primary mode: %s" % str(err))
  if multimaster:
    msg = "multi-master and primary"
  else:
    msg = "single-master"
  return (True, "Disks are now configured as %s" % msg)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  min_resync = 100
  alldone = True
  for bdev in bdevs:
    stats = bdev.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      # a device that is neither connected nor resyncing is a failure
      return (False, (alldone, min_resync))
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)
  return (True, (alldone, min_resync))
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # the "<hpath>-<phase>.d" hook directories live under this path
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the succes of the operation, and message
        which will contain the error details in case we
        failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # only the first 4 kB of the script's combined stdout/stderr
        # are kept; the pipe is closed right after reading
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          # retry wait() if it was interrupted by a signal
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, utils.SafeEncode(output.strip())

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError:
      # FIXME: must log output in case of failures
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # anything that is not an executable regular file with a valid
      # plugin name is skipped, not failed
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
              constants.EXT_PLUGIN_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the input data is handed over via a temporary file given as argv[1]
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
    finally:
      os.unlink(fin_name)

    if result.failed:
      return (constants.IARUN_FAILURE, result.stdout, result.stderr,
              result.fail_reason)
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  # prefix stripped from device paths before building the cache file name
  _DEV_PREFIX = "/dev/"
  # directory holding the per-device cache files
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      # best-effort cache: log and continue on write failures
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # fixed copy-pasted message: this routine removes, not updates
      logging.exception("Can't remove the bdev cache file %s", dev_path)