Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ e02b9114

History | View | Annotate | Download (86.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable-msg=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50

    
51
from ganeti import errors
52
from ganeti import utils
53
from ganeti import ssh
54
from ganeti import hypervisor
55
from ganeti import constants
56
from ganeti import bdev
57
from ganeti import objects
58
from ganeti import ssconf
59

    
60

    
61
# Path of the kernel-provided boot ID; read by GetNodeInfo ("bootid" key)
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# Whitelist of directories that _CleanDirectory may clean; any other
# path passed to it is rejected via _Fail
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  ])
67

    
68

    
69
class RPCFail(Exception):
  """Exception signalling an RPC failure.

  The single argument carries the error message which is reported
  back to the caller.

  """
75

    
76

    
77
def _Fail(msg, *args, **kwargs):
  """Log an error message and raise L{RPCFail}.

  The node daemon catches this exception and converts it into a
  'failed' return value for the master, so raising it is the
  standard way of reporting an error from backend functions.

  @type msg: string
  @param msg: the text of the exception; interpolated with C{args}
      when positional arguments are given
  @keyword log: if present and false, the message is not logged
  @keyword exc: if true, log with exception info (traceback)
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  if kwargs.get("log", True):
    # include the traceback only when the caller asked for it
    if kwargs.get("exc"):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
98

    
99

    
100
def _GetConfig():
  """Return a fresh L{ssconf.SimpleStore} instance.

  @rtype: L{ssconf.SimpleStore}
  @return: a new SimpleStore wrapper for the cluster configuration

  """
  return ssconf.SimpleStore()
108

    
109

    
110
def _GetSshRunner(cluster_name):
  """Build an L{ssh.SshRunner} for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, passed straight through to
      the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
121

    
122

    
123
def _Decompress(data):
  """Unpack an (encoding, payload) pair produced by the RPC client.

  @type data: list or tuple
  @param data: two-element sequence of (encoding marker, payload)
  @rtype: str
  @return: the decoded payload
  @raise AssertionError: on a malformed pair or unknown encoding

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload is zlib-compressed data wrapped in base64
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
141

    
142

    
143
def _CleanDirectory(path, exclude=None):
  """Remove all regular files in a directory.

  Only paths listed in L{_ALLOWED_CLEAN_DIRS} are accepted; anything
  else fails the RPC. Symlinks and non-regular files are left alone.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return

  # normalize the exclusion list for exact-match comparison below
  if exclude:
    skip = frozenset(os.path.normpath(entry) for entry in exclude)
  else:
    skip = frozenset()

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in skip:
      continue
    # only delete plain files, never symlinks or directories
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
171

    
172

    
173
def _BuildUploadFileList():
  """Compute the set of files accepted by L{UploadFile}.

  Abstracted into a function so the set is built only once, at module
  import time: the static cluster files plus the ancillary files of
  every known hypervisor.

  """
  files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.HMAC_CLUSTER_KEY,
    ])

  # each hypervisor contributes its own ancillary files
  for hv_name in constants.HYPER_TYPES:
    files.update(hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles())

  return frozenset(files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
197

    
198

    
199
def JobQueuePurge():
  """Remove job queue files and archived jobs.

  The queue lock file is excluded from removal.

  @rtype: tuple
  @return: True, None

  """
  keep = [constants.JOB_QUEUE_LOCK_FILE]
  _CleanDirectory(constants.QUEUE_DIR, exclude=keep)
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
208

    
209

    
210
def GetMasterInfo():
211
  """Returns master information.
212

213
  This is an utility function to compute master information, either
214
  for consumption here or from the node daemon.
215

216
  @rtype: tuple
217
  @return: master_netdev, master_ip, master_name
218
  @raise RPCFail: in case of errors
219

220
  """
221
  try:
222
    cfg = _GetConfig()
223
    master_netdev = cfg.GetMasterNetdev()
224
    master_ip = cfg.GetMasterIP()
225
    master_node = cfg.GetMasterNode()
226
  except errors.ConfigurationError, err:
227
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
228
  return (master_netdev, master_ip, master_node)
229

    
230

    
231
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # errors are collected here and reported together via _Fail at the end
  err_msgs = []
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    # the master IP already answers; check whether it's ours
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    # nobody answers on the master IP: configure it on our netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    # advertise the newly-configured address via arping (best effort)
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    if no_voting:
      # extra flags needed to skip the node vote, non-interactively
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
291

    
292

    
293
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The master IP is always removed from the master netdev; the master
  daemons are stopped only when C{stop_daemons} is set. Failures in
  either step are logged but do not fail the RPC.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  ip_result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                            "dev", master_netdev])
  if ip_result.failed:
    logging.error("Can't remove the master IP, error: %s", ip_result.output)
    # but otherwise ignore the failure

  if not stop_daemons:
    return

  stop_result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
  if stop_result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  stop_result.cmd, stop_result.exit_code, stop_result.output)
324

    
325

    
326
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation

  """
  # host key files: private keys written mode 0600, public ones 0644
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  # locate (and create, if needed) the ganeti user's ssh files
  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    _Fail("Error while processing user ssh files: %s", err, exc=True)

  # the user's own key pair; the private key is restricted to 0600
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # ask daemon-util to reload the SSH keys so the new ones take effect
  result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
  if result.failed:
    _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
          result.cmd, result.exit_code, result.output)
372

    
373

    
374
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      # paths of the ganeti user's key pair and authorized_keys file
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      # best-effort: log the problem but continue with node removal
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SSL_CERT_FILE)
  except: # pylint: disable-msg=W0702
    # also best-effort: removing secrets must not abort the shutdown
    logging.exception("Error while removing cluster secrets")

  # stopping confd is attempted but a failure is only logged
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
415

    
416

    
417
def GetNodeInfo(vgname, hypervisor_type):
  """Return a dict with assorted information about this node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  node_info = {}

  # volume group sizes
  vginfo = _GetVGInfo(vgname)
  node_info['vg_size'] = vginfo['vg_size']
  node_info['vg_free'] = vginfo['vg_free']

  # hypervisor-provided data (memory etc.)
  hyp_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hyp_info is not None:
    node_info.update(hyp_info)

  # boot id, used to detect node reboots
  node_info["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return node_info
447

    
448

    
449
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # NOTE: shuffle mutates the caller's list in place
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      # only failures are recorded; a missing key means success
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own entry to learn our primary/secondary source IPs
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetDaemonPort(constants.NODED)
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # the secondary check is skipped when it equals the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # on error, a string (not a list) is returned to the caller
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    # sanity checks for sysfs and procfs mounts
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  return result
569

    
570

    
571
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # raw string so that "\|" reaches the regex engine as an escaped pipe
  # instead of being an invalid string escape sequence
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
613

    
614

    
615
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin wrapper over the utils-level implementation.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
624

    
625

    
626
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    # strip a trailing "(...)" part, keeping only the device path
    return dev.split('(')[0] if '(' in dev else dev

  volumes = []
  for line in result.stdout.splitlines():
    # a valid line has at least four fields (three separators)
    if line.count('|') < 3:
      continue
    fields = line.split('|')
    volumes.append({
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': parse_dev(fields[2].strip()),
      'vg': fields[3].strip(),
      })
  return volumes
668

    
669

    
670
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
684

    
685

    
686
def GetInstanceList(hypervisor_list):
687
  """Provides a list of instances.
688

689
  @type hypervisor_list: list
690
  @param hypervisor_list: the list of hypervisors to query information
691

692
  @rtype: list
693
  @return: a list of all running instances on the current node
694
    - instance1.example.com
695
    - instance2.example.com
696

697
  """
698
  results = []
699
  for hname in hypervisor_list:
700
    try:
701
      names = hypervisor.GetHypervisor(hname).ListInstances()
702
      results.extend(names)
703
    except errors.HypervisorError, err:
704
      _Fail("Error enumerating instances (hypervisor %s): %s",
705
            hname, err, exc=True)
706

    
707
  return results
708

    
709

    
710
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    # unknown instance: the empty dict is the documented result
    return output

  output['memory'] = iinfo[2]
  output['state'] = iinfo[4]
  output['time'] = iinfo[5]
  return output
734

    
735

    
736
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # each disk must have its symlink, otherwise the instance predates
  # the symlink scheme and must be restarted first
  for idx, _ in enumerate(instance.disks):
    if not os.path.islink(_GetBlockDevSymlinkPath(iname, idx)):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
757

    
758

    
759
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      value = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      if name in output:
        # we only check static parameters, like memory and vcpus,
        # and not state and time which can change between the
        # invocations of the different hypervisors
        for key in ('memory', 'vcpus'):
          if value[key] != output[name][key]:
            _Fail("Instance %s is running twice"
                  " with different parameters", name)
      output[name] = value

  return output
800

    
801

    
802
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.

  """
  # a timestamp keeps successive logs for the same instance distinct
  timestamp = int(time.time())
  base = "%s-%s-%s-%d.log" % (kind, os_name, instance, timestamp)
  return utils.PathJoin(constants.LOG_OS_DIR, base)
818

    
819

    
820
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    # tell the OS scripts that an existing installation is present
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile)
  if not result.failed:
    return

  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", result.cmd, result.fail_reason, logfile,
                result.output)
  # include the tail of the script's log in the failure message
  lines = [utils.SafeEncode(val)
           for val in utils.TailFile(logfile, lines=20)]
  _Fail("OS create script failed (%s), last lines in the"
        " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
850

    
851

    
852
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  # the OS script reads the old name from the environment
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # fixed log message: it previously said "os create command",
    # copied from InstanceOsAdd, but this is the rename script
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # include the tail of the script's log in the failure message
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
883

    
884

    
885
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic

  # drop the trailing separator before splitting into the three fields
  fields = retval.stdout.strip().rstrip(':').split(':')
  if len(fields) != 3:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(fields))
    return retdic

  try:
    retdic = {
      "vg_size": int(round(float(fields[0]), 0)),
      "vg_free": int(round(float(fields[1]), 0)),
      "pv_count": int(fields[2]),
      }
  except (TypeError, ValueError) as err:
    logging.exception("Fail to parse vgs output: %s", err)
  return retdic
923

    
924

    
925
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for a given instance disk.

  @param instance_name: name of the instance owning the disk
  @param idx: index of the disk
  @rtype: str

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
928

    
929

    
930
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up a symlink to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError as err:
    if err.errno != errno.EEXIST:
      raise
    # a link is already there; replace it unless it already points at
    # the desired device
    up_to_date = (os.path.islink(link_name) and
                  os.readlink(link_name) == device_path)
    if not up_to_date:
      os.remove(link_name)
      os.symlink(device_path, link_name)

  return link_name
956

    
957

    
958
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Missing links are silently skipped; failures to unlink are logged but
  do not abort the loop.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
969

    
970

    
971
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  Runs on the primary node at instance startup; the block devices must
  already be assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  linked = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))

    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError as err:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    err.strerror)

    linked.append((disk, link_name))

  return linked
999

    
1000

    
1001
def StartInstance(instance):
  """Start an instance.

  Starting is a no-op if the instance is already running.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: None

  """
  if instance.name in GetInstanceList([instance.hypervisor]):
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError as err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError as err:
    # undo the symlinks created above before reporting the failure
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
1024

    
1025

    
1026
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  # nothing to do if the instance is already stopped
  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  # Helper callable for utils.Retry: each invocation attempts a soft
  # stop; it returns (success) once the instance disappears from the
  # hypervisor's list, and raises RetryAgain to request another attempt.
  class _TryShutdown:
    def __init__(self):
      # whether a previous StopInstance attempt was already made; passed
      # as the 'retry' flag to the hypervisor on subsequent calls
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      # instance still listed: ask Retry to call us again
      raise utils.RetryAgain()

  try:
    # poll every 5 seconds up to the caller-supplied timeout
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    # give the hypervisor a moment to settle before the final check
    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  # instance is down: remove its block device symlinks
  _RemoveBlockDevLinks(iname, instance.disks)
1088

    
1089

    
1090
def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot; either
      L{constants.INSTANCE_REBOOT_SOFT} (reboot only the instance OS,
      do not recreate the VM) or L{constants.INSTANCE_REBOOT_HARD}
      (tear down and restart the VM at the hypervisor level); the
      L{constants.INSTANCE_REBOOT_FULL} mode is handled in cmdlib as a
      full stop and start and is rejected here
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  active = GetInstanceList([instance.hypervisor])
  if instance.name not in active:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError as err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      # a hard reboot is implemented as a full stop followed by a start
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance)
    except errors.HypervisorError as err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)
1130

    
1131

    
1132
def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @return: the hypervisor-specific migration information

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    # _Fail raises, so returning from inside the try is safe
    return hyper.MigrationInfo(instance)
  except errors.HypervisorError as err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1145

    
1146

    
1147
def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hv.AcceptInstance(instance, info, target)
  except errors.HypervisorError as err:
    _Fail("Failed to accept instance: %s", err, exc=True)
1163

    
1164

    
1165
def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hv.FinalizeMigration(instance, info, success)
  except errors.HypervisorError as err:
    _Fail("Failed to finalize migration: %s", err, exc=True)
1181

    
1182

    
1183
def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - succes is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hv.MigrateInstance(instance, target, live)
  except errors.HypervisorError as err:
    _Fail("Failed to migrate instance: %s", err, exc=True)
1205

    
1206

    
1207
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete 'size' argument
  # pylint: disable-msg=W0613
  # first assemble (and possibly open) all the child devices, so that
  # the compound device can be created on top of them
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable-msg=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # record the new device in the node-local device cache
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  # attach the creation metadata (e.g. LVM tags) to the new device
  device.SetInfo(info)

  return device.unique_id
1269

    
1270

    
1271
def BlockdevRemove(disk):
1272
  """Remove a block device.
1273

1274
  @note: This is intended to be called recursively.
1275

1276
  @type disk: L{objects.Disk}
1277
  @param disk: the disk object we should remove
1278
  @rtype: boolean
1279
  @return: the success of the operation
1280

1281
  """
1282
  msgs = []
1283
  try:
1284
    rdev = _RecursiveFindBD(disk)
1285
  except errors.BlockDeviceError, err:
1286
    # probably can't attach
1287
    logging.info("Can't attach to device %s in remove", disk)
1288
    rdev = None
1289
  if rdev is not None:
1290
    r_path = rdev.dev_path
1291
    try:
1292
      rdev.Remove()
1293
    except errors.BlockDeviceError, err:
1294
      msgs.append(str(err))
1295
    if not msgs:
1296
      DevCacheManager.RemoveCache(r_path)
1297

    
1298
  if disk.children:
1299
    for child in disk.children:
1300
      try:
1301
        BlockdevRemove(child)
1302
      except RPCFail, err:
1303
        msgs.append(str(err))
1304

    
1305
  if msgs:
1306
    _Fail("; ".join(msgs))
1307

    
1308

    
1309
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # compute how many children are allowed to fail activation (as
    # None placeholders) before we must give up and re-raise
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # too many failed children already: propagate the failure
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      # primary (or open-on-secondary) devices are made read/write
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # nothing to assemble on this node; True signals "no error"
    result = True
  return result
1361

    
1362

    
1363
def BlockdevAssemble(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    assembled = _RecursiveAssembleBD(disk, owner, as_primary)
  except errors.BlockDeviceError as err:
    _Fail("Error while assembling disk: %s", err, exc=True)

  if isinstance(assembled, bdev.BlockDev):
    # pylint: disable-msg=E1103
    return assembled.dev_path
  return assembled
1382

    
1383

    
1384
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  problems = []

  found = _RecursiveFindBD(disk)
  if found is not None:
    dev_path = found.dev_path
    try:
      found.Shutdown()
      DevCacheManager.RemoveCache(dev_path)
    except errors.BlockDeviceError as err:
      problems.append(str(err))

  # collect child errors instead of raising, so every child gets a
  # shutdown attempt
  for child in (disk.children or []):
    try:
      BlockdevShutdown(child)
    except RPCFail as err:
      problems.append(str(err))

  if problems:
    _Fail("; ".join(problems))
1420

    
1421

    
1422
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)

  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)

  parent_bdev.AddChildren(new_bdevs)
1439

    
1440

    
1441
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)

  devs = []
  for disk in new_cdevs:
    # prefer the static device path when the disk type provides one
    static_path = disk.StaticDevPath()
    if static_path is not None:
      devs.append(static_path)
      continue
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s while removing children", disk)
    devs.append(found.dev_path)

  parent_bdev.RemoveChildren(devs)
1466

    
1467

    
1468
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats
1490

    
1491

    
1492
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    children = [_RecursiveFindBD(child) for child in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1510

    
1511

    
1512
def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information

  """
  try:
    device = _RecursiveFindBD(disk)
  except errors.BlockDeviceError as err:
    _Fail("Failed to find device: %s", err, exc=True)

  if device is None:
    return None

  return device.GetSyncStatus()
1533

    
1534

    
1535
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  sizes = []
  for disk in disks:
    try:
      found = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      # could not even attach to the device
      sizes.append(None)
      continue
    sizes.append(None if found is None else found.GetActualSize())
  return sizes
1559

    
1560

    
1561
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  # wrap the receiving dd into an ssh invocation to the target node
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  # bash is required for the 'pipefail' option used in expcmd above
  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
1607

    
1608

    
1609
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  # the payload travels compressed over the wire
  utils.WriteFile(file_name, data=_Decompress(data), mode=mode, uid=uid,
                  gid=gid, atime=atime, mtime=mtime)
1643

    
1644

    
1645
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
1652

    
1653

    
1654
def _ErrnoOrStr(err):
1655
  """Format an EnvironmentError exception.
1656

1657
  If the L{err} argument has an errno attribute, it will be looked up
1658
  and converted into a textual C{E...} description. Otherwise the
1659
  string representation of the error will be returned.
1660

1661
  @type err: L{EnvironmentError}
1662
  @param err: the exception to format
1663

1664
  """
1665
  if hasattr(err, 'errno'):
1666
    detail = errno.errorcode[err.errno]
1667
  else:
1668
    detail = str(err)
1669
  return detail
1670

    
1671

    
1672
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError as err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  # refuse symlinked/special files, only plain files are acceptable
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    raw_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError as err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(line.strip()) for line in raw_versions]
  except (TypeError, ValueError) as err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
1710

    
1711

    
1712
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants)
      for all (potential) OSes under all search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if not os.path.isdir(dir_name):
      continue
    try:
      entries = utils.ListVisibleFiles(dir_name)
    except EnvironmentError as err:
      logging.exception("Can't list the OS directory %s: %s", dir_name, err)
      # keep the original behaviour: stop scanning further directories
      break
    for name in entries:
      os_path = utils.PathJoin(dir_name, name)
      status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
      if status:
        diagnose = ""
        variants = os_inst.supported_variants
      else:
        # on failure, os_inst is the error message
        diagnose = os_inst
        variants = []
      result.append((name, os_path, status, diagnose, variants))

  return result
1752

    
1753

    
1754
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type name: string
  @param name: the OS name to look up (without any "+variant" suffix)
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  # the OS must support at least one API version we know about
  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path names
  os_files = dict.fromkeys(constants.OS_SCRIPTS)

  # API >= 15 additionally requires a variants file
  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = ''

  for filename in os_files:
    os_files[filename] = utils.PathJoin(os_dir, filename)

    # every required file must exist, be a regular file, and (for
    # scripts) be executable by the owner
    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = None
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      # one variant name per line
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS variants file at %s: %s" %
                     (variants_file, _ErrnoOrStr(err)))
    # an empty variants file makes the OS unusable for API >= 15
    if not variants:
      return False, ("No supported os variant found")

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      supported_variants=variants,
                      api_versions=api_versions)
  return True, os_obj
1828

    
1829

    
1830
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip an eventual "+variant" suffix before looking on disk
  pure_name = name.split("+", 1)[0]
  ok, payload = _TryOSFromDisk(pure_name, base_dir)

  if not ok:
    # payload is the diagnostic message in this case
    _Fail(payload)

  return payload
1855

    
1856

    
1857
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  Builds the dictionary of environment variables passed to the OS
  create/import/export/rename scripts: instance identity, per-disk and
  per-NIC settings, and all BE/HV parameters.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # advertise the highest API version both sides support
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  if api_version >= constants.OS_API_V15:
    try:
      # the variant is encoded as an "os+variant" suffix in the OS name
      variant = instance.os.split('+', 1)[1]
    except IndexError:
      # no explicit variant: fall back to the first supported one
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    # ensure the device is activated so dev_path is valid
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      # legacy name for the link parameter in bridged mode
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # export all backend and hypervisor parameters as INSTANCE_BE_*/HV_*
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
1922

    
1923
def BlockdevGrow(disk, amount):
1924
  """Grow a stack of block devices.
1925

1926
  This function is called recursively, with the childrens being the
1927
  first ones to resize.
1928

1929
  @type disk: L{objects.Disk}
1930
  @param disk: the disk to be grown
1931
  @rtype: (status, result)
1932
  @return: a tuple with the status of the operation
1933
      (True/False), and the errors message if status
1934
      is False
1935

1936
  """
1937
  r_dev = _RecursiveFindBD(disk)
1938
  if r_dev is None:
1939
    _Fail("Cannot find block device %s", disk)
1940

    
1941
  try:
1942
    r_dev.Grow(amount)
1943
  except errors.BlockDeviceError, err:
1944
    _Fail("Failed to grow block device: %s", err, exc=True)
1945

    
1946

    
1947
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    # recurse into the (single) backing device of the DRBD pair
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])

  if disk.dev_type != constants.LD_LV:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)

  leaf_dev = _RecursiveFindBD(disk)
  if leaf_dev is None:
    _Fail("Cannot find block device %s", disk)
  # FIXME: choose a saner value for the snapshot size
  # let's stay on the safe side and ask for the full size, for now
  return leaf_dev.Snapshot(disk.size)
1975

    
1976

    
1977
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
  """Export a block device snapshot to a remote node.

  The snapshot is read by the OS export script, gzip-compressed and
  piped over SSH into a file under the destination node's export
  directory.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)
  export_env = OSEnvironment(instance, inst_os, debug)

  export_script = inst_os.export_script

  # per-instance log file for the OS export script's stderr
  logfile = _InstanceLogName("export", inst_os.name, instance.name)
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  # activate the device so dev_path is usable by the export script
  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  # data lands in "<instance>.new" first; FinalizeExport renames it
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
                               inst_os.path, export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  # bash is required for "set -o pipefail" above
  result = utils.RunCmd(["bash", "-c", command], env=export_env)

  if result.failed:
    _Fail("OS snapshot export command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
2038

    
2039

    
2040
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  # everything is written to a fresh "<name>.new" directory, which then
  # replaces any previous export of the same instance
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  # only disks that were actually snapshotted (non-None entries) are
  # recorded, so disk_total may be smaller than len(snap_disks)
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # ignore_errors=True: the final destination may not exist yet
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
2101

    
2102

    
2103
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: the serialized form of the config file holding the
      export info

  """
  conf_file = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(conf_file)

  # both the export and the instance sections must be present
  if not (config.has_section(constants.INISECT_EXP) and
          config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2124

    
2125

    
2126
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
2127
  """Import an os image into an instance.
2128

2129
  @type instance: L{objects.Instance}
2130
  @param instance: instance to import the disks into
2131
  @type src_node: string
2132
  @param src_node: source node for the disk images
2133
  @type src_images: list of string
2134
  @param src_images: absolute paths of the disk images
2135
  @type debug: integer
2136
  @param debug: debug level, passed to the OS scripts
2137
  @rtype: list of boolean
2138
  @return: each boolean represent the success of importing the n-th disk
2139

2140
  """
2141
  inst_os = OSFromDisk(instance.os)
2142
  import_env = OSEnvironment(instance, inst_os, debug)
2143
  import_script = inst_os.import_script
2144

    
2145
  logfile = _InstanceLogName("import", instance.os, instance.name)
2146
  if not os.path.exists(constants.LOG_OS_DIR):
2147
    os.mkdir(constants.LOG_OS_DIR, 0750)
2148

    
2149
  comprcmd = "gunzip"
2150
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2151
                               import_script, logfile)
2152

    
2153
  final_result = []
2154
  for idx, image in enumerate(src_images):
2155
    if image:
2156
      destcmd = utils.BuildShellCmd('cat %s', image)
2157
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2158
                                                       constants.GANETI_RUNAS,
2159
                                                       destcmd)
2160
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2161
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2162
      import_env['IMPORT_INDEX'] = str(idx)
2163
      result = utils.RunCmd(command, env=import_env)
2164
      if result.failed:
2165
        logging.error("Disk import command '%s' returned error: %s"
2166
                      " output: %s", command, result.fail_reason,
2167
                      result.output)
2168
        final_result.append("error importing disk %d: %s, %s" %
2169
                            (idx, result.fail_reason, result.output[-100]))
2170

    
2171
  if final_result:
2172
    _Fail("; ".join(final_result), log=False)
2173

    
2174

    
2175
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  # guard first: a missing export directory is a hard failure
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
2186

    
2187

    
2188
def RemoveExport(export):
2189
  """Remove an existing export from the node.
2190

2191
  @type export: str
2192
  @param export: the name of the export to remove
2193
  @rtype: None
2194

2195
  """
2196
  target = utils.PathJoin(constants.EXPORT_DIR, export)
2197

    
2198
  try:
2199
    shutil.rmtree(target)
2200
  except EnvironmentError, err:
2201
    _Fail("Error while removing the export: %s", err, exc=True)
2202

    
2203

    
2204
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form  (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: None
  @raise RPCFail: if any of the renames failed (all failures are
      collected and reported together)

  """
  # NOTE(review): the docstring above mentions 3-tuples, but each
  # element is unpacked below as a 2-tuple (disk, new_unique_id) --
  # confirm against the callers
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      # collect the error but keep processing the remaining devices
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        # the device path changed: drop the now-stale cache entry
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))
2243

    
2244

    
2245
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid; on an invalid path
      L{_Fail} raises L{RPCFail} instead of returning

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # A character-wise prefix test (os.path.commonprefix) would accept
  # sibling directories such as "<base>-evil"; require the base dir
  # itself or a path strictly below it, on a path-component boundary.
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2266

    
2267

    
2268
def CreateFileStorageDir(file_storage_dir):
2269
  """Create file storage directory.
2270

2271
  @type file_storage_dir: str
2272
  @param file_storage_dir: directory to create
2273

2274
  @rtype: tuple
2275
  @return: tuple with first element a boolean indicating wheter dir
2276
      creation was successful or not
2277

2278
  """
2279
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2280
  if os.path.exists(file_storage_dir):
2281
    if not os.path.isdir(file_storage_dir):
2282
      _Fail("Specified storage dir '%s' is not a directory",
2283
            file_storage_dir)
2284
  else:
2285
    try:
2286
      os.makedirs(file_storage_dir, 0750)
2287
    except OSError, err:
2288
      _Fail("Cannot create file storage directory '%s': %s",
2289
            file_storage_dir, err, exc=True)
2290

    
2291

    
2292
def RemoveFileStorageDir(file_storage_dir):
2293
  """Remove file storage directory.
2294

2295
  Remove it only if it's empty. If not log an error and return.
2296

2297
  @type file_storage_dir: str
2298
  @param file_storage_dir: the directory we should cleanup
2299
  @rtype: tuple (success,)
2300
  @return: tuple of one element, C{success}, denoting
2301
      whether the operation was successful
2302

2303
  """
2304
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2305
  if os.path.exists(file_storage_dir):
2306
    if not os.path.isdir(file_storage_dir):
2307
      _Fail("Specified Storage directory '%s' is not a directory",
2308
            file_storage_dir)
2309
    # deletes dir only if empty, otherwise we want to fail the rpc call
2310
    try:
2311
      os.rmdir(file_storage_dir)
2312
    except OSError, err:
2313
      _Fail("Cannot remove file storage directory '%s': %s",
2314
            file_storage_dir, err)
2315

    
2316

    
2317
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2318
  """Rename the file storage directory.
2319

2320
  @type old_file_storage_dir: str
2321
  @param old_file_storage_dir: the current path
2322
  @type new_file_storage_dir: str
2323
  @param new_file_storage_dir: the name we should rename to
2324
  @rtype: tuple (success,)
2325
  @return: tuple of one element, C{success}, denoting
2326
      whether the operation was successful
2327

2328
  """
2329
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2330
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2331
  if not os.path.exists(new_file_storage_dir):
2332
    if os.path.isdir(old_file_storage_dir):
2333
      try:
2334
        os.rename(old_file_storage_dir, new_file_storage_dir)
2335
      except OSError, err:
2336
        _Fail("Cannot rename '%s' to '%s': %s",
2337
              old_file_storage_dir, new_file_storage_dir, err)
2338
    else:
2339
      _Fail("Specified storage dir '%s' is not a directory",
2340
            old_file_storage_dir)
2341
  else:
2342
    if os.path.exists(old_file_storage_dir):
2343
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2344
            old_file_storage_dir, new_file_storage_dir)
2345

    
2346

    
2347
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # A character-wise prefix test (os.path.commonprefix) would accept
  # paths like "<queue_dir>-evil/job"; require the queue dir itself or
  # a path below it, on a path-component boundary.
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.sep))

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
2362

    
2363

    
2364
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)

  # decompress first, then write and replace the file atomically
  data = _Decompress(content)
  utils.WriteFile(file_name, data=data)
2382

    
2383

    
2384
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  # both ends of the rename must live inside the queue directory
  for path in (old, new):
    _EnsureJobQueueFile(path)

  utils.RenameFile(old, new, mkdir=True)
2401

    
2402

    
2403
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: tuple
  @return: always True, None
  @warning: the function always returns True

  """
  # the flag is represented by the existence of the drain file
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2419

    
2420

    
2421
def BlockdevClose(instance_name, disks):
2422
  """Closes the given block devices.
2423

2424
  This means they will be switched to secondary mode (in case of
2425
  DRBD).
2426

2427
  @param instance_name: if the argument is not empty, the symlinks
2428
      of this instance will be removed
2429
  @type disks: list of L{objects.Disk}
2430
  @param disks: the list of disks to be closed
2431
  @rtype: tuple (success, message)
2432
  @return: a tuple of success and message, where success
2433
      indicates the succes of the operation, and message
2434
      which will contain the error details in case we
2435
      failed
2436

2437
  """
2438
  bdevs = []
2439
  for cf in disks:
2440
    rd = _RecursiveFindBD(cf)
2441
    if rd is None:
2442
      _Fail("Can't find device %s", cf)
2443
    bdevs.append(rd)
2444

    
2445
  msg = []
2446
  for rd in bdevs:
2447
    try:
2448
      rd.Close()
2449
    except errors.BlockDeviceError, err:
2450
      msg.append(str(err))
2451
  if msg:
2452
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2453
  else:
2454
    if instance_name:
2455
      _RemoveBlockDevLinks(instance_name, disks)
2456

    
2457

    
2458
def ValidateHVParams(hvname, hvparams):
2459
  """Validates the given hypervisor parameters.
2460

2461
  @type hvname: string
2462
  @param hvname: the hypervisor name
2463
  @type hvparams: dict
2464
  @param hvparams: the hypervisor parameters to be validated
2465
  @rtype: None
2466

2467
  """
2468
  try:
2469
    hv_type = hypervisor.GetHypervisor(hvname)
2470
    hv_type.ValidateParameters(hvparams)
2471
  except errors.HypervisorError, err:
2472
    _Fail(str(err), log=False)
2473

    
2474

    
2475
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  Performs two safety checks (not the recorded master, master daemon
  not running) before backing up and removing the local cluster
  configuration file.

  @rtype: None
  @raise RPCFail: if any safety check fails or the backup cannot
      be created

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  # also refuse if the master daemon is running on this node
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  # keep a backup copy of the configuration before dropping it
  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # a vanished file between the isfile check and the backup is fine
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2496

    
2497

    
2498
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # first pass: fill in the correct physical IDs, so the devices can
  # be located afterwards
  my_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  # second pass: resolve every disk to its block device
  bdevs = []
  for disk in disks:
    bdev = _RecursiveFindBD(disk)
    if bdev is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(bdev)
  return bdevs
2515

    
2516

    
2517
def DrbdDisconnectNet(nodes_ip, disks):
2518
  """Disconnects the network on a list of drbd devices.
2519

2520
  """
2521
  bdevs = _FindDisks(nodes_ip, disks)
2522

    
2523
  # disconnect disks
2524
  for rd in bdevs:
2525
    try:
2526
      rd.DisconnectNet()
2527
    except errors.BlockDeviceError, err:
2528
      _Fail("Can't change network configuration to standalone mode: %s",
2529
            err, exc=True)
2530

    
2531

    
2532
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  If C{multimaster} is set, the instance symlinks are created first and
  all devices are opened (switched to primary mode) at the end --
  presumably for dual-primary setups such as migration; confirm against
  the callers.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    # retried via utils.Retry below; raises RetryAgain while any
    # device is not yet connected/resyncing
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 miliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)
2592

    
2593

    
2594
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @param nodes_ip: the IPs of the peer nodes, passed to L{_FindDisks}
  @param disks: the disk definitions, passed to L{_FindDisks}
  @rtype: tuple
  @return: (alldone, min_resync) where alldone is True iff no device
      is still resyncing and min_resync is the lowest sync percentage
      seen (100 if none reported one)

  """
  def _WaitConnected(rd):
    # Retry while the device is neither connected nor resyncing
    dev_stats = rd.GetProcStatus()
    if not (dev_stats.is_connected or dev_stats.is_in_resync):
      raise utils.RetryAgain()
    return dev_stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_WaitConnected, 1, 15, args=[rd])
    except utils.RetryTimeout:
      # one final check before declaring failure
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
2622

    
2623

    
2624
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  The actual powercycle is scheduled in a forked child process so that
  this call can return an answer first; as a consequence, failures in
  the child cannot be reported back nicely.

  @type hypervisor_type: str
  @param hypervisor_type: the hypervisor whose powercycle method is used

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    child_pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    child_pid = 0
  if child_pid:
    # parent: the powercycle has been scheduled, report back
    return "Reboot scheduled in 5 seconds"
  # child (or fork failure): give the RPC answer time to reach the caller
  time.sleep(5)
  hyper.PowercycleNode()
2641

    
2642

    
2643
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return []

    results = []
    for (script, outcome, cmdresult) in utils.RunParts(dir_name, env=env,
                                                       reset_env=True):
      if outcome == constants.RUNPARTS_SKIP:
        status = constants.HKR_SKIP
        output = ""
      elif outcome == constants.RUNPARTS_ERR:
        status = constants.HKR_FAIL
        output = "Hook script execution error: %s" % cmdresult
      elif outcome == constants.RUNPARTS_RUN:
        if cmdresult.failed:
          status = constants.HKR_FAIL
        else:
          status = constants.HKR_SUCCESS
        output = utils.SafeEncode(cmdresult.output.strip())
      results.append(("%s/%s" % (subdir, script), status, output))

    return results
2722

    
2723

    
2724
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; on any failure
        (script not found or non-zero exit code) L{_Fail} is called
        instead of returning

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      # close the descriptor even if the write fails, to avoid leaking it
      try:
        os.write(fd, idata)
      finally:
        os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
2763

    
2764

    
2765
class DevCacheManager(object):
2766
  """Simple class for managing a cache of block device information.
2767

2768
  """
2769
  _DEV_PREFIX = "/dev/"
2770
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2771

    
2772
  @classmethod
2773
  def _ConvertPath(cls, dev_path):
2774
    """Converts a /dev/name path to the cache file name.
2775

2776
    This replaces slashes with underscores and strips the /dev
2777
    prefix. It then returns the full path to the cache file.
2778

2779
    @type dev_path: str
2780
    @param dev_path: the C{/dev/} path name
2781
    @rtype: str
2782
    @return: the converted path name
2783

2784
    """
2785
    if dev_path.startswith(cls._DEV_PREFIX):
2786
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2787
    dev_path = dev_path.replace("/", "_")
2788
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2789
    return fpath
2790

    
2791
  @classmethod
2792
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2793
    """Updates the cache information for a given device.
2794

2795
    @type dev_path: str
2796
    @param dev_path: the pathname of the device
2797
    @type owner: str
2798
    @param owner: the owner (instance name) of the device
2799
    @type on_primary: bool
2800
    @param on_primary: whether this is the primary
2801
        node nor not
2802
    @type iv_name: str
2803
    @param iv_name: the instance-visible name of the
2804
        device, as in objects.Disk.iv_name
2805

2806
    @rtype: None
2807

2808
    """
2809
    if dev_path is None:
2810
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2811
      return
2812
    fpath = cls._ConvertPath(dev_path)
2813
    if on_primary:
2814
      state = "primary"
2815
    else:
2816
      state = "secondary"
2817
    if iv_name is None:
2818
      iv_name = "not_visible"
2819
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2820
    try:
2821
      utils.WriteFile(fpath, data=fdata)
2822
    except EnvironmentError, err:
2823
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2824

    
2825
  @classmethod
2826
  def RemoveCache(cls, dev_path):
2827
    """Remove data for a dev_path.
2828

2829
    This is just a wrapper over L{utils.RemoveFile} with a converted
2830
    path name and logging.
2831

2832
    @type dev_path: str
2833
    @param dev_path: the pathname of the device
2834

2835
    @rtype: None
2836

2837
    """
2838
    if dev_path is None:
2839
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2840
      return
2841
    fpath = cls._ConvertPath(dev_path)
2842
    try:
2843
      utils.RemoveFile(fpath)
2844
    except EnvironmentError, err:
2845
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)