Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 95075fba

History | View | Annotate | Download (84 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55

    
56

    
57
class RPCFail(Exception):
58
  """Class denoting RPC failure.
59

60
  Its argument is the error message.
61

62
  """
63

    
64

    
65
def _Fail(msg, *args, **kwargs):
66
  """Log an error and the raise an RPCFail exception.
67

68
  This exception is then handled specially in the ganeti daemon and
69
  turned into a 'failed' return type. As such, this function is a
70
  useful shortcut for logging the error and returning it to the master
71
  daemon.
72

73
  @type msg: string
74
  @param msg: the text of the exception
75
  @raise RPCFail
76

77
  """
78
  if args:
79
    msg = msg % args
80
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
81
    if "exc" in kwargs and kwargs["exc"]:
82
      logging.exception(msg)
83
    else:
84
      logging.error(msg)
85
  raise RPCFail(msg)
86

    
87

    
88
def _GetConfig():
89
  """Simple wrapper to return a SimpleStore.
90

91
  @rtype: L{ssconf.SimpleStore}
92
  @return: a SimpleStore instance
93

94
  """
95
  return ssconf.SimpleStore()
96

    
97

    
98
def _GetSshRunner(cluster_name):
99
  """Simple wrapper to return an SshRunner.
100

101
  @type cluster_name: str
102
  @param cluster_name: the cluster name, which is needed
103
      by the SshRunner constructor
104
  @rtype: L{ssh.SshRunner}
105
  @return: an SshRunner instance
106

107
  """
108
  return ssh.SshRunner(cluster_name)
109

    
110

    
111
def _Decompress(data):
112
  """Unpacks data compressed by the RPC client.
113

114
  @type data: list or tuple
115
  @param data: Data sent by RPC client
116
  @rtype: str
117
  @return: Decompressed data
118

119
  """
120
  assert isinstance(data, (list, tuple))
121
  assert len(data) == 2
122
  (encoding, content) = data
123
  if encoding == constants.RPC_ENCODING_NONE:
124
    return content
125
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126
    return zlib.decompress(base64.b64decode(content))
127
  else:
128
    raise AssertionError("Unknown data encoding")
129

    
130

    
131
def _CleanDirectory(path, exclude=None):
132
  """Removes all regular files in a directory.
133

134
  @type path: str
135
  @param path: the directory to clean
136
  @type exclude: list
137
  @param exclude: list of files to be excluded, defaults
138
      to the empty list
139

140
  """
141
  if not os.path.isdir(path):
142
    return
143
  if exclude is None:
144
    exclude = []
145
  else:
146
    # Normalize excluded paths
147
    exclude = [os.path.normpath(i) for i in exclude]
148

    
149
  for rel_name in utils.ListVisibleFiles(path):
150
    full_name = os.path.normpath(os.path.join(path, rel_name))
151
    if full_name in exclude:
152
      continue
153
    if os.path.isfile(full_name) and not os.path.islink(full_name):
154
      utils.RemoveFile(full_name)
155

    
156

    
157
def _BuildUploadFileList():
158
  """Build the list of allowed upload files.
159

160
  This is abstracted so that it's built only once at module import time.
161

162
  """
163
  allowed_files = set([
164
    constants.CLUSTER_CONF_FILE,
165
    constants.ETC_HOSTS,
166
    constants.SSH_KNOWN_HOSTS_FILE,
167
    constants.VNC_PASSWORD_FILE,
168
    constants.RAPI_CERT_FILE,
169
    constants.RAPI_USERS_FILE,
170
    constants.HMAC_CLUSTER_KEY,
171
    ])
172

    
173
  for hv_name in constants.HYPER_TYPES:
174
    hv_class = hypervisor.GetHypervisorClass(hv_name)
175
    allowed_files.update(hv_class.GetAncillaryFiles())
176

    
177
  return frozenset(allowed_files)
178

    
179

    
180
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181

    
182

    
183
def JobQueuePurge():
184
  """Removes job queue files and archived jobs.
185

186
  @rtype: tuple
187
  @return: True, None
188

189
  """
190
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192

    
193

    
194
def GetMasterInfo():
195
  """Returns master information.
196

197
  This is an utility function to compute master information, either
198
  for consumption here or from the node daemon.
199

200
  @rtype: tuple
201
  @return: master_netdev, master_ip, master_name
202
  @raise RPCFail: in case of errors
203

204
  """
205
  try:
206
    cfg = _GetConfig()
207
    master_netdev = cfg.GetMasterNetdev()
208
    master_ip = cfg.GetMasterIP()
209
    master_node = cfg.GetMasterNode()
210
  except errors.ConfigurationError, err:
211
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
212
  return (master_netdev, master_ip, master_node)
213

    
214

    
215
def StartMaster(start_daemons, no_voting):
216
  """Activate local node as master node.
217

218
  The function will always try activate the IP address of the master
219
  (unless someone else has it). It will also start the master daemons,
220
  based on the start_daemons parameter.
221

222
  @type start_daemons: boolean
223
  @param start_daemons: whether to also start the master
224
      daemons (ganeti-masterd and ganeti-rapi)
225
  @type no_voting: boolean
226
  @param no_voting: whether to start ganeti-masterd without a node vote
227
      (if start_daemons is True), but still non-interactively
228
  @rtype: None
229

230
  """
231
  # GetMasterInfo will raise an exception if not able to return data
232
  master_netdev, master_ip, _ = GetMasterInfo()
233

    
234
  err_msgs = []
235
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236
    if utils.OwnIpAddress(master_ip):
237
      # we already have the ip:
238
      logging.debug("Master IP already configured, doing nothing")
239
    else:
240
      msg = "Someone else has the master ip, not activating"
241
      logging.error(msg)
242
      err_msgs.append(msg)
243
  else:
244
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245
                           "dev", master_netdev, "label",
246
                           "%s:0" % master_netdev])
247
    if result.failed:
248
      msg = "Can't activate master IP: %s" % result.output
249
      logging.error(msg)
250
      err_msgs.append(msg)
251

    
252
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253
                           "-s", master_ip, master_ip])
254
    # we'll ignore the exit code of arping
255

    
256
  # and now start the master and rapi daemons
257
  if start_daemons:
258
    daemons_params = {
259
        'ganeti-masterd': [],
260
        'ganeti-rapi': [],
261
        }
262
    if no_voting:
263
      daemons_params['ganeti-masterd'].append('--no-voting')
264
      daemons_params['ganeti-masterd'].append('--yes-do-it')
265
    for daemon in daemons_params:
266
      cmd = [daemon]
267
      cmd.extend(daemons_params[daemon])
268
      result = utils.RunCmd(cmd)
269
      if result.failed:
270
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
271
        logging.error(msg)
272
        err_msgs.append(msg)
273

    
274
  if err_msgs:
275
    _Fail("; ".join(err_msgs))
276

    
277

    
278
def StopMaster(stop_daemons):
279
  """Deactivate this node as master.
280

281
  The function will always try to deactivate the IP address of the
282
  master. It will also stop the master daemons depending on the
283
  stop_daemons parameter.
284

285
  @type stop_daemons: boolean
286
  @param stop_daemons: whether to also stop the master daemons
287
      (ganeti-masterd and ganeti-rapi)
288
  @rtype: None
289

290
  """
291
  # TODO: log and report back to the caller the error failures; we
292
  # need to decide in which case we fail the RPC for this
293

    
294
  # GetMasterInfo will raise an exception if not able to return data
295
  master_netdev, master_ip, _ = GetMasterInfo()
296

    
297
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298
                         "dev", master_netdev])
299
  if result.failed:
300
    logging.error("Can't remove the master IP, error: %s", result.output)
301
    # but otherwise ignore the failure
302

    
303
  if stop_daemons:
304
    # stop/kill the rapi and the master daemon
305
    for daemon in constants.RAPI, constants.MASTERD:
306
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307

    
308

    
309
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310
  """Joins this node to the cluster.
311

312
  This does the following:
313
      - updates the hostkeys of the machine (rsa and dsa)
314
      - adds the ssh private key to the user
315
      - adds the ssh public key to the users' authorized_keys file
316

317
  @type dsa: str
318
  @param dsa: the DSA private key to write
319
  @type dsapub: str
320
  @param dsapub: the DSA public key to write
321
  @type rsa: str
322
  @param rsa: the RSA private key to write
323
  @type rsapub: str
324
  @param rsapub: the RSA public key to write
325
  @type sshkey: str
326
  @param sshkey: the SSH private key to write
327
  @type sshpub: str
328
  @param sshpub: the SSH public key to write
329
  @rtype: boolean
330
  @return: the success of the operation
331

332
  """
333
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337
  for name, content, mode in sshd_keys:
338
    utils.WriteFile(name, data=content, mode=mode)
339

    
340
  try:
341
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342
                                                    mkdir=True)
343
  except errors.OpExecError, err:
344
    _Fail("Error while processing user ssh files: %s", err, exc=True)
345

    
346
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347
    utils.WriteFile(name, data=content, mode=0600)
348

    
349
  utils.AddAuthorizedKey(auth_keys, sshpub)
350

    
351
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352

    
353

    
354
def LeaveCluster():
355
  """Cleans up and remove the current node.
356

357
  This function cleans up and prepares the current node to be removed
358
  from the cluster.
359

360
  If processing is successful, then it raises an
361
  L{errors.QuitGanetiException} which is used as a special case to
362
  shutdown the node daemon.
363

364
  """
365
  _CleanDirectory(constants.DATA_DIR)
366
  JobQueuePurge()
367

    
368
  try:
369
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
370

    
371
    utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
372

    
373
    utils.RemoveFile(priv_key)
374
    utils.RemoveFile(pub_key)
375
  except errors.OpExecError:
376
    logging.exception("Error while processing ssh files")
377

    
378
  try:
379
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
380
    utils.RemoveFile(constants.RAPI_CERT_FILE)
381
    utils.RemoveFile(constants.SSL_CERT_FILE)
382
  except:
383
    logging.exception("Error while removing cluster secrets")
384

    
385
  confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
386

    
387
  if confd_pid:
388
    utils.KillProcess(confd_pid, timeout=2)
389

    
390
  # Raise a custom exception (handled in ganeti-noded)
391
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
392

    
393

    
394
def GetNodeInfo(vgname, hypervisor_type):
395
  """Gives back a hash with different information about the node.
396

397
  @type vgname: C{string}
398
  @param vgname: the name of the volume group to ask for disk space information
399
  @type hypervisor_type: C{str}
400
  @param hypervisor_type: the name of the hypervisor to ask for
401
      memory information
402
  @rtype: C{dict}
403
  @return: dictionary with the following keys:
404
      - vg_size is the size of the configured volume group in MiB
405
      - vg_free is the free size of the volume group in MiB
406
      - memory_dom0 is the memory allocated for domain0 in MiB
407
      - memory_free is the currently available (free) ram in MiB
408
      - memory_total is the total number of ram in MiB
409

410
  """
411
  outputarray = {}
412
  vginfo = _GetVGInfo(vgname)
413
  outputarray['vg_size'] = vginfo['vg_size']
414
  outputarray['vg_free'] = vginfo['vg_free']
415

    
416
  hyper = hypervisor.GetHypervisor(hypervisor_type)
417
  hyp_info = hyper.GetNodeInfo()
418
  if hyp_info is not None:
419
    outputarray.update(hyp_info)
420

    
421
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
422

    
423
  return outputarray
424

    
425

    
426
def VerifyNode(what, cluster_name):
427
  """Verify the status of the local node.
428

429
  Based on the input L{what} parameter, various checks are done on the
430
  local node.
431

432
  If the I{filelist} key is present, this list of
433
  files is checksummed and the file/checksum pairs are returned.
434

435
  If the I{nodelist} key is present, we check that we have
436
  connectivity via ssh with the target nodes (and check the hostname
437
  report).
438

439
  If the I{node-net-test} key is present, we check that we have
440
  connectivity to the given nodes via both primary IP and, if
441
  applicable, secondary IPs.
442

443
  @type what: C{dict}
444
  @param what: a dictionary of things to check:
445
      - filelist: list of files for which to compute checksums
446
      - nodelist: list of nodes we should check ssh communication with
447
      - node-net-test: list of nodes we should check node daemon port
448
        connectivity with
449
      - hypervisor: list with hypervisors to run the verify for
450
  @rtype: dict
451
  @return: a dictionary with the same keys as the input dict, and
452
      values representing the result of the checks
453

454
  """
455
  result = {}
456

    
457
  if constants.NV_HYPERVISOR in what:
458
    result[constants.NV_HYPERVISOR] = tmp = {}
459
    for hv_name in what[constants.NV_HYPERVISOR]:
460
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
461

    
462
  if constants.NV_FILELIST in what:
463
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
464
      what[constants.NV_FILELIST])
465

    
466
  if constants.NV_NODELIST in what:
467
    result[constants.NV_NODELIST] = tmp = {}
468
    random.shuffle(what[constants.NV_NODELIST])
469
    for node in what[constants.NV_NODELIST]:
470
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
471
      if not success:
472
        tmp[node] = message
473

    
474
  if constants.NV_NODENETTEST in what:
475
    result[constants.NV_NODENETTEST] = tmp = {}
476
    my_name = utils.HostInfo().name
477
    my_pip = my_sip = None
478
    for name, pip, sip in what[constants.NV_NODENETTEST]:
479
      if name == my_name:
480
        my_pip = pip
481
        my_sip = sip
482
        break
483
    if not my_pip:
484
      tmp[my_name] = ("Can't find my own primary/secondary IP"
485
                      " in the node list")
486
    else:
487
      port = utils.GetDaemonPort(constants.NODED)
488
      for name, pip, sip in what[constants.NV_NODENETTEST]:
489
        fail = []
490
        if not utils.TcpPing(pip, port, source=my_pip):
491
          fail.append("primary")
492
        if sip != pip:
493
          if not utils.TcpPing(sip, port, source=my_sip):
494
            fail.append("secondary")
495
        if fail:
496
          tmp[name] = ("failure using the %s interface(s)" %
497
                       " and ".join(fail))
498

    
499
  if constants.NV_LVLIST in what:
500
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
501

    
502
  if constants.NV_INSTANCELIST in what:
503
    result[constants.NV_INSTANCELIST] = GetInstanceList(
504
      what[constants.NV_INSTANCELIST])
505

    
506
  if constants.NV_VGLIST in what:
507
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
508

    
509
  if constants.NV_VERSION in what:
510
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
511
                                    constants.RELEASE_VERSION)
512

    
513
  if constants.NV_HVINFO in what:
514
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
515
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
516

    
517
  if constants.NV_DRBDLIST in what:
518
    try:
519
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
520
    except errors.BlockDeviceError, err:
521
      logging.warning("Can't get used minors list", exc_info=True)
522
      used_minors = str(err)
523
    result[constants.NV_DRBDLIST] = used_minors
524

    
525
  return result
526

    
527

    
528
def GetVolumeList(vg_name):
529
  """Compute list of logical volumes and their size.
530

531
  @type vg_name: str
532
  @param vg_name: the volume group whose LVs we should list
533
  @rtype: dict
534
  @return:
535
      dictionary of all partions (key) with value being a tuple of
536
      their size (in MiB), inactive and online status::
537

538
        {'test1': ('20.06', True, True)}
539

540
      in case of errors, a string is returned with the error
541
      details.
542

543
  """
544
  lvs = {}
545
  sep = '|'
546
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547
                         "--separator=%s" % sep,
548
                         "-olv_name,lv_size,lv_attr", vg_name])
549
  if result.failed:
550
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
551

    
552
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
553
  for line in result.stdout.splitlines():
554
    line = line.strip()
555
    match = valid_line_re.match(line)
556
    if not match:
557
      logging.error("Invalid line returned from lvs output: '%s'", line)
558
      continue
559
    name, size, attr = match.groups()
560
    inactive = attr[4] == '-'
561
    online = attr[5] == 'o'
562
    virtual = attr[0] == 'v'
563
    if virtual:
564
      # we don't want to report such volumes as existing, since they
565
      # don't really hold data
566
      continue
567
    lvs[name] = (size, inactive, online)
568

    
569
  return lvs
570

    
571

    
572
def ListVolumeGroups():
573
  """List the volume groups and their size.
574

575
  @rtype: dict
576
  @return: dictionary with keys volume name and values the
577
      size of the volume
578

579
  """
580
  return utils.ListVolumeGroups()
581

    
582

    
583
def NodeVolumes():
584
  """List all volumes on this node.
585

586
  @rtype: list
587
  @return:
588
    A list of dictionaries, each having four keys:
589
      - name: the logical volume name,
590
      - size: the size of the logical volume
591
      - dev: the physical device on which the LV lives
592
      - vg: the volume group to which it belongs
593

594
    In case of errors, we return an empty list and log the
595
    error.
596

597
    Note that since a logical volume can live on multiple physical
598
    volumes, the resulting list might include a logical volume
599
    multiple times.
600

601
  """
602
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
603
                         "--separator=|",
604
                         "--options=lv_name,lv_size,devices,vg_name"])
605
  if result.failed:
606
    _Fail("Failed to list logical volumes, lvs output: %s",
607
          result.output)
608

    
609
  def parse_dev(dev):
610
    if '(' in dev:
611
      return dev.split('(')[0]
612
    else:
613
      return dev
614

    
615
  def map_line(line):
616
    return {
617
      'name': line[0].strip(),
618
      'size': line[1].strip(),
619
      'dev': parse_dev(line[2].strip()),
620
      'vg': line[3].strip(),
621
    }
622

    
623
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
624
          if line.count('|') >= 3]
625

    
626

    
627
def BridgesExist(bridges_list):
628
  """Check if a list of bridges exist on the current node.
629

630
  @rtype: boolean
631
  @return: C{True} if all of them exist, C{False} otherwise
632

633
  """
634
  missing = []
635
  for bridge in bridges_list:
636
    if not utils.BridgeExists(bridge):
637
      missing.append(bridge)
638

    
639
  if missing:
640
    _Fail("Missing bridges %s", ", ".join(missing))
641

    
642

    
643
def GetInstanceList(hypervisor_list):
644
  """Provides a list of instances.
645

646
  @type hypervisor_list: list
647
  @param hypervisor_list: the list of hypervisors to query information
648

649
  @rtype: list
650
  @return: a list of all running instances on the current node
651
    - instance1.example.com
652
    - instance2.example.com
653

654
  """
655
  results = []
656
  for hname in hypervisor_list:
657
    try:
658
      names = hypervisor.GetHypervisor(hname).ListInstances()
659
      results.extend(names)
660
    except errors.HypervisorError, err:
661
      _Fail("Error enumerating instances (hypervisor %s): %s",
662
            hname, err, exc=True)
663

    
664
  return results
665

    
666

    
667
def GetInstanceInfo(instance, hname):
668
  """Gives back the information about an instance as a dictionary.
669

670
  @type instance: string
671
  @param instance: the instance name
672
  @type hname: string
673
  @param hname: the hypervisor type of the instance
674

675
  @rtype: dict
676
  @return: dictionary with the following keys:
677
      - memory: memory size of instance (int)
678
      - state: xen state of instance (string)
679
      - time: cpu time of instance (float)
680

681
  """
682
  output = {}
683

    
684
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
685
  if iinfo is not None:
686
    output['memory'] = iinfo[2]
687
    output['state'] = iinfo[4]
688
    output['time'] = iinfo[5]
689

    
690
  return output
691

    
692

    
693
def GetInstanceMigratable(instance):
694
  """Gives whether an instance can be migrated.
695

696
  @type instance: L{objects.Instance}
697
  @param instance: object representing the instance to be checked.
698

699
  @rtype: tuple
700
  @return: tuple of (result, description) where:
701
      - result: whether the instance can be migrated or not
702
      - description: a description of the issue, if relevant
703

704
  """
705
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
706
  iname = instance.name
707
  if iname not in hyper.ListInstances():
708
    _Fail("Instance %s is not running", iname)
709

    
710
  for idx in range(len(instance.disks)):
711
    link_name = _GetBlockDevSymlinkPath(iname, idx)
712
    if not os.path.islink(link_name):
713
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
714

    
715

    
716
def GetAllInstancesInfo(hypervisor_list):
717
  """Gather data about all instances.
718

719
  This is the equivalent of L{GetInstanceInfo}, except that it
720
  computes data for all instances at once, thus being faster if one
721
  needs data about more than one instance.
722

723
  @type hypervisor_list: list
724
  @param hypervisor_list: list of hypervisors to query for instance data
725

726
  @rtype: dict
727
  @return: dictionary of instance: data, with data having the following keys:
728
      - memory: memory size of instance (int)
729
      - state: xen state of instance (string)
730
      - time: cpu time of instance (float)
731
      - vcpus: the number of vcpus
732

733
  """
734
  output = {}
735

    
736
  for hname in hypervisor_list:
737
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
738
    if iinfo:
739
      for name, _, memory, vcpus, state, times in iinfo:
740
        value = {
741
          'memory': memory,
742
          'vcpus': vcpus,
743
          'state': state,
744
          'time': times,
745
          }
746
        if name in output:
747
          # we only check static parameters, like memory and vcpus,
748
          # and not state and time which can change between the
749
          # invocations of the different hypervisors
750
          for key in 'memory', 'vcpus':
751
            if value[key] != output[name][key]:
752
              _Fail("Instance %s is running twice"
753
                    " with different parameters", name)
754
        output[name] = value
755

    
756
  return output
757

    
758

    
759
def InstanceOsAdd(instance, reinstall):
760
  """Add an OS to an instance.
761

762
  @type instance: L{objects.Instance}
763
  @param instance: Instance whose OS is to be installed
764
  @type reinstall: boolean
765
  @param reinstall: whether this is an instance reinstall
766
  @rtype: None
767

768
  """
769
  inst_os = OSFromDisk(instance.os)
770

    
771
  create_env = OSEnvironment(instance, inst_os)
772
  if reinstall:
773
    create_env['INSTANCE_REINSTALL'] = "1"
774

    
775
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
776
                                     instance.name, int(time.time()))
777

    
778
  result = utils.RunCmd([inst_os.create_script], env=create_env,
779
                        cwd=inst_os.path, output=logfile,)
780
  if result.failed:
781
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
782
                  " output: %s", result.cmd, result.fail_reason, logfile,
783
                  result.output)
784
    lines = [utils.SafeEncode(val)
785
             for val in utils.TailFile(logfile, lines=20)]
786
    _Fail("OS create script failed (%s), last lines in the"
787
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
788

    
789

    
790
def RunRenameInstance(instance, old_name):
791
  """Run the OS rename script for an instance.
792

793
  @type instance: L{objects.Instance}
794
  @param instance: Instance whose OS is to be installed
795
  @type old_name: string
796
  @param old_name: previous instance name
797
  @rtype: boolean
798
  @return: the success of the operation
799

800
  """
801
  inst_os = OSFromDisk(instance.os)
802

    
803
  rename_env = OSEnvironment(instance, inst_os)
804
  rename_env['OLD_INSTANCE_NAME'] = old_name
805

    
806
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
807
                                           old_name,
808
                                           instance.name, int(time.time()))
809

    
810
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
811
                        cwd=inst_os.path, output=logfile)
812

    
813
  if result.failed:
814
    logging.error("os create command '%s' returned error: %s output: %s",
815
                  result.cmd, result.fail_reason, result.output)
816
    lines = [utils.SafeEncode(val)
817
             for val in utils.TailFile(logfile, lines=20)]
818
    _Fail("OS rename script failed (%s), last lines in the"
819
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
820

    
821

    
822
def _GetVGInfo(vg_name):
823
  """Get information about the volume group.
824

825
  @type vg_name: str
826
  @param vg_name: the volume group which we query
827
  @rtype: dict
828
  @return:
829
    A dictionary with the following keys:
830
      - C{vg_size} is the total size of the volume group in MiB
831
      - C{vg_free} is the free size of the volume group in MiB
832
      - C{pv_count} are the number of physical disks in that VG
833

834
    If an error occurs during gathering of data, we return the same dict
835
    with keys all set to None.
836

837
  """
838
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
839

    
840
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
841
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
842

    
843
  if retval.failed:
844
    logging.error("volume group %s not present", vg_name)
845
    return retdic
846
  valarr = retval.stdout.strip().rstrip(':').split(':')
847
  if len(valarr) == 3:
848
    try:
849
      retdic = {
850
        "vg_size": int(round(float(valarr[0]), 0)),
851
        "vg_free": int(round(float(valarr[1]), 0)),
852
        "pv_count": int(valarr[2]),
853
        }
854
    except ValueError, err:
855
      logging.exception("Fail to parse vgs output: %s", err)
856
  else:
857
    logging.error("vgs output has the wrong number of fields (expected"
858
                  " three): %s", str(valarr))
859
  return retdic
860

    
861

    
862
def _GetBlockDevSymlinkPath(instance_name, idx):
863
  return os.path.join(constants.DISK_LINKS_DIR,
864
                      "%s:%d" % (instance_name, idx))
865

    
866

    
867
def _SymlinkBlockDev(instance_name, device_path, idx):
868
  """Set up symlinks to a instance's block device.
869

870
  This is an auxiliary function run when an instance is start (on the primary
871
  node) or when an instance is migrated (on the target node).
872

873

874
  @param instance_name: the name of the target instance
875
  @param device_path: path of the physical block device, on the node
876
  @param idx: the disk index
877
  @return: absolute path to the disk's symlink
878

879
  """
880
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
881
  try:
882
    os.symlink(device_path, link_name)
883
  except OSError, err:
884
    if err.errno == errno.EEXIST:
885
      if (not os.path.islink(link_name) or
886
          os.readlink(link_name) != device_path):
887
        os.remove(link_name)
888
        os.symlink(device_path, link_name)
889
    else:
890
      raise
891

    
892
  return link_name
893

    
894

    
895
def _RemoveBlockDevLinks(instance_name, disks):
896
  """Remove the block device symlinks belonging to the given instance.
897

898
  """
899
  for idx, _ in enumerate(disks):
900
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
901
    if os.path.islink(link_name):
902
      try:
903
        os.remove(link_name)
904
      except OSError:
905
        logging.exception("Can't remove symlink '%s'", link_name)
906

    
907

    
908
def _GatherAndLinkBlockDevs(instance):
909
  """Set up an instance's block device(s).
910

911
  This is run on the primary node at instance startup. The block
912
  devices must be already assembled.
913

914
  @type instance: L{objects.Instance}
915
  @param instance: the instance whose disks we shoul assemble
916
  @rtype: list
917
  @return: list of (disk_object, device_path)
918

919
  """
920
  block_devices = []
921
  for idx, disk in enumerate(instance.disks):
922
    device = _RecursiveFindBD(disk)
923
    if device is None:
924
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
925
                                    str(disk))
926
    device.Open()
927
    try:
928
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
929
    except OSError, e:
930
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
931
                                    e.strerror)
932

    
933
    block_devices.append((disk, link_name))
934

    
935
  return block_devices
936

    
937

    
938
def StartInstance(instance):
939
  """Start an instance.
940

941
  @type instance: L{objects.Instance}
942
  @param instance: the instance object
943
  @rtype: None
944

945
  """
946
  running_instances = GetInstanceList([instance.hypervisor])
947

    
948
  if instance.name in running_instances:
949
    logging.info("Instance %s already running, not starting", instance.name)
950
    return
951

    
952
  try:
953
    block_devices = _GatherAndLinkBlockDevs(instance)
954
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
955
    hyper.StartInstance(instance, block_devices)
956
  except errors.BlockDeviceError, err:
957
    _Fail("Block device error: %s", err, exc=True)
958
  except errors.HypervisorError, err:
959
    _RemoveBlockDevLinks(instance.name, instance.disks)
960
    _Fail("Hypervisor error: %s", err, exc=True)
961

    
962

    
963
def InstanceShutdown(instance):
964
  """Shut an instance down.
965

966
  @note: this functions uses polling with a hardcoded timeout.
967

968
  @type instance: L{objects.Instance}
969
  @param instance: the instance object
970
  @rtype: None
971

972
  """
973
  hv_name = instance.hypervisor
974
  running_instances = GetInstanceList([hv_name])
975
  iname = instance.name
976

    
977
  if iname not in running_instances:
978
    logging.info("Instance %s not running, doing nothing", iname)
979
    return
980

    
981
  hyper = hypervisor.GetHypervisor(hv_name)
982
  try:
983
    hyper.StopInstance(instance)
984
  except errors.HypervisorError, err:
985
    _Fail("Failed to stop instance %s: %s", iname, err)
986

    
987
  # test every 10secs for 2min
988

    
989
  time.sleep(1)
990
  for _ in range(11):
991
    if instance.name not in GetInstanceList([hv_name]):
992
      break
993
    time.sleep(10)
994
  else:
995
    # the shutdown did not succeed
996
    logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
997

    
998
    try:
999
      hyper.StopInstance(instance, force=True)
1000
    except errors.HypervisorError, err:
1001
      _Fail("Failed to force stop instance %s: %s", iname, err)
1002

    
1003
    time.sleep(1)
1004
    if instance.name in GetInstanceList([hv_name]):
1005
      _Fail("Could not shutdown instance %s even by destroy", iname)
1006

    
1007
  _RemoveBlockDevLinks(iname, instance.disks)
1008

    
1009

    
1010
def InstanceReboot(instance, reboot_type):
1011
  """Reboot an instance.
1012

1013
  @type instance: L{objects.Instance}
1014
  @param instance: the instance object to reboot
1015
  @type reboot_type: str
1016
  @param reboot_type: the type of reboot, one the following
1017
    constants:
1018
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1019
        instance OS, do not recreate the VM
1020
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1021
        restart the VM (at the hypervisor level)
1022
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1023
        not accepted here, since that mode is handled differently, in
1024
        cmdlib, and translates into full stop and start of the
1025
        instance (instead of a call_instance_reboot RPC)
1026
  @rtype: None
1027

1028
  """
1029
  running_instances = GetInstanceList([instance.hypervisor])
1030

    
1031
  if instance.name not in running_instances:
1032
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1033

    
1034
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1035
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1036
    try:
1037
      hyper.RebootInstance(instance)
1038
    except errors.HypervisorError, err:
1039
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1040
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1041
    try:
1042
      InstanceShutdown(instance)
1043
      return StartInstance(instance)
1044
    except errors.HypervisorError, err:
1045
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1046
  else:
1047
    _Fail("Invalid reboot_type received: %s", reboot_type)
1048

    
1049

    
1050
def MigrationInfo(instance):
1051
  """Gather information about an instance to be migrated.
1052

1053
  @type instance: L{objects.Instance}
1054
  @param instance: the instance definition
1055

1056
  """
1057
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1058
  try:
1059
    info = hyper.MigrationInfo(instance)
1060
  except errors.HypervisorError, err:
1061
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1062
  return info
1063

    
1064

    
1065
def AcceptInstance(instance, info, target):
1066
  """Prepare the node to accept an instance.
1067

1068
  @type instance: L{objects.Instance}
1069
  @param instance: the instance definition
1070
  @type info: string/data (opaque)
1071
  @param info: migration information, from the source node
1072
  @type target: string
1073
  @param target: target host (usually ip), on this node
1074

1075
  """
1076
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1077
  try:
1078
    hyper.AcceptInstance(instance, info, target)
1079
  except errors.HypervisorError, err:
1080
    _Fail("Failed to accept instance: %s", err, exc=True)
1081

    
1082

    
1083
def FinalizeMigration(instance, info, success):
1084
  """Finalize any preparation to accept an instance.
1085

1086
  @type instance: L{objects.Instance}
1087
  @param instance: the instance definition
1088
  @type info: string/data (opaque)
1089
  @param info: migration information, from the source node
1090
  @type success: boolean
1091
  @param success: whether the migration was a success or a failure
1092

1093
  """
1094
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1095
  try:
1096
    hyper.FinalizeMigration(instance, info, success)
1097
  except errors.HypervisorError, err:
1098
    _Fail("Failed to finalize migration: %s", err, exc=True)
1099

    
1100

    
1101
def MigrateInstance(instance, target, live):
1102
  """Migrates an instance to another node.
1103

1104
  @type instance: L{objects.Instance}
1105
  @param instance: the instance definition
1106
  @type target: string
1107
  @param target: the target node name
1108
  @type live: boolean
1109
  @param live: whether the migration should be done live or not (the
1110
      interpretation of this parameter is left to the hypervisor)
1111
  @rtype: tuple
1112
  @return: a tuple of (success, msg) where:
1113
      - succes is a boolean denoting the success/failure of the operation
1114
      - msg is a string with details in case of failure
1115

1116
  """
1117
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1118

    
1119
  try:
1120
    hyper.MigrateInstance(instance.name, target, live)
1121
  except errors.HypervisorError, err:
1122
    _Fail("Failed to migrate instance: %s", err, exc=True)
1123

    
1124

    
1125
def BlockdevCreate(disk, size, owner, on_primary, info):
1126
  """Creates a block device for an instance.
1127

1128
  @type disk: L{objects.Disk}
1129
  @param disk: the object describing the disk we should create
1130
  @type size: int
1131
  @param size: the size of the physical underlying device, in MiB
1132
  @type owner: str
1133
  @param owner: the name of the instance for which disk is created,
1134
      used for device cache data
1135
  @type on_primary: boolean
1136
  @param on_primary:  indicates if it is the primary node or not
1137
  @type info: string
1138
  @param info: string that will be sent to the physical device
1139
      creation, used for example to set (LVM) tags on LVs
1140

1141
  @return: the new unique_id of the device (this can sometime be
1142
      computed only after creation), or None. On secondary nodes,
1143
      it's not required to return anything.
1144

1145
  """
1146
  clist = []
1147
  if disk.children:
1148
    for child in disk.children:
1149
      try:
1150
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1151
      except errors.BlockDeviceError, err:
1152
        _Fail("Can't assemble device %s: %s", child, err)
1153
      if on_primary or disk.AssembleOnSecondary():
1154
        # we need the children open in case the device itself has to
1155
        # be assembled
1156
        try:
1157
          crdev.Open()
1158
        except errors.BlockDeviceError, err:
1159
          _Fail("Can't make child '%s' read-write: %s", child, err)
1160
      clist.append(crdev)
1161

    
1162
  try:
1163
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1164
  except errors.BlockDeviceError, err:
1165
    _Fail("Can't create block device: %s", err)
1166

    
1167
  if on_primary or disk.AssembleOnSecondary():
1168
    try:
1169
      device.Assemble()
1170
    except errors.BlockDeviceError, err:
1171
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1172
    device.SetSyncSpeed(constants.SYNC_SPEED)
1173
    if on_primary or disk.OpenOnSecondary():
1174
      try:
1175
        device.Open(force=True)
1176
      except errors.BlockDeviceError, err:
1177
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1178
    DevCacheManager.UpdateCache(device.dev_path, owner,
1179
                                on_primary, disk.iv_name)
1180

    
1181
  device.SetInfo(info)
1182

    
1183
  return device.unique_id
1184

    
1185

    
1186
def BlockdevRemove(disk):
1187
  """Remove a block device.
1188

1189
  @note: This is intended to be called recursively.
1190

1191
  @type disk: L{objects.Disk}
1192
  @param disk: the disk object we should remove
1193
  @rtype: boolean
1194
  @return: the success of the operation
1195

1196
  """
1197
  msgs = []
1198
  try:
1199
    rdev = _RecursiveFindBD(disk)
1200
  except errors.BlockDeviceError, err:
1201
    # probably can't attach
1202
    logging.info("Can't attach to device %s in remove", disk)
1203
    rdev = None
1204
  if rdev is not None:
1205
    r_path = rdev.dev_path
1206
    try:
1207
      rdev.Remove()
1208
    except errors.BlockDeviceError, err:
1209
      msgs.append(str(err))
1210
    if not msgs:
1211
      DevCacheManager.RemoveCache(r_path)
1212

    
1213
  if disk.children:
1214
    for child in disk.children:
1215
      try:
1216
        BlockdevRemove(child)
1217
      except RPCFail, err:
1218
        msgs.append(str(err))
1219

    
1220
  if msgs:
1221
    _Fail("; ".join(msgs))
1222

    
1223

    
1224
def _RecursiveAssembleBD(disk, owner, as_primary):
1225
  """Activate a block device for an instance.
1226

1227
  This is run on the primary and secondary nodes for an instance.
1228

1229
  @note: this function is called recursively.
1230

1231
  @type disk: L{objects.Disk}
1232
  @param disk: the disk we try to assemble
1233
  @type owner: str
1234
  @param owner: the name of the instance which owns the disk
1235
  @type as_primary: boolean
1236
  @param as_primary: if we should make the block device
1237
      read/write
1238

1239
  @return: the assembled device or None (in case no device
1240
      was assembled)
1241
  @raise errors.BlockDeviceError: in case there is an error
1242
      during the activation of the children or the device
1243
      itself
1244

1245
  """
1246
  children = []
1247
  if disk.children:
1248
    mcn = disk.ChildrenNeeded()
1249
    if mcn == -1:
1250
      mcn = 0 # max number of Nones allowed
1251
    else:
1252
      mcn = len(disk.children) - mcn # max number of Nones
1253
    for chld_disk in disk.children:
1254
      try:
1255
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1256
      except errors.BlockDeviceError, err:
1257
        if children.count(None) >= mcn:
1258
          raise
1259
        cdev = None
1260
        logging.error("Error in child activation (but continuing): %s",
1261
                      str(err))
1262
      children.append(cdev)
1263

    
1264
  if as_primary or disk.AssembleOnSecondary():
1265
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1266
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1267
    result = r_dev
1268
    if as_primary or disk.OpenOnSecondary():
1269
      r_dev.Open()
1270
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1271
                                as_primary, disk.iv_name)
1272

    
1273
  else:
1274
    result = True
1275
  return result
1276

    
1277

    
1278
def BlockdevAssemble(disk, owner, as_primary):
1279
  """Activate a block device for an instance.
1280

1281
  This is a wrapper over _RecursiveAssembleBD.
1282

1283
  @rtype: str or boolean
1284
  @return: a C{/dev/...} path for primary nodes, and
1285
      C{True} for secondary nodes
1286

1287
  """
1288
  try:
1289
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1290
    if isinstance(result, bdev.BlockDev):
1291
      result = result.dev_path
1292
  except errors.BlockDeviceError, err:
1293
    _Fail("Error while assembling disk: %s", err, exc=True)
1294

    
1295
  return result
1296

    
1297

    
1298
def BlockdevShutdown(disk):
1299
  """Shut down a block device.
1300

1301
  First, if the device is assembled (Attach() is successful), then
1302
  the device is shutdown. Then the children of the device are
1303
  shutdown.
1304

1305
  This function is called recursively. Note that we don't cache the
1306
  children or such, as oppossed to assemble, shutdown of different
1307
  devices doesn't require that the upper device was active.
1308

1309
  @type disk: L{objects.Disk}
1310
  @param disk: the description of the disk we should
1311
      shutdown
1312
  @rtype: None
1313

1314
  """
1315
  msgs = []
1316
  r_dev = _RecursiveFindBD(disk)
1317
  if r_dev is not None:
1318
    r_path = r_dev.dev_path
1319
    try:
1320
      r_dev.Shutdown()
1321
      DevCacheManager.RemoveCache(r_path)
1322
    except errors.BlockDeviceError, err:
1323
      msgs.append(str(err))
1324

    
1325
  if disk.children:
1326
    for child in disk.children:
1327
      try:
1328
        BlockdevShutdown(child)
1329
      except RPCFail, err:
1330
        msgs.append(str(err))
1331

    
1332
  if msgs:
1333
    _Fail("; ".join(msgs))
1334

    
1335

    
1336
def BlockdevAddchildren(parent_cdev, new_cdevs):
1337
  """Extend a mirrored block device.
1338

1339
  @type parent_cdev: L{objects.Disk}
1340
  @param parent_cdev: the disk to which we should add children
1341
  @type new_cdevs: list of L{objects.Disk}
1342
  @param new_cdevs: the list of children which we should add
1343
  @rtype: None
1344

1345
  """
1346
  parent_bdev = _RecursiveFindBD(parent_cdev)
1347
  if parent_bdev is None:
1348
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1349
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1350
  if new_bdevs.count(None) > 0:
1351
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1352
  parent_bdev.AddChildren(new_bdevs)
1353

    
1354

    
1355
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1356
  """Shrink a mirrored block device.
1357

1358
  @type parent_cdev: L{objects.Disk}
1359
  @param parent_cdev: the disk from which we should remove children
1360
  @type new_cdevs: list of L{objects.Disk}
1361
  @param new_cdevs: the list of children which we should remove
1362
  @rtype: None
1363

1364
  """
1365
  parent_bdev = _RecursiveFindBD(parent_cdev)
1366
  if parent_bdev is None:
1367
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1368
  devs = []
1369
  for disk in new_cdevs:
1370
    rpath = disk.StaticDevPath()
1371
    if rpath is None:
1372
      bd = _RecursiveFindBD(disk)
1373
      if bd is None:
1374
        _Fail("Can't find device %s while removing children", disk)
1375
      else:
1376
        devs.append(bd.dev_path)
1377
    else:
1378
      devs.append(rpath)
1379
  parent_bdev.RemoveChildren(devs)
1380

    
1381

    
1382
def BlockdevGetmirrorstatus(disks):
1383
  """Get the mirroring status of a list of devices.
1384

1385
  @type disks: list of L{objects.Disk}
1386
  @param disks: the list of disks which we should query
1387
  @rtype: disk
1388
  @return:
1389
      a list of (mirror_done, estimated_time) tuples, which
1390
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1391
  @raise errors.BlockDeviceError: if any of the disks cannot be
1392
      found
1393

1394
  """
1395
  stats = []
1396
  for dsk in disks:
1397
    rbd = _RecursiveFindBD(dsk)
1398
    if rbd is None:
1399
      _Fail("Can't find device %s", dsk)
1400

    
1401
    stats.append(rbd.CombinedSyncStatus())
1402

    
1403
  return stats
1404

    
1405

    
1406
def _RecursiveFindBD(disk):
1407
  """Check if a device is activated.
1408

1409
  If so, return information about the real device.
1410

1411
  @type disk: L{objects.Disk}
1412
  @param disk: the disk object we need to find
1413

1414
  @return: None if the device can't be found,
1415
      otherwise the device instance
1416

1417
  """
1418
  children = []
1419
  if disk.children:
1420
    for chdisk in disk.children:
1421
      children.append(_RecursiveFindBD(chdisk))
1422

    
1423
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1424

    
1425

    
1426
def BlockdevFind(disk):
1427
  """Check if a device is activated.
1428

1429
  If it is, return information about the real device.
1430

1431
  @type disk: L{objects.Disk}
1432
  @param disk: the disk to find
1433
  @rtype: None or objects.BlockDevStatus
1434
  @return: None if the disk cannot be found, otherwise a the current
1435
           information
1436

1437
  """
1438
  try:
1439
    rbd = _RecursiveFindBD(disk)
1440
  except errors.BlockDeviceError, err:
1441
    _Fail("Failed to find device: %s", err, exc=True)
1442

    
1443
  if rbd is None:
1444
    return None
1445

    
1446
  return rbd.GetSyncStatus()
1447

    
1448

    
1449
def BlockdevGetsize(disks):
1450
  """Computes the size of the given disks.
1451

1452
  If a disk is not found, returns None instead.
1453

1454
  @type disks: list of L{objects.Disk}
1455
  @param disks: the list of disk to compute the size for
1456
  @rtype: list
1457
  @return: list with elements None if the disk cannot be found,
1458
      otherwise the size
1459

1460
  """
1461
  result = []
1462
  for cf in disks:
1463
    try:
1464
      rbd = _RecursiveFindBD(cf)
1465
    except errors.BlockDeviceError, err:
1466
      result.append(None)
1467
      continue
1468
    if rbd is None:
1469
      result.append(None)
1470
    else:
1471
      result.append(rbd.GetActualSize())
1472
  return result
1473

    
1474

    
1475
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1476
  """Export a block device to a remote node.
1477

1478
  @type disk: L{objects.Disk}
1479
  @param disk: the description of the disk to export
1480
  @type dest_node: str
1481
  @param dest_node: the destination node to export to
1482
  @type dest_path: str
1483
  @param dest_path: the destination path on the target node
1484
  @type cluster_name: str
1485
  @param cluster_name: the cluster name, needed for SSH hostalias
1486
  @rtype: None
1487

1488
  """
1489
  real_disk = _RecursiveFindBD(disk)
1490
  if real_disk is None:
1491
    _Fail("Block device '%s' is not set up", disk)
1492

    
1493
  real_disk.Open()
1494

    
1495
  # the block size on the read dd is 1MiB to match our units
1496
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1497
                               "dd if=%s bs=1048576 count=%s",
1498
                               real_disk.dev_path, str(disk.size))
1499

    
1500
  # we set here a smaller block size as, due to ssh buffering, more
1501
  # than 64-128k will mostly ignored; we use nocreat to fail if the
1502
  # device is not already there or we pass a wrong path; we use
1503
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1504
  # to not buffer too much memory; this means that at best, we flush
1505
  # every 64k, which will not be very fast
1506
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1507
                                " oflag=dsync", dest_path)
1508

    
1509
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1510
                                                   constants.GANETI_RUNAS,
1511
                                                   destcmd)
1512

    
1513
  # all commands have been checked, so we're safe to combine them
1514
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1515

    
1516
  result = utils.RunCmd(["bash", "-c", command])
1517

    
1518
  if result.failed:
1519
    _Fail("Disk copy command '%s' returned error: %s"
1520
          " output: %s", command, result.fail_reason, result.output)
1521

    
1522

    
1523
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1524
  """Write a file to the filesystem.
1525

1526
  This allows the master to overwrite(!) a file. It will only perform
1527
  the operation if the file belongs to a list of configuration files.
1528

1529
  @type file_name: str
1530
  @param file_name: the target file name
1531
  @type data: str
1532
  @param data: the new contents of the file
1533
  @type mode: int
1534
  @param mode: the mode to give the file (can be None)
1535
  @type uid: int
1536
  @param uid: the owner of the file (can be -1 for default)
1537
  @type gid: int
1538
  @param gid: the group of the file (can be -1 for default)
1539
  @type atime: float
1540
  @param atime: the atime to set on the file (can be None)
1541
  @type mtime: float
1542
  @param mtime: the mtime to set on the file (can be None)
1543
  @rtype: None
1544

1545
  """
1546
  if not os.path.isabs(file_name):
1547
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1548

    
1549
  if file_name not in _ALLOWED_UPLOAD_FILES:
1550
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1551
          file_name)
1552

    
1553
  raw_data = _Decompress(data)
1554

    
1555
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1556
                  atime=atime, mtime=mtime)
1557

    
1558

    
1559
def WriteSsconfFiles(values):
1560
  """Update all ssconf files.
1561

1562
  Wrapper around the SimpleStore.WriteFiles.
1563

1564
  """
1565
  ssconf.SimpleStore().WriteFiles(values)
1566

    
1567

    
1568
def _ErrnoOrStr(err):
1569
  """Format an EnvironmentError exception.
1570

1571
  If the L{err} argument has an errno attribute, it will be looked up
1572
  and converted into a textual C{E...} description. Otherwise the
1573
  string representation of the error will be returned.
1574

1575
  @type err: L{EnvironmentError}
1576
  @param err: the exception to format
1577

1578
  """
1579
  if hasattr(err, 'errno'):
1580
    detail = errno.errorcode[err.errno]
1581
  else:
1582
    detail = str(err)
1583
  return detail
1584

    
1585

    
1586
def _OSOndiskAPIVersion(name, os_dir):
1587
  """Compute and return the API version of a given OS.
1588

1589
  This function will try to read the API version of the OS given by
1590
  the 'name' parameter and residing in the 'os_dir' directory.
1591

1592
  @type name: str
1593
  @param name: the OS name we should look for
1594
  @type os_dir: str
1595
  @param os_dir: the directory inwhich we should look for the OS
1596
  @rtype: tuple
1597
  @return: tuple (status, data) with status denoting the validity and
1598
      data holding either the vaid versions or an error message
1599

1600
  """
1601
  api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1602

    
1603
  try:
1604
    st = os.stat(api_file)
1605
  except EnvironmentError, err:
1606
    return False, ("Required file '%s' not found under path %s: %s" %
1607
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1608

    
1609
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1610
    return False, ("File '%s' in %s is not a regular file" %
1611
                   (constants.OS_API_FILE, os_dir))
1612

    
1613
  try:
1614
    api_versions = utils.ReadFile(api_file).splitlines()
1615
  except EnvironmentError, err:
1616
    return False, ("Error while reading the API version file at %s: %s" %
1617
                   (api_file, _ErrnoOrStr(err)))
1618

    
1619
  try:
1620
    api_versions = [int(version.strip()) for version in api_versions]
1621
  except (TypeError, ValueError), err:
1622
    return False, ("API version(s) can't be converted to integer: %s" %
1623
                   str(err))
1624

    
1625
  return True, api_versions
1626

    
1627

    
1628
def DiagnoseOS(top_dirs=None):
1629
  """Compute the validity for all OSes.
1630

1631
  @type top_dirs: list
1632
  @param top_dirs: the list of directories in which to
1633
      search (if not given defaults to
1634
      L{constants.OS_SEARCH_PATH})
1635
  @rtype: list of L{objects.OS}
1636
  @return: a list of tuples (name, path, status, diagnose)
1637
      for all (potential) OSes under all search paths, where:
1638
          - name is the (potential) OS name
1639
          - path is the full path to the OS
1640
          - status True/False is the validity of the OS
1641
          - diagnose is the error message for an invalid OS, otherwise empty
1642

1643
  """
1644
  if top_dirs is None:
1645
    top_dirs = constants.OS_SEARCH_PATH
1646

    
1647
  result = []
1648
  for dir_name in top_dirs:
1649
    if os.path.isdir(dir_name):
1650
      try:
1651
        f_names = utils.ListVisibleFiles(dir_name)
1652
      except EnvironmentError, err:
1653
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1654
        break
1655
      for name in f_names:
1656
        os_path = os.path.sep.join([dir_name, name])
1657
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1658
        if status:
1659
          diagnose = ""
1660
        else:
1661
          diagnose = os_inst
1662
        result.append((name, os_path, status, diagnose))
1663

    
1664
  return result
1665

    
1666

    
1667
def _TryOSFromDisk(name, base_dir=None):
1668
  """Create an OS instance from disk.
1669

1670
  This function will return an OS instance if the given name is a
1671
  valid OS name.
1672

1673
  @type base_dir: string
1674
  @keyword base_dir: Base directory containing OS installations.
1675
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1676
  @rtype: tuple
1677
  @return: success and either the OS instance if we find a valid one,
1678
      or error message
1679

1680
  """
1681
  if base_dir is None:
1682
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1683
    if os_dir is None:
1684
      return False, "Directory for OS %s not found in search path" % name
1685
  else:
1686
    os_dir = os.path.sep.join([base_dir, name])
1687

    
1688
  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1689
  if not status:
1690
    # push the error up
1691
    return status, api_versions
1692

    
1693
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1694
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1695
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1696

    
1697
  # OS Files dictionary, we will populate it with the absolute path names
1698
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1699

    
1700
  if max(api_versions) >= constants.OS_API_V15:
1701
    os_files[constants.OS_VARIANTS_FILE] = ''
1702

    
1703
  for name in os_files:
1704
    os_files[name] = os.path.sep.join([os_dir, name])
1705

    
1706
    try:
1707
      st = os.stat(os_files[name])
1708
    except EnvironmentError, err:
1709
      return False, ("File '%s' under path '%s' is missing (%s)" %
1710
                     (name, os_dir, _ErrnoOrStr(err)))
1711

    
1712
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1713
      return False, ("File '%s' under path '%s' is not a regular file" %
1714
                     (name, os_dir))
1715

    
1716
    if name in constants.OS_SCRIPTS:
1717
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1718
        return False, ("File '%s' under path '%s' is not executable" %
1719
                       (name, os_dir))
1720

    
1721
  variants = None
1722
  if constants.OS_VARIANTS_FILE in os_files:
1723
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1724
    try:
1725
      variants = utils.ReadFile(variants_file).splitlines()
1726
    except EnvironmentError, err:
1727
      return False, ("Error while reading the OS variants file at %s: %s" %
1728
                     (variants_file, _ErrnoOrStr(err)))
1729
    if not variants:
1730
      return False, ("No supported os variant found")
1731

    
1732
  os_obj = objects.OS(name=name, path=os_dir,
1733
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1734
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1735
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1736
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1737
                      supported_variants=variants,
1738
                      api_versions=api_versions)
1739
  return True, os_obj
1740

    
1741

    
1742
def OSFromDisk(name, base_dir=None):
1743
  """Create an OS instance from disk.
1744

1745
  This function will return an OS instance if the given name is a
1746
  valid OS name. Otherwise, it will raise an appropriate
1747
  L{RPCFail} exception, detailing why this is not a valid OS.
1748

1749
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1750
  an exception but returns true/false status data.
1751

1752
  @type base_dir: string
1753
  @keyword base_dir: Base directory containing OS installations.
1754
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1755
  @rtype: L{objects.OS}
1756
  @return: the OS instance if we find a valid one
1757
  @raise RPCFail: if we don't find a valid OS
1758

1759
  """
1760
  status, payload = _TryOSFromDisk(name, base_dir)
1761

    
1762
  if not status:
1763
    _Fail(payload)
1764

    
1765
  return payload
1766

    
1767

    
1768
def OSEnvironment(instance, os, debug=0):
1769
  """Calculate the environment for an os script.
1770

1771
  @type instance: L{objects.Instance}
1772
  @param instance: target instance for the os script run
1773
  @type os: L{objects.OS}
1774
  @param os: operating system for which the environment is being built
1775
  @type debug: integer
1776
  @param debug: debug level (0 or 1, for OS Api 10)
1777
  @rtype: dict
1778
  @return: dict of environment variables
1779
  @raise errors.BlockDeviceError: if the block device
1780
      cannot be found
1781

1782
  """
1783
  result = {}
1784
  api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1785
  result['OS_API_VERSION'] = '%d' % api_version
1786
  result['INSTANCE_NAME'] = instance.name
1787
  result['INSTANCE_OS'] = instance.os
1788
  result['HYPERVISOR'] = instance.hypervisor
1789
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1790
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1791
  result['DEBUG_LEVEL'] = '%d' % debug
1792
  for idx, disk in enumerate(instance.disks):
1793
    real_disk = _RecursiveFindBD(disk)
1794
    if real_disk is None:
1795
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1796
                                    str(disk))
1797
    real_disk.Open()
1798
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1799
    result['DISK_%d_ACCESS' % idx] = disk.mode
1800
    if constants.HV_DISK_TYPE in instance.hvparams:
1801
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1802
        instance.hvparams[constants.HV_DISK_TYPE]
1803
    if disk.dev_type in constants.LDS_BLOCK:
1804
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1805
    elif disk.dev_type == constants.LD_FILE:
1806
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1807
        'file:%s' % disk.physical_id[0]
1808
  for idx, nic in enumerate(instance.nics):
1809
    result['NIC_%d_MAC' % idx] = nic.mac
1810
    if nic.ip:
1811
      result['NIC_%d_IP' % idx] = nic.ip
1812
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1813
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1814
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1815
    if nic.nicparams[constants.NIC_LINK]:
1816
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1817
    if constants.HV_NIC_TYPE in instance.hvparams:
1818
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1819
        instance.hvparams[constants.HV_NIC_TYPE]
1820

    
1821
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1822
    for key, value in source.items():
1823
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1824

    
1825
  return result
1826

    
1827
def BlockdevGrow(disk, amount):
1828
  """Grow a stack of block devices.
1829

1830
  This function is called recursively, with the childrens being the
1831
  first ones to resize.
1832

1833
  @type disk: L{objects.Disk}
1834
  @param disk: the disk to be grown
1835
  @rtype: (status, result)
1836
  @return: a tuple with the status of the operation
1837
      (True/False), and the errors message if status
1838
      is False
1839

1840
  """
1841
  r_dev = _RecursiveFindBD(disk)
1842
  if r_dev is None:
1843
    _Fail("Cannot find block device %s", disk)
1844

    
1845
  try:
1846
    r_dev.Grow(amount)
1847
  except errors.BlockDeviceError, err:
1848
    _Fail("Failed to grow block device: %s", err, exc=True)
1849

    
1850

    
1851
def BlockdevSnapshot(disk):
1852
  """Create a snapshot copy of a block device.
1853

1854
  This function is called recursively, and the snapshot is actually created
1855
  just for the leaf lvm backend device.
1856

1857
  @type disk: L{objects.Disk}
1858
  @param disk: the disk to be snapshotted
1859
  @rtype: string
1860
  @return: snapshot disk path
1861

1862
  """
1863
  if disk.children:
1864
    if len(disk.children) == 1:
1865
      # only one child, let's recurse on it
1866
      return BlockdevSnapshot(disk.children[0])
1867
    else:
1868
      # more than one child, choose one that matches
1869
      for child in disk.children:
1870
        if child.size == disk.size:
1871
          # return implies breaking the loop
1872
          return BlockdevSnapshot(child)
1873
  elif disk.dev_type == constants.LD_LV:
1874
    r_dev = _RecursiveFindBD(disk)
1875
    if r_dev is not None:
1876
      # let's stay on the safe side and ask for the full size, for now
1877
      return r_dev.Snapshot(disk.size)
1878
    else:
1879
      _Fail("Cannot find block device %s", disk)
1880
  else:
1881
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1882
          disk.unique_id, disk.dev_type)
1883

    
1884

    
1885
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1886
  """Export a block device snapshot to a remote node.
1887

1888
  @type disk: L{objects.Disk}
1889
  @param disk: the description of the disk to export
1890
  @type dest_node: str
1891
  @param dest_node: the destination node to export to
1892
  @type instance: L{objects.Instance}
1893
  @param instance: the instance object to whom the disk belongs
1894
  @type cluster_name: str
1895
  @param cluster_name: the cluster name, needed for SSH hostalias
1896
  @type idx: int
1897
  @param idx: the index of the disk in the instance's disk list,
1898
      used to export to the OS scripts environment
1899
  @rtype: None
1900

1901
  """
1902
  inst_os = OSFromDisk(instance.os)
1903
  export_env = OSEnvironment(instance, inst_os)
1904

    
1905
  export_script = inst_os.export_script
1906

    
1907
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1908
                                     instance.name, int(time.time()))
1909
  if not os.path.exists(constants.LOG_OS_DIR):
1910
    os.mkdir(constants.LOG_OS_DIR, 0750)
1911
  real_disk = _RecursiveFindBD(disk)
1912
  if real_disk is None:
1913
    _Fail("Block device '%s' is not set up", disk)
1914

    
1915
  real_disk.Open()
1916

    
1917
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1918
  export_env['EXPORT_INDEX'] = str(idx)
1919

    
1920
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1921
  destfile = disk.physical_id[1]
1922

    
1923
  # the target command is built out of three individual commands,
1924
  # which are joined by pipes; we check each individual command for
1925
  # valid parameters
1926
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1927
                               inst_os.path, export_script, logfile)
1928

    
1929
  comprcmd = "gzip"
1930

    
1931
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1932
                                destdir, destdir, destfile)
1933
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1934
                                                   constants.GANETI_RUNAS,
1935
                                                   destcmd)
1936

    
1937
  # all commands have been checked, so we're safe to combine them
1938
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1939

    
1940
  result = utils.RunCmd(["bash", "-c", command], env=export_env)
1941

    
1942
  if result.failed:
1943
    _Fail("OS snapshot export command '%s' returned error: %s"
1944
          " output: %s", command, result.fail_reason, result.output)
1945

    
1946

    
1947
def FinalizeExport(instance, snap_disks):
1948
  """Write out the export configuration information.
1949

1950
  @type instance: L{objects.Instance}
1951
  @param instance: the instance which we export, used for
1952
      saving configuration
1953
  @type snap_disks: list of L{objects.Disk}
1954
  @param snap_disks: list of snapshot block devices, which
1955
      will be used to get the actual name of the dump file
1956

1957
  @rtype: None
1958

1959
  """
1960
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1961
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1962

    
1963
  config = objects.SerializableConfigParser()
1964

    
1965
  config.add_section(constants.INISECT_EXP)
1966
  config.set(constants.INISECT_EXP, 'version', '0')
1967
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1968
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1969
  config.set(constants.INISECT_EXP, 'os', instance.os)
1970
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
1971

    
1972
  config.add_section(constants.INISECT_INS)
1973
  config.set(constants.INISECT_INS, 'name', instance.name)
1974
  config.set(constants.INISECT_INS, 'memory', '%d' %
1975
             instance.beparams[constants.BE_MEMORY])
1976
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
1977
             instance.beparams[constants.BE_VCPUS])
1978
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1979

    
1980
  nic_total = 0
1981
  for nic_count, nic in enumerate(instance.nics):
1982
    nic_total += 1
1983
    config.set(constants.INISECT_INS, 'nic%d_mac' %
1984
               nic_count, '%s' % nic.mac)
1985
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1986
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1987
               '%s' % nic.bridge)
1988
  # TODO: redundant: on load can read nics until it doesn't exist
1989
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1990

    
1991
  disk_total = 0
1992
  for disk_count, disk in enumerate(snap_disks):
1993
    if disk:
1994
      disk_total += 1
1995
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1996
                 ('%s' % disk.iv_name))
1997
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1998
                 ('%s' % disk.physical_id[1]))
1999
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2000
                 ('%d' % disk.size))
2001

    
2002
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2003

    
2004
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
2005
                  data=config.Dumps())
2006
  shutil.rmtree(finaldestdir, True)
2007
  shutil.move(destdir, finaldestdir)
2008

    
2009

    
2010
def ExportInfo(dest):
2011
  """Get export configuration information.
2012

2013
  @type dest: str
2014
  @param dest: directory containing the export
2015

2016
  @rtype: L{objects.SerializableConfigParser}
2017
  @return: a serializable config file containing the
2018
      export info
2019

2020
  """
2021
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
2022

    
2023
  config = objects.SerializableConfigParser()
2024
  config.read(cff)
2025

    
2026
  if (not config.has_section(constants.INISECT_EXP) or
2027
      not config.has_section(constants.INISECT_INS)):
2028
    _Fail("Export info file doesn't have the required fields")
2029

    
2030
  return config.Dumps()
2031

    
2032

    
2033
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2034
  """Import an os image into an instance.
2035

2036
  @type instance: L{objects.Instance}
2037
  @param instance: instance to import the disks into
2038
  @type src_node: string
2039
  @param src_node: source node for the disk images
2040
  @type src_images: list of string
2041
  @param src_images: absolute paths of the disk images
2042
  @rtype: list of boolean
2043
  @return: each boolean represent the success of importing the n-th disk
2044

2045
  """
2046
  inst_os = OSFromDisk(instance.os)
2047
  import_env = OSEnvironment(instance, inst_os)
2048
  import_script = inst_os.import_script
2049

    
2050
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2051
                                        instance.name, int(time.time()))
2052
  if not os.path.exists(constants.LOG_OS_DIR):
2053
    os.mkdir(constants.LOG_OS_DIR, 0750)
2054

    
2055
  comprcmd = "gunzip"
2056
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2057
                               import_script, logfile)
2058

    
2059
  final_result = []
2060
  for idx, image in enumerate(src_images):
2061
    if image:
2062
      destcmd = utils.BuildShellCmd('cat %s', image)
2063
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2064
                                                       constants.GANETI_RUNAS,
2065
                                                       destcmd)
2066
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2067
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2068
      import_env['IMPORT_INDEX'] = str(idx)
2069
      result = utils.RunCmd(command, env=import_env)
2070
      if result.failed:
2071
        logging.error("Disk import command '%s' returned error: %s"
2072
                      " output: %s", command, result.fail_reason,
2073
                      result.output)
2074
        final_result.append("error importing disk %d: %s, %s" %
2075
                            (idx, result.fail_reason, result.output[-100]))
2076

    
2077
  if final_result:
2078
    _Fail("; ".join(final_result), log=False)
2079

    
2080

    
2081
def ListExports():
2082
  """Return a list of exports currently available on this machine.
2083

2084
  @rtype: list
2085
  @return: list of the exports
2086

2087
  """
2088
  if os.path.isdir(constants.EXPORT_DIR):
2089
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
2090
  else:
2091
    _Fail("No exports directory")
2092

    
2093

    
2094
def RemoveExport(export):
2095
  """Remove an existing export from the node.
2096

2097
  @type export: str
2098
  @param export: the name of the export to remove
2099
  @rtype: None
2100

2101
  """
2102
  target = os.path.join(constants.EXPORT_DIR, export)
2103

    
2104
  try:
2105
    shutil.rmtree(target)
2106
  except EnvironmentError, err:
2107
    _Fail("Error while removing the export: %s", err, exc=True)
2108

    
2109

    
2110
def BlockdevRename(devlist):
2111
  """Rename a list of block devices.
2112

2113
  @type devlist: list of tuples
2114
  @param devlist: list of tuples of the form  (disk,
2115
      new_logical_id, new_physical_id); disk is an
2116
      L{objects.Disk} object describing the current disk,
2117
      and new logical_id/physical_id is the name we
2118
      rename it to
2119
  @rtype: boolean
2120
  @return: True if all renames succeeded, False otherwise
2121

2122
  """
2123
  msgs = []
2124
  result = True
2125
  for disk, unique_id in devlist:
2126
    dev = _RecursiveFindBD(disk)
2127
    if dev is None:
2128
      msgs.append("Can't find device %s in rename" % str(disk))
2129
      result = False
2130
      continue
2131
    try:
2132
      old_rpath = dev.dev_path
2133
      dev.Rename(unique_id)
2134
      new_rpath = dev.dev_path
2135
      if old_rpath != new_rpath:
2136
        DevCacheManager.RemoveCache(old_rpath)
2137
        # FIXME: we should add the new cache information here, like:
2138
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2139
        # but we don't have the owner here - maybe parse from existing
2140
        # cache? for now, we only lose lvm data when we rename, which
2141
        # is less critical than DRBD or MD
2142
    except errors.BlockDeviceError, err:
2143
      msgs.append("Can't rename device '%s' to '%s': %s" %
2144
                  (dev, unique_id, err))
2145
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2146
      result = False
2147
  if not result:
2148
    _Fail("; ".join(msgs))
2149

    
2150

    
2151
def _TransformFileStorageDir(file_storage_dir):
2152
  """Checks whether given file_storage_dir is valid.
2153

2154
  Checks wheter the given file_storage_dir is within the cluster-wide
2155
  default file_storage_dir stored in SimpleStore. Only paths under that
2156
  directory are allowed.
2157

2158
  @type file_storage_dir: str
2159
  @param file_storage_dir: the path to check
2160

2161
  @return: the normalized path if valid, None otherwise
2162

2163
  """
2164
  cfg = _GetConfig()
2165
  file_storage_dir = os.path.normpath(file_storage_dir)
2166
  base_file_storage_dir = cfg.GetFileStorageDir()
2167
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2168
      base_file_storage_dir):
2169
    _Fail("File storage directory '%s' is not under base file"
2170
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2171
  return file_storage_dir
2172

    
2173

    
2174
def CreateFileStorageDir(file_storage_dir):
2175
  """Create file storage directory.
2176

2177
  @type file_storage_dir: str
2178
  @param file_storage_dir: directory to create
2179

2180
  @rtype: tuple
2181
  @return: tuple with first element a boolean indicating wheter dir
2182
      creation was successful or not
2183

2184
  """
2185
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2186
  if os.path.exists(file_storage_dir):
2187
    if not os.path.isdir(file_storage_dir):
2188
      _Fail("Specified storage dir '%s' is not a directory",
2189
            file_storage_dir)
2190
  else:
2191
    try:
2192
      os.makedirs(file_storage_dir, 0750)
2193
    except OSError, err:
2194
      _Fail("Cannot create file storage directory '%s': %s",
2195
            file_storage_dir, err, exc=True)
2196

    
2197

    
2198
def RemoveFileStorageDir(file_storage_dir):
2199
  """Remove file storage directory.
2200

2201
  Remove it only if it's empty. If not log an error and return.
2202

2203
  @type file_storage_dir: str
2204
  @param file_storage_dir: the directory we should cleanup
2205
  @rtype: tuple (success,)
2206
  @return: tuple of one element, C{success}, denoting
2207
      whether the operation was successful
2208

2209
  """
2210
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2211
  if os.path.exists(file_storage_dir):
2212
    if not os.path.isdir(file_storage_dir):
2213
      _Fail("Specified Storage directory '%s' is not a directory",
2214
            file_storage_dir)
2215
    # deletes dir only if empty, otherwise we want to fail the rpc call
2216
    try:
2217
      os.rmdir(file_storage_dir)
2218
    except OSError, err:
2219
      _Fail("Cannot remove file storage directory '%s': %s",
2220
            file_storage_dir, err)
2221

    
2222

    
2223
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2224
  """Rename the file storage directory.
2225

2226
  @type old_file_storage_dir: str
2227
  @param old_file_storage_dir: the current path
2228
  @type new_file_storage_dir: str
2229
  @param new_file_storage_dir: the name we should rename to
2230
  @rtype: tuple (success,)
2231
  @return: tuple of one element, C{success}, denoting
2232
      whether the operation was successful
2233

2234
  """
2235
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2236
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2237
  if not os.path.exists(new_file_storage_dir):
2238
    if os.path.isdir(old_file_storage_dir):
2239
      try:
2240
        os.rename(old_file_storage_dir, new_file_storage_dir)
2241
      except OSError, err:
2242
        _Fail("Cannot rename '%s' to '%s': %s",
2243
              old_file_storage_dir, new_file_storage_dir, err)
2244
    else:
2245
      _Fail("Specified storage dir '%s' is not a directory",
2246
            old_file_storage_dir)
2247
  else:
2248
    if os.path.exists(old_file_storage_dir):
2249
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2250
            old_file_storage_dir, new_file_storage_dir)
2251

    
2252

    
2253
def _EnsureJobQueueFile(file_name):
2254
  """Checks whether the given filename is in the queue directory.
2255

2256
  @type file_name: str
2257
  @param file_name: the file name we should check
2258
  @rtype: None
2259
  @raises RPCFail: if the file is not valid
2260

2261
  """
2262
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2263
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2264

    
2265
  if not result:
2266
    _Fail("Passed job queue file '%s' does not belong to"
2267
          " the queue directory '%s'", file_name, queue_dir)
2268

    
2269

    
2270
def JobQueueUpdate(file_name, content):
2271
  """Updates a file in the queue directory.
2272

2273
  This is just a wrapper over L{utils.WriteFile}, with proper
2274
  checking.
2275

2276
  @type file_name: str
2277
  @param file_name: the job file name
2278
  @type content: str
2279
  @param content: the new job contents
2280
  @rtype: boolean
2281
  @return: the success of the operation
2282

2283
  """
2284
  _EnsureJobQueueFile(file_name)
2285

    
2286
  # Write and replace the file atomically
2287
  utils.WriteFile(file_name, data=_Decompress(content))
2288

    
2289

    
2290
def JobQueueRename(old, new):
2291
  """Renames a job queue file.
2292

2293
  This is just a wrapper over os.rename with proper checking.
2294

2295
  @type old: str
2296
  @param old: the old (actual) file name
2297
  @type new: str
2298
  @param new: the desired file name
2299
  @rtype: tuple
2300
  @return: the success of the operation and payload
2301

2302
  """
2303
  _EnsureJobQueueFile(old)
2304
  _EnsureJobQueueFile(new)
2305

    
2306
  utils.RenameFile(old, new, mkdir=True)
2307

    
2308

    
2309
def JobQueueSetDrainFlag(drain_flag):
2310
  """Set the drain flag for the queue.
2311

2312
  This will set or unset the queue drain flag.
2313

2314
  @type drain_flag: boolean
2315
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
2316
  @rtype: truple
2317
  @return: always True, None
2318
  @warning: the function always returns True
2319

2320
  """
2321
  if drain_flag:
2322
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2323
  else:
2324
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2325

    
2326

    
2327
def BlockdevClose(instance_name, disks):
2328
  """Closes the given block devices.
2329

2330
  This means they will be switched to secondary mode (in case of
2331
  DRBD).
2332

2333
  @param instance_name: if the argument is not empty, the symlinks
2334
      of this instance will be removed
2335
  @type disks: list of L{objects.Disk}
2336
  @param disks: the list of disks to be closed
2337
  @rtype: tuple (success, message)
2338
  @return: a tuple of success and message, where success
2339
      indicates the succes of the operation, and message
2340
      which will contain the error details in case we
2341
      failed
2342

2343
  """
2344
  bdevs = []
2345
  for cf in disks:
2346
    rd = _RecursiveFindBD(cf)
2347
    if rd is None:
2348
      _Fail("Can't find device %s", cf)
2349
    bdevs.append(rd)
2350

    
2351
  msg = []
2352
  for rd in bdevs:
2353
    try:
2354
      rd.Close()
2355
    except errors.BlockDeviceError, err:
2356
      msg.append(str(err))
2357
  if msg:
2358
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2359
  else:
2360
    if instance_name:
2361
      _RemoveBlockDevLinks(instance_name, disks)
2362

    
2363

    
2364
def ValidateHVParams(hvname, hvparams):
2365
  """Validates the given hypervisor parameters.
2366

2367
  @type hvname: string
2368
  @param hvname: the hypervisor name
2369
  @type hvparams: dict
2370
  @param hvparams: the hypervisor parameters to be validated
2371
  @rtype: None
2372

2373
  """
2374
  try:
2375
    hv_type = hypervisor.GetHypervisor(hvname)
2376
    hv_type.ValidateParameters(hvparams)
2377
  except errors.HypervisorError, err:
2378
    _Fail(str(err), log=False)
2379

    
2380

    
2381
def DemoteFromMC():
2382
  """Demotes the current node from master candidate role.
2383

2384
  """
2385
  # try to ensure we're not the master by mistake
2386
  master, myself = ssconf.GetMasterAndMyself()
2387
  if master == myself:
2388
    _Fail("ssconf status shows I'm the master node, will not demote")
2389
  pid_file = utils.DaemonPidFileName(constants.MASTERD)
2390
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2391
    _Fail("The master daemon is running, will not demote")
2392
  try:
2393
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2394
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2395
  except EnvironmentError, err:
2396
    if err.errno != errno.ENOENT:
2397
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2398
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2399

    
2400

    
2401
def _FindDisks(nodes_ip, disks):
2402
  """Sets the physical ID on disks and returns the block devices.
2403

2404
  """
2405
  # set the correct physical ID
2406
  my_name = utils.HostInfo().name
2407
  for cf in disks:
2408
    cf.SetPhysicalID(my_name, nodes_ip)
2409

    
2410
  bdevs = []
2411

    
2412
  for cf in disks:
2413
    rd = _RecursiveFindBD(cf)
2414
    if rd is None:
2415
      _Fail("Can't find device %s", cf)
2416
    bdevs.append(rd)
2417
  return bdevs
2418

    
2419

    
2420
def DrbdDisconnectNet(nodes_ip, disks):
2421
  """Disconnects the network on a list of drbd devices.
2422

2423
  """
2424
  bdevs = _FindDisks(nodes_ip, disks)
2425

    
2426
  # disconnect disks
2427
  for rd in bdevs:
2428
    try:
2429
      rd.DisconnectNet()
2430
    except errors.BlockDeviceError, err:
2431
      _Fail("Can't change network configuration to standalone mode: %s",
2432
            err, exc=True)
2433

    
2434

    
2435
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2436
  """Attaches the network on a list of drbd devices.
2437

2438
  """
2439
  bdevs = _FindDisks(nodes_ip, disks)
2440

    
2441
  if multimaster:
2442
    for idx, rd in enumerate(bdevs):
2443
      try:
2444
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2445
      except EnvironmentError, err:
2446
        _Fail("Can't create symlink: %s", err)
2447
  # reconnect disks, switch to new master configuration and if
2448
  # needed primary mode
2449
  for rd in bdevs:
2450
    try:
2451
      rd.AttachNet(multimaster)
2452
    except errors.BlockDeviceError, err:
2453
      _Fail("Can't change network configuration: %s", err)
2454
  # wait until the disks are connected; we need to retry the re-attach
2455
  # if the device becomes standalone, as this might happen if the one
2456
  # node disconnects and reconnects in a different mode before the
2457
  # other node reconnects; in this case, one or both of the nodes will
2458
  # decide it has wrong configuration and switch to standalone
2459
  RECONNECT_TIMEOUT = 2 * 60
2460
  sleep_time = 0.100 # start with 100 miliseconds
2461
  timeout_limit = time.time() + RECONNECT_TIMEOUT
2462
  while time.time() < timeout_limit:
2463
    all_connected = True
2464
    for rd in bdevs:
2465
      stats = rd.GetProcStatus()
2466
      if not (stats.is_connected or stats.is_in_resync):
2467
        all_connected = False
2468
      if stats.is_standalone:
2469
        # peer had different config info and this node became
2470
        # standalone, even though this should not happen with the
2471
        # new staged way of changing disk configs
2472
        try:
2473
          rd.AttachNet(multimaster)
2474
        except errors.BlockDeviceError, err:
2475
          _Fail("Can't change network configuration: %s", err)
2476
    if all_connected:
2477
      break
2478
    time.sleep(sleep_time)
2479
    sleep_time = min(5, sleep_time * 1.5)
2480
  if not all_connected:
2481
    _Fail("Timeout in disk reconnecting")
2482
  if multimaster:
2483
    # change to primary mode
2484
    for rd in bdevs:
2485
      try:
2486
        rd.Open()
2487
      except errors.BlockDeviceError, err:
2488
        _Fail("Can't change to primary mode: %s", err)
2489

    
2490

    
2491
def DrbdWaitSync(nodes_ip, disks):
2492
  """Wait until DRBDs have synchronized.
2493

2494
  """
2495
  bdevs = _FindDisks(nodes_ip, disks)
2496

    
2497
  min_resync = 100
2498
  alldone = True
2499
  for rd in bdevs:
2500
    stats = rd.GetProcStatus()
2501
    if not (stats.is_connected or stats.is_in_resync):
2502
      _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2503
    alldone = alldone and (not stats.is_in_resync)
2504
    if stats.sync_percent is not None:
2505
      min_resync = min(min_resync, stats.sync_percent)
2506

    
2507
  return (alldone, min_resync)
2508

    
2509

    
2510
def PowercycleNode(hypervisor_type):
2511
  """Hard-powercycle the node.
2512

2513
  Because we need to return first, and schedule the powercycle in the
2514
  background, we won't be able to report failures nicely.
2515

2516
  """
2517
  hyper = hypervisor.GetHypervisor(hypervisor_type)
2518
  try:
2519
    pid = os.fork()
2520
  except OSError:
2521
    # if we can't fork, we'll pretend that we're in the child process
2522
    pid = 0
2523
  if pid > 0:
2524
    return "Reboot scheduled in 5 seconds"
2525
  time.sleep(5)
2526
  hyper.PowercycleNode()
2527

    
2528

    
2529
class HooksRunner(object):
2530
  """Hook runner.
2531

2532
  This class is instantiated on the node side (ganeti-noded) and not
2533
  on the master side.
2534

2535
  """
2536
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2537

    
2538
  def __init__(self, hooks_base_dir=None):
2539
    """Constructor for hooks runner.
2540

2541
    @type hooks_base_dir: str or None
2542
    @param hooks_base_dir: if not None, this overrides the
2543
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2544

2545
    """
2546
    if hooks_base_dir is None:
2547
      hooks_base_dir = constants.HOOKS_BASE_DIR
2548
    self._BASE_DIR = hooks_base_dir
2549

    
2550
  @staticmethod
2551
  def ExecHook(script, env):
2552
    """Exec one hook script.
2553

2554
    @type script: str
2555
    @param script: the full path to the script
2556
    @type env: dict
2557
    @param env: the environment with which to exec the script
2558
    @rtype: tuple (success, message)
2559
    @return: a tuple of success and message, where success
2560
        indicates the succes of the operation, and message
2561
        which will contain the error details in case we
2562
        failed
2563

2564
    """
2565
    # exec the process using subprocess and log the output
2566
    fdstdin = None
2567
    try:
2568
      fdstdin = open("/dev/null", "r")
2569
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2570
                               stderr=subprocess.STDOUT, close_fds=True,
2571
                               shell=False, cwd="/", env=env)
2572
      output = ""
2573
      try:
2574
        output = child.stdout.read(4096)
2575
        child.stdout.close()
2576
      except EnvironmentError, err:
2577
        output += "Hook script error: %s" % str(err)
2578

    
2579
      while True:
2580
        try:
2581
          result = child.wait()
2582
          break
2583
        except EnvironmentError, err:
2584
          if err.errno == errno.EINTR:
2585
            continue
2586
          raise
2587
    finally:
2588
      # try not to leak fds
2589
      for fd in (fdstdin, ):
2590
        if fd is not None:
2591
          try:
2592
            fd.close()
2593
          except EnvironmentError, err:
2594
            # just log the error
2595
            #logging.exception("Error while closing fd %s", fd)
2596
            pass
2597

    
2598
    return result == 0, utils.SafeEncode(output.strip())
2599

    
2600
  def RunHooks(self, hpath, phase, env):
2601
    """Run the scripts in the hooks directory.
2602

2603
    @type hpath: str
2604
    @param hpath: the path to the hooks directory which
2605
        holds the scripts
2606
    @type phase: str
2607
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2608
        L{constants.HOOKS_PHASE_POST}
2609
    @type env: dict
2610
    @param env: dictionary with the environment for the hook
2611
    @rtype: list
2612
    @return: list of 3-element tuples:
2613
      - script path
2614
      - script result, either L{constants.HKR_SUCCESS} or
2615
        L{constants.HKR_FAIL}
2616
      - output of the script
2617

2618
    @raise errors.ProgrammerError: for invalid input
2619
        parameters
2620

2621
    """
2622
    if phase == constants.HOOKS_PHASE_PRE:
2623
      suffix = "pre"
2624
    elif phase == constants.HOOKS_PHASE_POST:
2625
      suffix = "post"
2626
    else:
2627
      _Fail("Unknown hooks phase '%s'", phase)
2628

    
2629
    rr = []
2630

    
2631
    subdir = "%s-%s.d" % (hpath, suffix)
2632
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2633
    try:
2634
      dir_contents = utils.ListVisibleFiles(dir_name)
2635
    except OSError:
2636
      # FIXME: must log output in case of failures
2637
      return rr
2638

    
2639
    # we use the standard python sort order,
2640
    # so 00name is the recommended naming scheme
2641
    dir_contents.sort()
2642
    for relname in dir_contents:
2643
      fname = os.path.join(dir_name, relname)
2644
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2645
          self.RE_MASK.match(relname) is not None):
2646
        rrval = constants.HKR_SKIP
2647
        output = ""
2648
      else:
2649
        result, output = self.ExecHook(fname, env)
2650
        if not result:
2651
          rrval = constants.HKR_FAIL
2652
        else:
2653
          rrval = constants.HKR_SUCCESS
2654
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2655

    
2656
    return rr
2657

    
2658

    
2659
class IAllocatorRunner(object):
2660
  """IAllocator runner.
2661

2662
  This class is instantiated on the node side (ganeti-noded) and not on
2663
  the master side.
2664

2665
  """
2666
  def Run(self, name, idata):
2667
    """Run an iallocator script.
2668

2669
    @type name: str
2670
    @param name: the iallocator script name
2671
    @type idata: str
2672
    @param idata: the allocator input data
2673

2674
    @rtype: tuple
2675
    @return: two element tuple of:
2676
       - status
2677
       - either error message or stdout of allocator (for success)
2678

2679
    """
2680
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2681
                                  os.path.isfile)
2682
    if alloc_script is None:
2683
      _Fail("iallocator module '%s' not found in the search path", name)
2684

    
2685
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2686
    try:
2687
      os.write(fd, idata)
2688
      os.close(fd)
2689
      result = utils.RunCmd([alloc_script, fin_name])
2690
      if result.failed:
2691
        _Fail("iallocator module '%s' failed: %s, output '%s'",
2692
              name, result.fail_reason, result.output)
2693
    finally:
2694
      os.unlink(fin_name)
2695

    
2696
    return result.stdout
2697

    
2698

    
2699
class DevCacheManager(object):
2700
  """Simple class for managing a cache of block device information.
2701

2702
  """
2703
  _DEV_PREFIX = "/dev/"
2704
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2705

    
2706
  @classmethod
2707
  def _ConvertPath(cls, dev_path):
2708
    """Converts a /dev/name path to the cache file name.
2709

2710
    This replaces slashes with underscores and strips the /dev
2711
    prefix. It then returns the full path to the cache file.
2712

2713
    @type dev_path: str
2714
    @param dev_path: the C{/dev/} path name
2715
    @rtype: str
2716
    @return: the converted path name
2717

2718
    """
2719
    if dev_path.startswith(cls._DEV_PREFIX):
2720
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2721
    dev_path = dev_path.replace("/", "_")
2722
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2723
    return fpath
2724

    
2725
  @classmethod
2726
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2727
    """Updates the cache information for a given device.
2728

2729
    @type dev_path: str
2730
    @param dev_path: the pathname of the device
2731
    @type owner: str
2732
    @param owner: the owner (instance name) of the device
2733
    @type on_primary: bool
2734
    @param on_primary: whether this is the primary
2735
        node nor not
2736
    @type iv_name: str
2737
    @param iv_name: the instance-visible name of the
2738
        device, as in objects.Disk.iv_name
2739

2740
    @rtype: None
2741

2742
    """
2743
    if dev_path is None:
2744
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2745
      return
2746
    fpath = cls._ConvertPath(dev_path)
2747
    if on_primary:
2748
      state = "primary"
2749
    else:
2750
      state = "secondary"
2751
    if iv_name is None:
2752
      iv_name = "not_visible"
2753
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2754
    try:
2755
      utils.WriteFile(fpath, data=fdata)
2756
    except EnvironmentError, err:
2757
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2758

    
2759
  @classmethod
2760
  def RemoveCache(cls, dev_path):
2761
    """Remove data for a dev_path.
2762

2763
    This is just a wrapper over L{utils.RemoveFile} with a converted
2764
    path name and logging.
2765

2766
    @type dev_path: str
2767
    @param dev_path: the pathname of the device
2768

2769
    @rtype: None
2770

2771
    """
2772
    if dev_path is None:
2773
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2774
      return
2775
    fpath = cls._ConvertPath(dev_path)
2776
    try:
2777
      utils.RemoveFile(fpath)
2778
    except EnvironmentError, err:
2779
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)