Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 13998ef2

History | View | Annotate | Download (83 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
# Kernel-provided unique ID for the current boot; read in GetNodeInfo so
# the master can detect that this node has rebooted.
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55

    
56

    
57
class RPCFail(Exception):
  """Exception signalling an RPC-level failure.

  The single argument carries the error message that will be reported
  back to the caller.

  """
63

    
64

    
65
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  The RPCFail exception is handled specially by the ganeti daemon and
  converted into a 'failed' return type, so this is a convenient
  shortcut for both logging an error and reporting it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception; %-interpolated with C{args}
      when C{args} is non-empty
  @keyword log: if passed and false, skip logging entirely
  @keyword exc: if true, log the current exception traceback as well
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  should_log = kwargs.get("log", True)
  if should_log:
    if kwargs.get("exc"):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
86

    
87

    
88
def _GetConfig():
  """Instantiate and return a simple-store accessor.

  @rtype: L{ssconf.SimpleStore}
  @return: a freshly created SimpleStore instance

  """
  return ssconf.SimpleStore()
96

    
97

    
98
def _GetSshRunner(cluster_name):
  """Build an SshRunner bound to the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the SshRunner
      constructor
  @rtype: L{ssh.SshRunner}
  @return: a new SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
109

    
110

    
111
def _Decompress(data):
  """Unpack (and possibly decompress) a payload sent by the RPC client.

  @type data: list or tuple
  @param data: two-element sequence of (encoding, content) as produced
      by the RPC client
  @rtype: str
  @return: the raw, decompressed content

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  # only two encodings are defined; anything else is a programming error
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  raise AssertionError("Unknown data encoding")
129

    
130

    
131
def _CleanDirectory(path, exclude=None):
  """Delete every plain file directly inside a directory.

  Symlinks and subdirectories are left untouched, as is any file whose
  normalized path appears in the exclusion list.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: paths of files to keep; defaults to keeping nothing

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    normalized_excl = []
  else:
    # normalize the exclusions so the comparison below is path-shape safe
    normalized_excl = [os.path.normpath(entry) for entry in exclude]

  for entry in utils.ListVisibleFiles(path):
    candidate = os.path.normpath(os.path.join(path, entry))
    if candidate in normalized_excl:
      continue
    if os.path.isfile(candidate) and not os.path.islink(candidate):
      utils.RemoveFile(candidate)
155

    
156

    
157
def _BuildUploadFileList():
  """Compute the set of files that L{UploadFile} may overwrite.

  Abstracted into a helper so that the (frozen) set is computed only
  once, at module import time.

  """
  files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.HMAC_CLUSTER_KEY,
    ])

  # each hypervisor may declare extra ancillary files of its own
  for hv_name in constants.HYPER_TYPES:
    files.update(hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles())

  return frozenset(files)
178

    
179

    
180
# Computed once at import time; see _BuildUploadFileList for the contents.
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181

    
182

    
183
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # the queue lock file is excluded so a running job-queue owner keeps it
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192

    
193

    
194
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    # _Fail always raises, so the return below is only reached on success
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node)
213

    
214

    
215
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None
  @raise RPCFail: if any of the steps failed; all steps are still
      attempted and the failures are reported together

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # failures are collected so we attempt every step before reporting
  err_msgs = []
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    # gratuitous ARP so neighbours update their caches quickly
    # NOTE(review): "-c 3" is passed as one argv element, not as
    # separate "-c", "3" arguments -- confirm arping accepts this form
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    daemons_params = {
        'ganeti-masterd': [],
        'ganeti-rapi': [],
        }
    if no_voting:
      daemons_params['ganeti-masterd'].append('--no-voting')
      daemons_params['ganeti-masterd'].append('--yes-do-it')
    for daemon in daemons_params:
      cmd = [daemon]
      cmd.extend(daemons_params[daemon])
      result = utils.RunCmd(cmd)
      if result.failed:
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
        logging.error(msg)
        err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
276

    
277

    
278
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI, constants.MASTERD:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307

    
308

    
309
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: None
  @raise RPCFail: if the user ssh files cannot be processed

  """
  # host keys: private keys must not be world-readable
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    _Fail("Error while processing user ssh files: %s", err, exc=True)

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352

    
353

    
354
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

    utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

    utils.RemoveFile(priv_key)
    utils.RemoveFile(pub_key)
  except errors.OpExecError:
    # best effort: log the problem but still proceed with the shutdown
    logging.exception("Error while processing ssh files")

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
380

    
381

    
382
def GetNodeInfo(vgname, hypervisor_type):
  """Collect storage, memory and boot-id information about this node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space
      information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB
      - bootid is the kernel boot ID, for reboot detection

  """
  vginfo = _GetVGInfo(vgname)
  node_info = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
    }

  hv_data = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hv_data is not None:
    node_info.update(hv_data)

  # the boot id lets the master notice that this node has rebooted
  node_info["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return node_info
412

    
413

    
414
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name, passed to the SshRunner
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  # per-hypervisor self-verification
  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # NOTE(review): shuffles the caller's list in place (presumably to
    # spread ssh load across nodes) -- confirm callers don't rely on order
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failures are reported back
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # locate our own entry to learn which source IPs to ping from
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetDaemonPort(constants.NODED)
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP when it differs from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # report the error text instead of a minor list
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  return result
514

    
515

    
516
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

  @raise RPCFail: if the lvs command fails

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # raw string so the escaped separator is a real regex escape, not an
  # accidental string escape; fields are name, size, 6-char attributes
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # attr[4] is the lv state character ('-' means not active), attr[5]
    # is the device-open character ('o' means open)
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
553

    
554

    
555
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin RPC-visible wrapper over the utils implementation.

  @rtype: dict
  @return: mapping of volume group name to its size

  """
  return utils.ListVolumeGroups()
564

    
565

    
566
def NodeVolumes():
  """List all logical volumes present on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    Since a logical volume can live on multiple physical volumes, the
    resulting list may contain the same logical volume several times.

  @raise RPCFail: if the lvs command fails

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def _ExtractDevice(field):
    # device fields look like "/dev/sda1(0)"; keep only the path part
    if '(' in field:
      return field.split('(')[0]
    return field

  def _MakeEntry(fields):
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': _ExtractDevice(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  # only lines with at least four fields are real data lines
  return [_MakeEntry(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
608

    
609

    
610
def BridgesExist(bridges_list):
  """Check that every bridge in a list exists on the current node.

  @type bridges_list: list
  @param bridges_list: names of the bridges to check
  @rtype: None
  @raise RPCFail: if one or more of the bridges are missing

  """
  absent = [bridge for bridge in bridges_list
            if not utils.BridgeExists(bridge)]

  if absent:
    _Fail("Missing bridges %s", ", ".join(absent))
624

    
625

    
626
def GetInstanceList(hypervisor_list):
627
  """Provides a list of instances.
628

629
  @type hypervisor_list: list
630
  @param hypervisor_list: the list of hypervisors to query information
631

632
  @rtype: list
633
  @return: a list of all running instances on the current node
634
    - instance1.example.com
635
    - instance2.example.com
636

637
  """
638
  results = []
639
  for hname in hypervisor_list:
640
    try:
641
      names = hypervisor.GetHypervisor(hname).ListInstances()
642
      results.extend(names)
643
    except errors.HypervisorError, err:
644
      _Fail("Error enumerating instances (hypervisor %s): %s",
645
            hname, err, exc=True)
646

    
647
  return results
648

    
649

    
650
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: empty dict if the instance is unknown to the hypervisor,
      otherwise a dictionary with the keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    return {}

  # the info tuple is (name, id, memory, vcpus, state, time); see the
  # unpacking in GetAllInstancesInfo
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
674

    
675

    
676
def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.
  @rtype: None
  @raise RPCFail: if the instance is not running, or lacks the block
      device symlinks created at instance start

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      # the symlinks are created at instance start; an instance running
      # since before ganeti 1.2.5 won't have them
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
697

    
698

    
699
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus
  @raise RPCFail: if the same instance is reported by two hypervisors
      with conflicting static parameters

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        # the last hypervisor's data wins for the dynamic fields
        output[name] = value

  return output
740

    
741

    
742
def InstanceOsAdd(instance, reinstall):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @rtype: None
  @raise RPCFail: if the OS create script fails

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os)
  if reinstall:
    # let the create script know it can preserve/reuse existing data
    create_env['INSTANCE_REINSTALL'] = "1"

  # per-run log file, timestamped to keep prior attempts around
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # only the tail of the log is sent back to the master
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
771

    
772

    
773
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: None
  @raise RPCFail: if the OS rename script fails

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  # per-run log file, timestamped and carrying both names
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # fixed copy-paste: this is the rename script, not the create script
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # only the tail of the log is sent back to the master
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
803

    
804

    
805
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  # default result: all keys present but set to None
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # drop a possible trailing separator before splitting the three fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      # parse failure: keep the all-None result dict
      logging.exception("Fail to parse vgs output: %s", err)
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
843

    
844

    
845
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for one of an instance's disks.

  @param instance_name: name of the owning instance
  @param idx: index of the disk
  @rtype: str

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return os.path.join(constants.DISK_LINKS_DIR, link_basename)
848

    
849

    
850
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink
  @raise OSError: on symlink creation failures other than a
      pre-existing, already-correct link

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # a link already exists: replace it only if it's wrong or stale
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
876

    
877

    
878
def _RemoveBlockDevLinks(instance_name, disks):
  """Delete the symlinks pointing at an instance's block devices.

  Unlink failures are logged but otherwise ignored.

  """
  for idx in range(len(disks)):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
889

    
890

    
891
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we shoul assemble
  @rtype: list
  @return: list of (disk_object, device_path)
  @raise errors.BlockDeviceError: if a device cannot be found or its
      symlink cannot be created

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      # the hypervisor is given stable symlink paths, not raw dev paths
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices
919

    
920

    
921
def StartInstance(instance):
  """Start an instance.

  Starting an already-running instance is treated as a no-op.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    # the symlinks were already created; clean them up before failing
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
944

    
945

    
946
def InstanceShutdown(instance):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: None

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])
  iname = instance.name

  if iname not in running_instances:
    logging.info("Instance %s not running, doing nothing", iname)
    return

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    # first attempt: a clean (non-forced) stop
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to stop instance %s: %s", iname, err)

  # test every 10secs for 2min

  time.sleep(1)
  for _ in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
  else:
    # for/else: this branch runs only when the loop completed without
    # 'break', i.e. the instance is still running after ~2 minutes
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  # the instance is down: remove its block-device symlinks
  _RemoveBlockDevLinks(iname, instance.disks)
991

    
992

    
993
def InstanceReboot(instance, reboot_type):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      # a hard reboot is implemented as a full stop followed by a
      # start; StartInstance returns None, so this returns None too
      InstanceShutdown(instance)
      return StartInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)
1031

    
1032

    
1033
def MigrationInfo(instance):
1034
  """Gather information about an instance to be migrated.
1035

1036
  @type instance: L{objects.Instance}
1037
  @param instance: the instance definition
1038

1039
  """
1040
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1041
  try:
1042
    info = hyper.MigrationInfo(instance)
1043
  except errors.HypervisorError, err:
1044
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1045
  return info
1046

    
1047

    
1048
def AcceptInstance(instance, info, target):
1049
  """Prepare the node to accept an instance.
1050

1051
  @type instance: L{objects.Instance}
1052
  @param instance: the instance definition
1053
  @type info: string/data (opaque)
1054
  @param info: migration information, from the source node
1055
  @type target: string
1056
  @param target: target host (usually ip), on this node
1057

1058
  """
1059
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1060
  try:
1061
    hyper.AcceptInstance(instance, info, target)
1062
  except errors.HypervisorError, err:
1063
    _Fail("Failed to accept instance: %s", err, exc=True)
1064

    
1065

    
1066
def FinalizeMigration(instance, info, success):
1067
  """Finalize any preparation to accept an instance.
1068

1069
  @type instance: L{objects.Instance}
1070
  @param instance: the instance definition
1071
  @type info: string/data (opaque)
1072
  @param info: migration information, from the source node
1073
  @type success: boolean
1074
  @param success: whether the migration was a success or a failure
1075

1076
  """
1077
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1078
  try:
1079
    hyper.FinalizeMigration(instance, info, success)
1080
  except errors.HypervisorError, err:
1081
    _Fail("Failed to finalize migration: %s", err, exc=True)
1082

    
1083

    
1084
def MigrateInstance(instance, target, live):
1085
  """Migrates an instance to another node.
1086

1087
  @type instance: L{objects.Instance}
1088
  @param instance: the instance definition
1089
  @type target: string
1090
  @param target: the target node name
1091
  @type live: boolean
1092
  @param live: whether the migration should be done live or not (the
1093
      interpretation of this parameter is left to the hypervisor)
1094
  @rtype: tuple
1095
  @return: a tuple of (success, msg) where:
1096
      - succes is a boolean denoting the success/failure of the operation
1097
      - msg is a string with details in case of failure
1098

1099
  """
1100
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1101

    
1102
  try:
1103
    hyper.MigrateInstance(instance.name, target, live)
1104
  except errors.HypervisorError, err:
1105
    _Fail("Failed to migrate instance: %s", err, exc=True)
1106

    
1107

    
1108
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB;
      note that the actual creation below uses C{disk.size}, not this
      argument
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # first, recursively assemble (and possibly open) all children, so
  # the device itself can be created on top of them
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # register the device so later lookups can use the cached data
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
1167

    
1168

    
1169
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  Errors are accumulated across the device and all its children; if
  any occurred, a single combined failure is raised at the end.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: None
  @raise RPCFail: if the device or any of its children could not be
      removed

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    # only drop the cache entry when the removal actually succeeded
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))
1205

    
1206

    
1207
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # mcn is re-purposed: from "minimum children needed" to the
    # maximum number of failed (None) children we may tolerate
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # too many children already failed: give up; otherwise record
        # the failure as None and keep going (degraded assembly)
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # nothing to assemble on this node; report plain success
    result = True
  return result
1259

    
1260

    
1261
def BlockdevAssemble(disk, owner, as_primary):
1262
  """Activate a block device for an instance.
1263

1264
  This is a wrapper over _RecursiveAssembleBD.
1265

1266
  @rtype: str or boolean
1267
  @return: a C{/dev/...} path for primary nodes, and
1268
      C{True} for secondary nodes
1269

1270
  """
1271
  try:
1272
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1273
    if isinstance(result, bdev.BlockDev):
1274
      result = result.dev_path
1275
  except errors.BlockDeviceError, err:
1276
    _Fail("Error while assembling disk: %s", err, exc=True)
1277

    
1278
  return result
1279

    
1280

    
1281
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None
  @raise RPCFail: if the device or any of its children failed to
      shut down

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  # raise a single combined failure covering this device and children
  if msgs:
    _Fail("; ".join(msgs))
1317

    
1318

    
1319
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # resolve every child; all of them must already be set up
  children = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in children:
    _Fail("Can't find new device(s) to add: %s:%s", children, new_cdevs)
  parent.AddChildren(children)
1336

    
1337

    
1338
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  paths = []
  for disk in new_cdevs:
    # prefer the statically-known device path; fall back to a lookup
    static_path = disk.StaticDevPath()
    if static_path is not None:
      paths.append(static_path)
      continue
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s while removing children", disk)
    paths.append(found.dev_path)
  parent.RemoveChildren(paths)
1363

    
1364

    
1365
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s", disk)
    stats.append(found.CombinedSyncStatus())
  return stats
1387

    
1388

    
1389
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device; the children (if
  any) are resolved recursively first.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    children = [_RecursiveFindBD(child) for child in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1407

    
1408

    
1409
def BlockdevFind(disk):
1410
  """Check if a device is activated.
1411

1412
  If it is, return information about the real device.
1413

1414
  @type disk: L{objects.Disk}
1415
  @param disk: the disk to find
1416
  @rtype: None or objects.BlockDevStatus
1417
  @return: None if the disk cannot be found, otherwise a the current
1418
           information
1419

1420
  """
1421
  try:
1422
    rbd = _RecursiveFindBD(disk)
1423
  except errors.BlockDeviceError, err:
1424
    _Fail("Failed to find device: %s", err, exc=True)
1425

    
1426
  if rbd is None:
1427
    return None
1428

    
1429
  return rbd.GetSyncStatus()
1430

    
1431

    
1432
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    # the bound exception variable was unused; catch without binding
    except errors.BlockDeviceError:
      # the device cannot even be attached to: report unknown size
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result
1456

    
1457

    
1458
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use
  # oflag=dsync to not buffer too much memory; this means that at
  # best, we flush every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  # the write side runs on the destination node, via SSH
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
1504

    
1505

    
1506
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file (possibly compressed, see
      L{_Decompress})
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # security check: refuse anything outside the explicit whitelist
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
1540

    
1541

    
1542
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Thin wrapper around L{ssconf.SimpleStore.WriteFiles}.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
1549

    
1550

    
1551
def _ErrnoOrStr(err):
1552
  """Format an EnvironmentError exception.
1553

1554
  If the L{err} argument has an errno attribute, it will be looked up
1555
  and converted into a textual C{E...} description. Otherwise the
1556
  string representation of the error will be returned.
1557

1558
  @type err: L{EnvironmentError}
1559
  @param err: the exception to format
1560

1561
  """
1562
  if hasattr(err, 'errno'):
1563
    detail = errno.errorcode[err.errno]
1564
  else:
1565
    detail = str(err)
1566
  return detail
1567

    
1568

    
1569
def _OSOndiskAPIVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS given by
  the 'name' parameter and residing in the 'os_dir' directory.

  @type name: str
  @param name: the OS name we should look for
  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file 'ganeti_api_version' file not"
                   " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File 'ganeti_api_version' file at %s is not"
                   " a regular file" % os_dir)

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  # the file may declare several versions, one per line
  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
1609

    
1610

    
1611
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose)
      for all (potential) OSes under all search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        # NOTE(review): 'break' aborts scanning the *remaining* search
        # directories, not just this one — confirm 'continue' was not
        # intended here
        break
      for name in f_names:
        os_path = os.path.sep.join([dir_name, name])
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
        else:
          # on failure, os_inst is the error message, not an OS object
          diagnose = os_inst
        result.append((name, os_path, status, diagnose))

  return result
1648

    
1649

    
1650
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      return False, "Directory for OS %s not found in search path" % name
  else:
    os_dir = os.path.sep.join([base_dir, name])

  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)

  # note: we only overwrite existing keys during iteration, which is
  # safe (the dict size never changes)
  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      return False, ("Script '%s' under path '%s' is missing (%s)" %
                     (script, os_dir, _ErrnoOrStr(err)))

    # every OS script must be executable by its owner...
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      return False, ("Script '%s' under path '%s' is not executable" %
                     (script, os_dir))

    # ...and a regular file (not a directory, device, etc.)
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("Script '%s' under path '%s' is not a regular file" %
                     (script, os_dir))

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                      export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                      import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                      api_versions=api_versions)
  return True, os_obj
1707

    
1708

    
1709
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  ok, payload = _TryOSFromDisk(name, base_dir)
  # on failure the payload is the error message; re-raise it as RPCFail
  if not ok:
    _Fail(payload)
  return payload
1733

    
1734

    
1735
def OSEnvironment(instance, os, debug=0):
  """Calculate the environment for an os script.

  @note: the C{os} parameter shadows the stdlib C{os} module inside
      this function; no stdlib C{os} calls may be used in this body

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type os: L{objects.OS}
  @param os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # pick the highest API version supported by both sides
  api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  # per-disk variables: path, access mode and type information
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  # per-NIC variables: MAC, IP and networking mode/link
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # expose all backend and hypervisor parameters as INSTANCE_BE_* /
  # INSTANCE_HV_* variables
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
1793

    
1794
def BlockdevGrow(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount to grow the disk with
  @rtype: None
  @raise RPCFail: if the device cannot be found or growing it fails
      (the function reports errors via L{_Fail} instead of returning a
      status tuple)

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path
  @raise RPCFail: if the disk (or a suitable child) cannot be found or
      is not of a snapshottable type

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return BlockdevSnapshot(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return BlockdevSnapshot(child)
      # previously this case fell through and silently returned None;
      # report it explicitly instead
      _Fail("No child of disk %s matches its size %s, cannot snapshot",
            disk.unique_id, disk.size)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: None
  @raise RPCFail: if the device is not set up or the export command fails

  """
  inst_os = OSFromDisk(instance.os)
  export_env = OSEnvironment(instance, inst_os)

  export_script = inst_os.export_script

  # per-export log file, named after OS, instance and timestamp
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # the OS export script reads the device and index from its environment
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  # export into a ".new" staging dir; FinalizeExport moves it into place
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
                               inst_os.path, export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command], env=export_env)

  if result.failed:
    _Fail("OS snapshot export command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file;
      entries may be None for disks that were not snapshotted

  @rtype: None

  """
  # config is written into the ".new" staging dir first and then the
  # whole staging dir atomically replaces the final export dir
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  # disk indices follow the instance disk order, but the count only
  # includes disks that were actually snapshotted (non-None entries)
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export of the same instance with the new one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: the serialized (C{Dumps()}) form of the config file holding
      the export info
  @raise RPCFail: if the export info file lacks the required sections

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # a missing/unreadable file results in an empty parser, which fails
  # the section check below
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2001
  """Import an os image into an instance.
2002

2003
  @type instance: L{objects.Instance}
2004
  @param instance: instance to import the disks into
2005
  @type src_node: string
2006
  @param src_node: source node for the disk images
2007
  @type src_images: list of string
2008
  @param src_images: absolute paths of the disk images
2009
  @rtype: list of boolean
2010
  @return: each boolean represent the success of importing the n-th disk
2011

2012
  """
2013
  inst_os = OSFromDisk(instance.os)
2014
  import_env = OSEnvironment(instance, inst_os)
2015
  import_script = inst_os.import_script
2016

    
2017
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2018
                                        instance.name, int(time.time()))
2019
  if not os.path.exists(constants.LOG_OS_DIR):
2020
    os.mkdir(constants.LOG_OS_DIR, 0750)
2021

    
2022
  comprcmd = "gunzip"
2023
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2024
                               import_script, logfile)
2025

    
2026
  final_result = []
2027
  for idx, image in enumerate(src_images):
2028
    if image:
2029
      destcmd = utils.BuildShellCmd('cat %s', image)
2030
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2031
                                                       constants.GANETI_RUNAS,
2032
                                                       destcmd)
2033
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2034
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2035
      import_env['IMPORT_INDEX'] = str(idx)
2036
      result = utils.RunCmd(command, env=import_env)
2037
      if result.failed:
2038
        logging.error("Disk import command '%s' returned error: %s"
2039
                      " output: %s", command, result.fail_reason,
2040
                      result.output)
2041
        final_result.append("error importing disk %d: %s, %s" %
2042
                            (idx, result.fail_reason, result.output[-100]))
2043

    
2044
  if final_result:
2045
    _Fail("; ".join(final_result), log=False)
2046

    
2047

    
2048
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  export_dir = constants.EXPORT_DIR
  if not os.path.isdir(export_dir):
    _Fail("No exports directory")
  return utils.ListVisibleFiles(export_dir)
def RemoveExport(export):
2062
  """Remove an existing export from the node.
2063

2064
  @type export: str
2065
  @param export: the name of the export to remove
2066
  @rtype: None
2067

2068
  """
2069
  target = os.path.join(constants.EXPORT_DIR, export)
2070

    
2071
  try:
2072
    shutil.rmtree(target)
2073
  except EnvironmentError, err:
2074
    _Fail("Error while removing the export: %s", err, exc=True)
2075

    
2076

    
2077
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the unique_id we rename it to
  @rtype: None
  @raise RPCFail: if any of the renames failed; the individual error
      messages are aggregated into one failure

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      # collect the error but keep renaming the remaining devices
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        # the device path changed, so the old cache entry is stale
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise
  @raise RPCFail: if the path is not under the base storage directory

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = os.path.normpath(cfg.GetFileStorageDir())
  # NOTE: a plain os.path.commonprefix test is character-wise, so it
  # would accept e.g. "/srv/storage-evil" under a base of
  # "/srv/storage"; require the path to be the base itself or to be
  # separator-aligned below it
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
2142
  """Create file storage directory.
2143

2144
  @type file_storage_dir: str
2145
  @param file_storage_dir: directory to create
2146

2147
  @rtype: tuple
2148
  @return: tuple with first element a boolean indicating wheter dir
2149
      creation was successful or not
2150

2151
  """
2152
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2153
  if os.path.exists(file_storage_dir):
2154
    if not os.path.isdir(file_storage_dir):
2155
      _Fail("Specified storage dir '%s' is not a directory",
2156
            file_storage_dir)
2157
  else:
2158
    try:
2159
      os.makedirs(file_storage_dir, 0750)
2160
    except OSError, err:
2161
      _Fail("Cannot create file storage directory '%s': %s",
2162
            file_storage_dir, err, exc=True)
2163

    
2164

    
2165
def RemoveFileStorageDir(file_storage_dir):
2166
  """Remove file storage directory.
2167

2168
  Remove it only if it's empty. If not log an error and return.
2169

2170
  @type file_storage_dir: str
2171
  @param file_storage_dir: the directory we should cleanup
2172
  @rtype: tuple (success,)
2173
  @return: tuple of one element, C{success}, denoting
2174
      whether the operation was successful
2175

2176
  """
2177
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2178
  if os.path.exists(file_storage_dir):
2179
    if not os.path.isdir(file_storage_dir):
2180
      _Fail("Specified Storage directory '%s' is not a directory",
2181
            file_storage_dir)
2182
    # deletes dir only if empty, otherwise we want to fail the rpc call
2183
    try:
2184
      os.rmdir(file_storage_dir)
2185
    except OSError, err:
2186
      _Fail("Cannot remove file storage directory '%s': %s",
2187
            file_storage_dir, err)
2188

    
2189

    
2190
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2191
  """Rename the file storage directory.
2192

2193
  @type old_file_storage_dir: str
2194
  @param old_file_storage_dir: the current path
2195
  @type new_file_storage_dir: str
2196
  @param new_file_storage_dir: the name we should rename to
2197
  @rtype: tuple (success,)
2198
  @return: tuple of one element, C{success}, denoting
2199
      whether the operation was successful
2200

2201
  """
2202
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2203
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2204
  if not os.path.exists(new_file_storage_dir):
2205
    if os.path.isdir(old_file_storage_dir):
2206
      try:
2207
        os.rename(old_file_storage_dir, new_file_storage_dir)
2208
      except OSError, err:
2209
        _Fail("Cannot rename '%s' to '%s': %s",
2210
              old_file_storage_dir, new_file_storage_dir, err)
2211
    else:
2212
      _Fail("Specified storage dir '%s' is not a directory",
2213
            old_file_storage_dir)
2214
  else:
2215
    if os.path.exists(old_file_storage_dir):
2216
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2217
            old_file_storage_dir, new_file_storage_dir)
2218

    
2219

    
2220
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  norm_name = os.path.normpath(file_name)
  # NOTE: a plain os.path.commonprefix test is character-wise, so it
  # would accept sibling paths such as "<queue_dir>-evil/job"; require
  # a separator-aligned containment instead
  result = (norm_name == queue_dir or
            norm_name.startswith(queue_dir + os.sep))

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents (passed through L{_Decompress}
      before writing)
  @rtype: None
  @raise RPCFail: if the file name is not inside the queue directory

  """
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raise RPCFail: if either name is not inside the queue directory

  """
  # both source and destination must live inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: None

  """
  # the flag is represented by the mere existence of the drain file
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raise RPCFail: if a disk cannot be found, or if closing any device
      fails (the per-device errors are aggregated into one failure)

  """
  # resolve all disks before touching any of them
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    # only clean up the symlinks once every device closed successfully
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)
def ValidateHVParams(hvname, hvparams):
2332
  """Validates the given hypervisor parameters.
2333

2334
  @type hvname: string
2335
  @param hvname: the hypervisor name
2336
  @type hvparams: dict
2337
  @param hvparams: the hypervisor parameters to be validated
2338
  @rtype: None
2339

2340
  """
2341
  try:
2342
    hv_type = hypervisor.GetHypervisor(hvname)
2343
    hv_type.ValidateParameters(hvparams)
2344
  except errors.HypervisorError, err:
2345
    _Fail(str(err), log=False)
2346

    
2347

    
2348
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  The demotion is done by backing up and removing the local copy of
  the cluster configuration file, after safety checks that this node
  is not (or does not appear to be) the current master.

  @rtype: None
  @raise RPCFail: if ssconf reports this node as the master, or the
      master daemon is running locally

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")
  pid_file = utils.DaemonPidFileName(constants.MASTERD)
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
    _Fail("The master daemon is running, will not demote")
  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # a vanished config file is fine; any other error is fatal
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: node name to IP mapping, used for computing the
      physical IDs
  @type disks: list of L{objects.Disk}
  @param disks: the disks to look up
  @return: list of block device objects, one per disk
  @raise RPCFail: if any of the devices cannot be found

  """
  # first pass: compute the correct physical ID on every disk
  local_node = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(local_node, nodes_ip)

  # second pass: resolve each disk to its block device
  bdevs = []
  for disk in disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(device)
  return bdevs
def DrbdDisconnectNet(nodes_ip, disks):
2388
  """Disconnects the network on a list of drbd devices.
2389

2390
  """
2391
  bdevs = _FindDisks(nodes_ip, disks)
2392

    
2393
  # disconnect disks
2394
  for rd in bdevs:
2395
    try:
2396
      rd.DisconnectNet()
2397
    except errors.BlockDeviceError, err:
2398
      _Fail("Can't change network configuration to standalone mode: %s",
2399
            err, exc=True)
2400

    
2401

    
2402
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @param nodes_ip: node name to IP mapping (used by L{_FindDisks})
  @type disks: list of L{objects.Disk}
  @param disks: the DRBD disks to reconnect
  @param instance_name: the instance whose block device symlinks are
      (re)created when attaching in multimaster mode
  @type multimaster: boolean
  @param multimaster: whether to reconnect in dual-primary mode
  @rtype: None
  @raise RPCFail: on symlink creation errors, reconnection errors or
      when the devices do not connect within the timeout

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 miliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)
    if all_connected:
      break
    time.sleep(sleep_time)
    # back off exponentially, capped at five seconds per iteration
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    _Fail("Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @param nodes_ip: node name to IP mapping (used by L{_FindDisks})
  @type disks: list of L{objects.Disk}
  @param disks: the DRBD disks to inspect
  @rtype: tuple
  @return: (all_done, min_sync_percent) where all_done is True when no
      device is still resyncing
  @raise RPCFail: if any device is neither connected nor resyncing

  """
  bdevs = _FindDisks(nodes_ip, disks)

  sync_done = True
  lowest_percent = 100
  for device in bdevs:
    status = device.GetProcStatus()
    if not (status.is_connected or status.is_in_resync):
      _Fail("DRBD device %s is not in sync: stats=%s", device, status)
    if status.is_in_resync:
      sync_done = False
    if status.sync_percent is not None:
      lowest_percent = min(lowest_percent, status.sync_percent)

  return (sync_done, lowest_percent)
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: str
  @param hypervisor_type: the hypervisor whose powercycle method is used
  @rtype: str
  @return: (in the parent process) a confirmation message; the child
      process performs the actual powercycle and does not return

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # child process (or failed fork): wait so the RPC reply above can
  # reach the caller before the node goes down
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  # hook scripts with names outside this mask are skipped, not run
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the succes of the operation, and message
        which will contain the error details in case we
        failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # only the first 4 kB of combined stdout/stderr are kept
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          # retry wait() when interrupted by a signal
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, utils.SafeEncode(output.strip())

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError:
      # FIXME: must log output in case of failures
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # non-files, non-executables and oddly-named entries are
      # reported as skipped instead of being run
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                            os.path.isfile)
    if script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # dump the input data to a temporary file the allocator can read
    tmp_fd, tmp_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(tmp_fd, idata)
      os.close(tmp_fd)
      run_result = utils.RunCmd([script, tmp_name])
      if run_result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, run_result.fail_reason, run_result.output)
    finally:
      os.unlink(tmp_name)

    return run_result.stdout
class DevCacheManager(object):
2667
  """Simple class for managing a cache of block device information.
2668

2669
  """
2670
  _DEV_PREFIX = "/dev/"
2671
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2672

    
2673
  @classmethod
2674
  def _ConvertPath(cls, dev_path):
2675
    """Converts a /dev/name path to the cache file name.
2676

2677
    This replaces slashes with underscores and strips the /dev
2678
    prefix. It then returns the full path to the cache file.
2679

2680
    @type dev_path: str
2681
    @param dev_path: the C{/dev/} path name
2682
    @rtype: str
2683
    @return: the converted path name
2684

2685
    """
2686
    if dev_path.startswith(cls._DEV_PREFIX):
2687
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2688
    dev_path = dev_path.replace("/", "_")
2689
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2690
    return fpath
2691

    
2692
  @classmethod
2693
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2694
    """Updates the cache information for a given device.
2695

2696
    @type dev_path: str
2697
    @param dev_path: the pathname of the device
2698
    @type owner: str
2699
    @param owner: the owner (instance name) of the device
2700
    @type on_primary: bool
2701
    @param on_primary: whether this is the primary
2702
        node nor not
2703
    @type iv_name: str
2704
    @param iv_name: the instance-visible name of the
2705
        device, as in objects.Disk.iv_name
2706

2707
    @rtype: None
2708

2709
    """
2710
    if dev_path is None:
2711
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2712
      return
2713
    fpath = cls._ConvertPath(dev_path)
2714
    if on_primary:
2715
      state = "primary"
2716
    else:
2717
      state = "secondary"
2718
    if iv_name is None:
2719
      iv_name = "not_visible"
2720
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2721
    try:
2722
      utils.WriteFile(fpath, data=fdata)
2723
    except EnvironmentError, err:
2724
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2725

    
2726
  @classmethod
2727
  def RemoveCache(cls, dev_path):
2728
    """Remove data for a dev_path.
2729

2730
    This is just a wrapper over L{utils.RemoveFile} with a converted
2731
    path name and logging.
2732

2733
    @type dev_path: str
2734
    @param dev_path: the pathname of the device
2735

2736
    @rtype: None
2737

2738
    """
2739
    if dev_path is None:
2740
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2741
      return
2742
    fpath = cls._ConvertPath(dev_path)
2743
    try:
2744
      utils.RemoveFile(fpath)
2745
    except EnvironmentError, err:
2746
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)