Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 0411c011

History | View | Annotate | Download (86.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable-msg=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50

    
51
from ganeti import errors
52
from ganeti import utils
53
from ganeti import ssh
54
from ganeti import hypervisor
55
from ganeti import constants
56
from ganeti import bdev
57
from ganeti import objects
58
from ganeti import ssconf
59

    
60

    
61
# Kernel-provided boot ID; read by GetNodeInfo so callers can detect reboots
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"

# Whitelist of directories that _CleanDirectory may operate on; any other
# path passed to it is rejected with an RPCFail
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  ])
67

    
68

    
69
class RPCFail(Exception):
  """Exception signalling an RPC-level failure.

  Its single argument carries the error message, which the node daemon
  turns into a 'failed' result for the master.

  """
75

    
76

    
77
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  The exception is handled specially in the ganeti daemon and turned
  into a 'failed' return type; this function is therefore a useful
  shortcut for logging an error and returning it to the master daemon.

  @type msg: string
  @param msg: the text of the exception; %-interpolated with C{args}
      when any positional arguments are given
  @keyword log: if False, skip logging entirely (defaults to True)
  @keyword exc: if true, log with traceback via C{logging.exception}
  @raise RPCFail: always, with the formatted message

  """
  if args:
    msg = msg % args
  if kwargs.get("log", True):
    # log either with or without the current traceback
    if kwargs.get("exc"):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
98

    
99

    
100
def _GetConfig():
  """Return a fresh SimpleStore wrapper.

  @rtype: L{ssconf.SimpleStore}
  @return: a newly-constructed SimpleStore instance

  """
  return ssconf.SimpleStore()
108

    
109

    
110
def _GetSshRunner(cluster_name):
  """Return an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the SshRunner
      constructor
  @rtype: L{ssh.SshRunner}
  @return: a newly-constructed SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
121

    
122

    
123
def _Decompress(data):
  """Unpack data compressed by the RPC client.

  @type data: list or tuple
  @param data: an (encoding, content) pair as sent by the RPC client
  @rtype: str
  @return: the decompressed payload

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # payload was sent uncompressed
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
141

    
142

    
143
def _CleanDirectory(path, exclude=None):
  """Remove all regular files contained in a directory.

  Symlinks and subdirectories are left in place; only plain files that
  are not excluded get deleted.

  @type path: str
  @param path: the directory to clean; must be one of
      L{_ALLOWED_CLEAN_DIRS}
  @type exclude: list
  @param exclude: paths of files to keep, defaults to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return

  if exclude is None:
    exclude = []
  else:
    # normalize the excluded paths so the membership test below works
    exclude = [os.path.normpath(entry) for entry in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
171

    
172

    
173
def _BuildUploadFileList():
  """Build the set of files accepted for upload.

  Abstracted into a function so the list is computed only once, at
  module import time.

  @rtype: frozenset
  @return: the allowed upload targets

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.HMAC_CLUSTER_KEY,
    ])

  # each hypervisor may contribute its own ancillary files
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)
194

    
195

    
196
# Computed once at import time; see _BuildUploadFileList
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
197

    
198

    
199
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # the queue lock file is deliberately excluded from cleaning
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
208

    
209

    
210
def GetMasterInfo():
211
  """Returns master information.
212

213
  This is an utility function to compute master information, either
214
  for consumption here or from the node daemon.
215

216
  @rtype: tuple
217
  @return: master_netdev, master_ip, master_name
218
  @raise RPCFail: in case of errors
219

220
  """
221
  try:
222
    cfg = _GetConfig()
223
    master_netdev = cfg.GetMasterNetdev()
224
    master_ip = cfg.GetMasterIP()
225
    master_node = cfg.GetMasterNode()
226
  except errors.ConfigurationError, err:
227
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
228
  return (master_netdev, master_ip, master_node)
229

    
230

    
231
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None
  @raise RPCFail: if any of the steps below failed (errors are
      collected and reported together at the end)

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  err_msgs = []
  # first check whether the master IP already answers on the noded port
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    # IP not yet up: add it as a /32 label on the master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    # send gratuitous ARPs so neighbours learn the new address
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    if no_voting:
      # both flags are required for a non-interactive voteless start
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
291

    
292

    
293
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # remove the /32 master IP from the master netdev
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      # logged only; this function never raises for a failed stop
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)
324

    
325

    
326
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation
  @raise RPCFail: if the user ssh files cannot be processed or the
      ssh daemon cannot reload its keys

  """
  # host keys: private keys must not be world-readable (0600), public
  # keys are world-readable (0644)
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    _Fail("Error while processing user ssh files: %s", err, exc=True)

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # make the ssh daemon pick up the new host keys
  result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
  if result.failed:
    _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
          result.cmd, result.exit_code, result.output)
372

    
373

    
374
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean
  @raise errors.QuitGanetiException: always, on successful processing,
      to signal the node daemon to shut down

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      # drop our own key from authorized_keys, then remove the key pair
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SSL_CERT_FILE)
  except: # pylint: disable-msg=W0702
    # best-effort removal of the cluster secrets; only logged
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
415

    
416

    
417
def GetNodeInfo(vgname, hypervisor_type):
  """Collect storage, memory and boot-id information about this node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space
      information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  node_info = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
    }

  # merge in whatever the hypervisor reports (memory data etc.)
  hyp_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hyp_info is not None:
    node_info.update(hyp_info)

  node_info["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return node_info
447

    
448

    
449
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name, used by the ssh runner
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # note: shuffles the caller-supplied list in place
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failures are recorded in the result
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own primary/secondary IPs in the supplied node list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetDaemonPort(constants.NODED)
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          # only test the secondary IP when it differs from the primary
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # on failure report the error text instead of a minor list
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    # sanity-check that sysfs and procfs are mounted where expected
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  return result
569

    
570

    
571
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  lvs = {}
  for raw_line in result.stdout.splitlines():
    line = raw_line.strip()
    match = valid_line_re.match(line)
    if match is None:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    if attr[0] == 'v':
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    # attr[4] == '-' means inactive, attr[5] == 'o' means online/open
    lvs[name] = (size, attr[4] == '-', attr[5] == 'o')

  return lvs
613

    
614

    
615
def ListVolumeGroups():
  """List the volume groups present on this node with their sizes.

  @rtype: dict
  @return: mapping of volume group name to its size

  """
  return utils.ListVolumeGroups()
624

    
625

    
626
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    # lvs reports devices as "/dev/xxx(extent)"; keep only the path part
    return dev.split('(')[0]

  def map_line(fields):
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': parse_dev(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  # only keep lines that carry all four separator-delimited fields
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
668

    
669

    
670
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise
  @raise RPCFail: if any of the bridges is missing

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
684

    
685

    
686
def GetInstanceList(hypervisor_list):
687
  """Provides a list of instances.
688

689
  @type hypervisor_list: list
690
  @param hypervisor_list: the list of hypervisors to query information
691

692
  @rtype: list
693
  @return: a list of all running instances on the current node
694
    - instance1.example.com
695
    - instance2.example.com
696

697
  """
698
  results = []
699
  for hname in hypervisor_list:
700
    try:
701
      names = hypervisor.GetHypervisor(hname).ListInstances()
702
      results.extend(names)
703
    except errors.HypervisorError, err:
704
      _Fail("Error enumerating instances (hypervisor %s): %s",
705
            hname, err, exc=True)
706

    
707
  return results
708

    
709

    
710
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      (empty when the hypervisor reports no data)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    return {}

  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
734

    
735

    
736
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant
  @raise RPCFail: if the instance is not running or lacks the disk
      symlinks created by recent ganeti versions

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # every disk must have its symlink in place for migration to work
  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
757

    
758

    
759
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus
  @raise RPCFail: if an instance is reported by two hypervisors with
      different static parameters

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      value = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      previous = output.get(name)
      if previous is not None:
        # we only check static parameters, like memory and vcpus,
        # and not state and time which can change between the
        # invocations of the different hypervisors
        for key in ('memory', 'vcpus'):
          if value[key] != previous[key]:
            _Fail("Instance %s is running twice"
                  " with different parameters", name)
      output[name] = value

  return output
800

    
801

    
802
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @rtype: string
  @return: the full path of the log file

  """
  # include a timestamp so repeated operations don't overwrite each other
  basename = "%s-%s-%s-%d.log" % (kind, os_name, instance, int(time.time()))
  return utils.PathJoin(constants.LOG_OS_DIR, basename)
818

    
819

    
820
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None
  @raise RPCFail: if the OS create script fails

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    # let the OS script distinguish reinstalls from fresh installs
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # include the tail of the script's log in the failure message;
    # log=False because we already logged the full details above
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
850

    
851

    
852
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation
  @raise RPCFail: if the OS rename script fails

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  # the script reads the previous name from the environment
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # this is the rename path, not create: the previous message
    # wrongly said "os create command" (copy-paste from InstanceOsAdd)
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # include the tail of the script's log in the failure message;
    # log=False because we already logged the full details above
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
883

    
884

    
885
def _GetVGInfo(vg_name):
886
  """Get information about the volume group.
887

888
  @type vg_name: str
889
  @param vg_name: the volume group which we query
890
  @rtype: dict
891
  @return:
892
    A dictionary with the following keys:
893
      - C{vg_size} is the total size of the volume group in MiB
894
      - C{vg_free} is the free size of the volume group in MiB
895
      - C{pv_count} are the number of physical disks in that VG
896

897
    If an error occurs during gathering of data, we return the same dict
898
    with keys all set to None.
899

900
  """
901
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
902

    
903
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
904
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
905

    
906
  if retval.failed:
907
    logging.error("volume group %s not present", vg_name)
908
    return retdic
909
  valarr = retval.stdout.strip().rstrip(':').split(':')
910
  if len(valarr) == 3:
911
    try:
912
      retdic = {
913
        "vg_size": int(round(float(valarr[0]), 0)),
914
        "vg_free": int(round(float(valarr[1]), 0)),
915
        "pv_count": int(valarr[2]),
916
        }
917
    except (TypeError, ValueError), err:
918
      logging.exception("Fail to parse vgs output: %s", err)
919
  else:
920
    logging.error("vgs output has the wrong number of fields (expected"
921
                  " three): %s", str(valarr))
922
  return retdic
923

    
924

    
925
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Compute the path of the symlink for an instance's block device.

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
def _SymlinkBlockDev(instance_name, device_path, idx):
931
  """Set up symlinks to a instance's block device.
932

933
  This is an auxiliary function run when an instance is start (on the primary
934
  node) or when an instance is migrated (on the target node).
935

936

937
  @param instance_name: the name of the target instance
938
  @param device_path: path of the physical block device, on the node
939
  @param idx: the disk index
940
  @return: absolute path to the disk's symlink
941

942
  """
943
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
944
  try:
945
    os.symlink(device_path, link_name)
946
  except OSError, err:
947
    if err.errno == errno.EEXIST:
948
      if (not os.path.islink(link_name) or
949
          os.readlink(link_name) != device_path):
950
        os.remove(link_name)
951
        os.symlink(device_path, link_name)
952
    else:
953
      raise
954

    
955
  return link_name
956

    
957

    
958
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the symlinks created for the given instance's block devices.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      # best-effort removal: log and keep going with the other links
      logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
972
  """Set up an instance's block device(s).
973

974
  This is run on the primary node at instance startup. The block
975
  devices must be already assembled.
976

977
  @type instance: L{objects.Instance}
978
  @param instance: the instance whose disks we shoul assemble
979
  @rtype: list
980
  @return: list of (disk_object, device_path)
981

982
  """
983
  block_devices = []
984
  for idx, disk in enumerate(instance.disks):
985
    device = _RecursiveFindBD(disk)
986
    if device is None:
987
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
988
                                    str(disk))
989
    device.Open()
990
    try:
991
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
992
    except OSError, e:
993
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
994
                                    e.strerror)
995

    
996
    block_devices.append((disk, link_name))
997

    
998
  return block_devices
999

    
1000

    
1001
def StartInstance(instance):
1002
  """Start an instance.
1003

1004
  @type instance: L{objects.Instance}
1005
  @param instance: the instance object
1006
  @rtype: None
1007

1008
  """
1009
  running_instances = GetInstanceList([instance.hypervisor])
1010

    
1011
  if instance.name in running_instances:
1012
    logging.info("Instance %s already running, not starting", instance.name)
1013
    return
1014

    
1015
  try:
1016
    block_devices = _GatherAndLinkBlockDevs(instance)
1017
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1018
    hyper.StartInstance(instance, block_devices)
1019
  except errors.BlockDeviceError, err:
1020
    _Fail("Block device error: %s", err, exc=True)
1021
  except errors.HypervisorError, err:
1022
    _RemoveBlockDevLinks(instance.name, instance.disks)
1023
    _Fail("Hypervisor error: %s", err, exc=True)
1024

    
1025

    
1026
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  # nothing to do if the instance is not running on this hypervisor
  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  # callable handed to utils.Retry: each invocation makes one soft-stop
  # attempt and raises RetryAgain while the instance is still listed
  class _TryShutdown:
    def __init__(self):
      # flipped to True after the first StopInstance attempt, so the
      # hypervisor knows subsequent calls are retries
      self.tried_once = False

    def __call__(self):
      # instance no longer listed: shutdown succeeded
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      # still listed: ask utils.Retry to call us again
      raise utils.RetryAgain()

  try:
    # poll every 5 seconds up to the caller-supplied timeout
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    # brief pause before re-checking whether the forced stop took effect
    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  # the instance is down: remove its block-device symlinks
  _RemoveBlockDevLinks(iname, instance.disks)
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1091
  """Reboot an instance.
1092

1093
  @type instance: L{objects.Instance}
1094
  @param instance: the instance object to reboot
1095
  @type reboot_type: str
1096
  @param reboot_type: the type of reboot, one the following
1097
    constants:
1098
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1099
        instance OS, do not recreate the VM
1100
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1101
        restart the VM (at the hypervisor level)
1102
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1103
        not accepted here, since that mode is handled differently, in
1104
        cmdlib, and translates into full stop and start of the
1105
        instance (instead of a call_instance_reboot RPC)
1106
  @type shutdown_timeout: integer
1107
  @param shutdown_timeout: maximum timeout for soft shutdown
1108
  @rtype: None
1109

1110
  """
1111
  running_instances = GetInstanceList([instance.hypervisor])
1112

    
1113
  if instance.name not in running_instances:
1114
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1115

    
1116
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1117
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1118
    try:
1119
      hyper.RebootInstance(instance)
1120
    except errors.HypervisorError, err:
1121
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1122
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1123
    try:
1124
      InstanceShutdown(instance, shutdown_timeout)
1125
      return StartInstance(instance)
1126
    except errors.HypervisorError, err:
1127
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1128
  else:
1129
    _Fail("Invalid reboot_type received: %s", reboot_type)
1130

    
1131

    
1132
def MigrationInfo(instance):
1133
  """Gather information about an instance to be migrated.
1134

1135
  @type instance: L{objects.Instance}
1136
  @param instance: the instance definition
1137

1138
  """
1139
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1140
  try:
1141
    info = hyper.MigrationInfo(instance)
1142
  except errors.HypervisorError, err:
1143
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1144
  return info
1145

    
1146

    
1147
def AcceptInstance(instance, info, target):
1148
  """Prepare the node to accept an instance.
1149

1150
  @type instance: L{objects.Instance}
1151
  @param instance: the instance definition
1152
  @type info: string/data (opaque)
1153
  @param info: migration information, from the source node
1154
  @type target: string
1155
  @param target: target host (usually ip), on this node
1156

1157
  """
1158
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1159
  try:
1160
    hyper.AcceptInstance(instance, info, target)
1161
  except errors.HypervisorError, err:
1162
    _Fail("Failed to accept instance: %s", err, exc=True)
1163

    
1164

    
1165
def FinalizeMigration(instance, info, success):
1166
  """Finalize any preparation to accept an instance.
1167

1168
  @type instance: L{objects.Instance}
1169
  @param instance: the instance definition
1170
  @type info: string/data (opaque)
1171
  @param info: migration information, from the source node
1172
  @type success: boolean
1173
  @param success: whether the migration was a success or a failure
1174

1175
  """
1176
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1177
  try:
1178
    hyper.FinalizeMigration(instance, info, success)
1179
  except errors.HypervisorError, err:
1180
    _Fail("Failed to finalize migration: %s", err, exc=True)
1181

    
1182

    
1183
def MigrateInstance(instance, target, live):
1184
  """Migrates an instance to another node.
1185

1186
  @type instance: L{objects.Instance}
1187
  @param instance: the instance definition
1188
  @type target: string
1189
  @param target: the target node name
1190
  @type live: boolean
1191
  @param live: whether the migration should be done live or not (the
1192
      interpretation of this parameter is left to the hypervisor)
1193
  @rtype: tuple
1194
  @return: a tuple of (success, msg) where:
1195
      - succes is a boolean denoting the success/failure of the operation
1196
      - msg is a string with details in case of failure
1197

1198
  """
1199
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1200

    
1201
  try:
1202
    hyper.MigrateInstance(instance, target, live)
1203
  except errors.HypervisorError, err:
1204
    _Fail("Failed to migrate instance: %s", err, exc=True)
1205

    
1206

    
1207
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete 'size' argument
  # pylint: disable-msg=W0613
  # assemble (and possibly open) all children first, so the parent
  # device can be created on top of them
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable-msg=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # record the new device in the node-local device cache
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
def BlockdevRemove(disk):
1272
  """Remove a block device.
1273

1274
  @note: This is intended to be called recursively.
1275

1276
  @type disk: L{objects.Disk}
1277
  @param disk: the disk object we should remove
1278
  @rtype: boolean
1279
  @return: the success of the operation
1280

1281
  """
1282
  msgs = []
1283
  try:
1284
    rdev = _RecursiveFindBD(disk)
1285
  except errors.BlockDeviceError, err:
1286
    # probably can't attach
1287
    logging.info("Can't attach to device %s in remove", disk)
1288
    rdev = None
1289
  if rdev is not None:
1290
    r_path = rdev.dev_path
1291
    try:
1292
      rdev.Remove()
1293
    except errors.BlockDeviceError, err:
1294
      msgs.append(str(err))
1295
    if not msgs:
1296
      DevCacheManager.RemoveCache(r_path)
1297

    
1298
  if disk.children:
1299
    for child in disk.children:
1300
      try:
1301
        BlockdevRemove(child)
1302
      except RPCFail, err:
1303
        msgs.append(str(err))
1304

    
1305
  if msgs:
1306
    _Fail("; ".join(msgs))
1307

    
1308

    
1309
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # compute how many children may fail activation (appended as None)
    # before we must give up and propagate the error
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          # the None budget is exhausted: re-raise the child's error
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    # register the assembled device in the node-local device cache
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
def BlockdevAssemble(disk, owner, as_primary):
1364
  """Activate a block device for an instance.
1365

1366
  This is a wrapper over _RecursiveAssembleBD.
1367

1368
  @rtype: str or boolean
1369
  @return: a C{/dev/...} path for primary nodes, and
1370
      C{True} for secondary nodes
1371

1372
  """
1373
  try:
1374
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1375
    if isinstance(result, bdev.BlockDev):
1376
      # pylint: disable-msg=E1103
1377
      result = result.dev_path
1378
  except errors.BlockDeviceError, err:
1379
    _Fail("Error while assembling disk: %s", err, exc=True)
1380

    
1381
  return result
1382

    
1383

    
1384
def BlockdevShutdown(disk):
1385
  """Shut down a block device.
1386

1387
  First, if the device is assembled (Attach() is successful), then
1388
  the device is shutdown. Then the children of the device are
1389
  shutdown.
1390

1391
  This function is called recursively. Note that we don't cache the
1392
  children or such, as oppossed to assemble, shutdown of different
1393
  devices doesn't require that the upper device was active.
1394

1395
  @type disk: L{objects.Disk}
1396
  @param disk: the description of the disk we should
1397
      shutdown
1398
  @rtype: None
1399

1400
  """
1401
  msgs = []
1402
  r_dev = _RecursiveFindBD(disk)
1403
  if r_dev is not None:
1404
    r_path = r_dev.dev_path
1405
    try:
1406
      r_dev.Shutdown()
1407
      DevCacheManager.RemoveCache(r_path)
1408
    except errors.BlockDeviceError, err:
1409
      msgs.append(str(err))
1410

    
1411
  if disk.children:
1412
    for child in disk.children:
1413
      try:
1414
        BlockdevShutdown(child)
1415
      except RPCFail, err:
1416
        msgs.append(str(err))
1417

    
1418
  if msgs:
1419
    _Fail("; ".join(msgs))
1420

    
1421

    
1422
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)

  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)

  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)

  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is not None:
      # the disk has a fixed path; just sanity-check it
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
      continue
    # no static path: locate the assembled device to learn its path
    bd = _RecursiveFindBD(disk)
    if bd is None:
      _Fail("Can't find device %s while removing children", disk)
    devs.append(bd.dev_path)

  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  def _OneStatus(dsk):
    # locate the assembled device for this disk and query its status
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)
    return rbd.CombinedSyncStatus()

  return [_OneStatus(dsk) for dsk in disks]
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def BlockdevFind(disk):
1515
  """Check if a device is activated.
1516

1517
  If it is, return information about the real device.
1518

1519
  @type disk: L{objects.Disk}
1520
  @param disk: the disk to find
1521
  @rtype: None or objects.BlockDevStatus
1522
  @return: None if the disk cannot be found, otherwise a the current
1523
           information
1524

1525
  """
1526
  try:
1527
    rbd = _RecursiveFindBD(disk)
1528
  except errors.BlockDeviceError, err:
1529
    _Fail("Failed to find device: %s", err, exc=True)
1530

    
1531
  if rbd is None:
1532
    return None
1533

    
1534
  return rbd.GetSyncStatus()
1535

    
1536

    
1537
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  sizes = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      # treat lookup errors the same as a missing device
      rbd = None
    sizes.append(None if rbd is None else rbd.GetActualSize())
  return sizes
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  # wrap the writing dd into an ssh invocation to the destination node
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  # bash is required for the pipefail option set in expcmd above
  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  utils.WriteFile(file_name, data=_Decompress(data), mode=mode, uid=uid,
                  gid=gid, atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, 'errno'):
    return errno.errorcode[err.errno]
  return str(err)
def _OSOndiskAPIVersion(os_dir):
1675
  """Compute and return the API version of a given OS.
1676

1677
  This function will try to read the API version of the OS residing in
1678
  the 'os_dir' directory.
1679

1680
  @type os_dir: str
1681
  @param os_dir: the directory in which we should look for the OS
1682
  @rtype: tuple
1683
  @return: tuple (status, data) with status denoting the validity and
1684
      data holding either the vaid versions or an error message
1685

1686
  """
1687
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1688

    
1689
  try:
1690
    st = os.stat(api_file)
1691
  except EnvironmentError, err:
1692
    return False, ("Required file '%s' not found under path %s: %s" %
1693
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1694

    
1695
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1696
    return False, ("File '%s' in %s is not a regular file" %
1697
                   (constants.OS_API_FILE, os_dir))
1698

    
1699
  try:
1700
    api_versions = utils.ReadFile(api_file).splitlines()
1701
  except EnvironmentError, err:
1702
    return False, ("Error while reading the API version file at %s: %s" %
1703
                   (api_file, _ErrnoOrStr(err)))
1704

    
1705
  try:
1706
    api_versions = [int(version.strip()) for version in api_versions]
1707
  except (TypeError, ValueError), err:
1708
    return False, ("API version(s) can't be converted to integer: %s" %
1709
                   str(err))
1710

    
1711
  return True, api_versions
1712

    
1713

    
1714
def DiagnoseOS(top_dirs=None):
1715
  """Compute the validity for all OSes.
1716

1717
  @type top_dirs: list
1718
  @param top_dirs: the list of directories in which to
1719
      search (if not given defaults to
1720
      L{constants.OS_SEARCH_PATH})
1721
  @rtype: list of L{objects.OS}
1722
  @return: a list of tuples (name, path, status, diagnose, variants)
1723
      for all (potential) OSes under all search paths, where:
1724
          - name is the (potential) OS name
1725
          - path is the full path to the OS
1726
          - status True/False is the validity of the OS
1727
          - diagnose is the error message for an invalid OS, otherwise empty
1728
          - variants is a list of supported OS variants, if any
1729

1730
  """
1731
  if top_dirs is None:
1732
    top_dirs = constants.OS_SEARCH_PATH
1733

    
1734
  result = []
1735
  for dir_name in top_dirs:
1736
    if os.path.isdir(dir_name):
1737
      try:
1738
        f_names = utils.ListVisibleFiles(dir_name)
1739
      except EnvironmentError, err:
1740
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1741
        break
1742
      for name in f_names:
1743
        os_path = utils.PathJoin(dir_name, name)
1744
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1745
        if status:
1746
          diagnose = ""
1747
          variants = os_inst.supported_variants
1748
        else:
1749
          diagnose = os_inst
1750
          variants = []
1751
        result.append((name, os_path, status, diagnose, variants))
1752

    
1753
  return result
1754

    
1755

    
1756
def _TryOSFromDisk(name, base_dir=None):
1757
  """Create an OS instance from disk.
1758

1759
  This function will return an OS instance if the given name is a
1760
  valid OS name.
1761

1762
  @type base_dir: string
1763
  @keyword base_dir: Base directory containing OS installations.
1764
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1765
  @rtype: tuple
1766
  @return: success and either the OS instance if we find a valid one,
1767
      or error message
1768

1769
  """
1770
  if base_dir is None:
1771
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1772
  else:
1773
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1774

    
1775
  if os_dir is None:
1776
    return False, "Directory for OS %s not found in search path" % name
1777

    
1778
  status, api_versions = _OSOndiskAPIVersion(os_dir)
1779
  if not status:
1780
    # push the error up
1781
    return status, api_versions
1782

    
1783
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1784
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1785
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1786

    
1787
  # OS Files dictionary, we will populate it with the absolute path names
1788
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1789

    
1790
  if max(api_versions) >= constants.OS_API_V15:
1791
    os_files[constants.OS_VARIANTS_FILE] = ''
1792

    
1793
  for filename in os_files:
1794
    os_files[filename] = utils.PathJoin(os_dir, filename)
1795

    
1796
    try:
1797
      st = os.stat(os_files[filename])
1798
    except EnvironmentError, err:
1799
      return False, ("File '%s' under path '%s' is missing (%s)" %
1800
                     (filename, os_dir, _ErrnoOrStr(err)))
1801

    
1802
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1803
      return False, ("File '%s' under path '%s' is not a regular file" %
1804
                     (filename, os_dir))
1805

    
1806
    if filename in constants.OS_SCRIPTS:
1807
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1808
        return False, ("File '%s' under path '%s' is not executable" %
1809
                       (filename, os_dir))
1810

    
1811
  variants = None
1812
  if constants.OS_VARIANTS_FILE in os_files:
1813
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1814
    try:
1815
      variants = utils.ReadFile(variants_file).splitlines()
1816
    except EnvironmentError, err:
1817
      return False, ("Error while reading the OS variants file at %s: %s" %
1818
                     (variants_file, _ErrnoOrStr(err)))
1819
    if not variants:
1820
      return False, ("No supported os variant found")
1821

    
1822
  os_obj = objects.OS(name=name, path=os_dir,
1823
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1824
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1825
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1826
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1827
                      supported_variants=variants,
1828
                      api_versions=api_versions)
1829
  return True, os_obj
1830

    
1831

    
1832
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk, failing loudly on error.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data; here an invalid OS
  is converted into an L{RPCFail} exception.

  @type name: string
  @param name: the OS name, possibly including a "+variant" suffix
      which is stripped before the lookup
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # drop any "+variant" suffix before looking up the OS on disk
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if status:
    return payload

  _Fail(payload)
1857

    
1858

    
1859
def OSEnvironment(instance, inst_os, debug=0):
  """Build the environment dictionary for an OS script run.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if a block device cannot be found

  """
  env = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  env['OS_API_VERSION'] = '%d' % api_version
  env['INSTANCE_NAME'] = instance.name
  env['INSTANCE_OS'] = instance.os
  env['HYPERVISOR'] = instance.hypervisor
  env['DISK_COUNT'] = '%d' % len(instance.disks)
  env['NIC_COUNT'] = '%d' % len(instance.nics)
  env['DEBUG_LEVEL'] = '%d' % debug

  if api_version >= constants.OS_API_V15:
    # the variant is encoded after a "+" in the instance OS name; fall
    # back to the first supported variant when none is given
    try:
      variant = instance.os.split('+', 1)[1]
    except IndexError:
      variant = inst_os.supported_variants[0]
    env['OS_VARIANT'] = variant

  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    env['DISK_%d_PATH' % idx] = real_disk.dev_path
    env['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      env['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      env['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      env['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  for idx, nic in enumerate(instance.nics):
    env['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      env['NIC_%d_IP' % idx] = nic.ip
    env['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      env['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      env['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      env['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # export all backend/hypervisor parameters verbatim
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return env
1924

    
1925
def BlockdevGrow(disk, amount):
1926
  """Grow a stack of block devices.
1927

1928
  This function is called recursively, with the childrens being the
1929
  first ones to resize.
1930

1931
  @type disk: L{objects.Disk}
1932
  @param disk: the disk to be grown
1933
  @rtype: (status, result)
1934
  @return: a tuple with the status of the operation
1935
      (True/False), and the errors message if status
1936
      is False
1937

1938
  """
1939
  r_dev = _RecursiveFindBD(disk)
1940
  if r_dev is None:
1941
    _Fail("Cannot find block device %s", disk)
1942

    
1943
  try:
1944
    r_dev.Grow(amount)
1945
  except errors.BlockDeviceError, err:
1946
    _Fail("Failed to grow block device: %s", err, exc=True)
1947

    
1948

    
1949
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path
  @raise RPCFail: for unsupported device types or missing devices

  """
  if disk.dev_type == constants.LD_DRBD8:
    # recurse into the single backing storage child of the DRBD device
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])

  if disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is None:
      _Fail("Cannot find block device %s", disk)
    # FIXME: choose a saner value for the snapshot size
    # let's stay on the safe side and ask for the full size, for now
    return r_dev.Snapshot(disk.size)

  _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
        disk.unique_id, disk.dev_type)
1977

    
1978

    
1979
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
1980
  """Export a block device snapshot to a remote node.
1981

1982
  @type disk: L{objects.Disk}
1983
  @param disk: the description of the disk to export
1984
  @type dest_node: str
1985
  @param dest_node: the destination node to export to
1986
  @type instance: L{objects.Instance}
1987
  @param instance: the instance object to whom the disk belongs
1988
  @type cluster_name: str
1989
  @param cluster_name: the cluster name, needed for SSH hostalias
1990
  @type idx: int
1991
  @param idx: the index of the disk in the instance's disk list,
1992
      used to export to the OS scripts environment
1993
  @type debug: integer
1994
  @param debug: debug level, passed to the OS scripts
1995
  @rtype: None
1996

1997
  """
1998
  inst_os = OSFromDisk(instance.os)
1999
  export_env = OSEnvironment(instance, inst_os, debug)
2000

    
2001
  export_script = inst_os.export_script
2002

    
2003
  logfile = _InstanceLogName("export", inst_os.name, instance.name)
2004
  if not os.path.exists(constants.LOG_OS_DIR):
2005
    os.mkdir(constants.LOG_OS_DIR, 0750)
2006
  real_disk = _RecursiveFindBD(disk)
2007
  if real_disk is None:
2008
    _Fail("Block device '%s' is not set up", disk)
2009

    
2010
  real_disk.Open()
2011

    
2012
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
2013
  export_env['EXPORT_INDEX'] = str(idx)
2014

    
2015
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2016
  destfile = disk.physical_id[1]
2017

    
2018
  # the target command is built out of three individual commands,
2019
  # which are joined by pipes; we check each individual command for
2020
  # valid parameters
2021
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
2022
                               inst_os.path, export_script, logfile)
2023

    
2024
  comprcmd = "gzip"
2025

    
2026
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
2027
                                destdir, utils.PathJoin(destdir, destfile))
2028
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2029
                                                   constants.GANETI_RUNAS,
2030
                                                   destcmd)
2031

    
2032
  # all commands have been checked, so we're safe to combine them
2033
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
2034

    
2035
  result = utils.RunCmd(["bash", "-c", command], env=export_env)
2036

    
2037
  if result.failed:
2038
    _Fail("OS snapshot export command '%s' returned error: %s"
2039
          " output: %s", command, result.fail_reason, result.output)
2040

    
2041

    
2042
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  # only disks with a snapshot are recorded (and counted)
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if not disk:
      continue
    disk_total += 1
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # atomically replace any previous export of this instance
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
2103

    
2104

    
2105
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info
  @raise RPCFail: if the export info file lacks the required sections

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both the export and the instance section must be present
  has_required = (config.has_section(constants.INISECT_EXP) and
                  config.has_section(constants.INISECT_INS))
  if not has_required:
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2126

    
2127

    
2128
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
2129
  """Import an os image into an instance.
2130

2131
  @type instance: L{objects.Instance}
2132
  @param instance: instance to import the disks into
2133
  @type src_node: string
2134
  @param src_node: source node for the disk images
2135
  @type src_images: list of string
2136
  @param src_images: absolute paths of the disk images
2137
  @type debug: integer
2138
  @param debug: debug level, passed to the OS scripts
2139
  @rtype: list of boolean
2140
  @return: each boolean represent the success of importing the n-th disk
2141

2142
  """
2143
  inst_os = OSFromDisk(instance.os)
2144
  import_env = OSEnvironment(instance, inst_os, debug)
2145
  import_script = inst_os.import_script
2146

    
2147
  logfile = _InstanceLogName("import", instance.os, instance.name)
2148
  if not os.path.exists(constants.LOG_OS_DIR):
2149
    os.mkdir(constants.LOG_OS_DIR, 0750)
2150

    
2151
  comprcmd = "gunzip"
2152
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2153
                               import_script, logfile)
2154

    
2155
  final_result = []
2156
  for idx, image in enumerate(src_images):
2157
    if image:
2158
      destcmd = utils.BuildShellCmd('cat %s', image)
2159
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2160
                                                       constants.GANETI_RUNAS,
2161
                                                       destcmd)
2162
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2163
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2164
      import_env['IMPORT_INDEX'] = str(idx)
2165
      result = utils.RunCmd(command, env=import_env)
2166
      if result.failed:
2167
        logging.error("Disk import command '%s' returned error: %s"
2168
                      " output: %s", command, result.fail_reason,
2169
                      result.output)
2170
        final_result.append("error importing disk %d: %s, %s" %
2171
                            (idx, result.fail_reason, result.output[-100]))
2172

    
2173
  if final_result:
2174
    _Fail("; ".join(final_result), log=False)
2175

    
2176

    
2177
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports
  @raise RPCFail: if the exports directory does not exist

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
2188

    
2189

    
2190
def RemoveExport(export):
2191
  """Remove an existing export from the node.
2192

2193
  @type export: str
2194
  @param export: the name of the export to remove
2195
  @rtype: None
2196

2197
  """
2198
  target = utils.PathJoin(constants.EXPORT_DIR, export)
2199

    
2200
  try:
2201
    shutil.rmtree(target)
2202
  except EnvironmentError, err:
2203
    _Fail("Error while removing the export: %s", err, exc=True)
2204

    
2205

    
2206
def BlockdevRename(devlist):
2207
  """Rename a list of block devices.
2208

2209
  @type devlist: list of tuples
2210
  @param devlist: list of tuples of the form  (disk,
2211
      new_logical_id, new_physical_id); disk is an
2212
      L{objects.Disk} object describing the current disk,
2213
      and new logical_id/physical_id is the name we
2214
      rename it to
2215
  @rtype: boolean
2216
  @return: True if all renames succeeded, False otherwise
2217

2218
  """
2219
  msgs = []
2220
  result = True
2221
  for disk, unique_id in devlist:
2222
    dev = _RecursiveFindBD(disk)
2223
    if dev is None:
2224
      msgs.append("Can't find device %s in rename" % str(disk))
2225
      result = False
2226
      continue
2227
    try:
2228
      old_rpath = dev.dev_path
2229
      dev.Rename(unique_id)
2230
      new_rpath = dev.dev_path
2231
      if old_rpath != new_rpath:
2232
        DevCacheManager.RemoveCache(old_rpath)
2233
        # FIXME: we should add the new cache information here, like:
2234
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2235
        # but we don't have the owner here - maybe parse from existing
2236
        # cache? for now, we only lose lvm data when we rename, which
2237
        # is less critical than DRBD or MD
2238
    except errors.BlockDeviceError, err:
2239
      msgs.append("Can't rename device '%s' to '%s': %s" %
2240
                  (dev, unique_id, err))
2241
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2242
      result = False
2243
  if not result:
2244
    _Fail("; ".join(msgs))
2245

    
2246

    
2247
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only the base directory
  itself or paths strictly under it are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid
  @raise RPCFail: if file storage is disabled or the path is outside
      the base directory

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # FIX: os.path.commonprefix compares character-by-character, so e.g.
  # "/srv/ganeti/file-storage-evil" would pass a check against
  # "/srv/ganeti/file-storage"; require the base dir itself or a path
  # component boundary instead
  norm_base = os.path.normpath(base_file_storage_dir)
  if (file_storage_dir != norm_base and
      not file_storage_dir.startswith(norm_base + os.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2270

    
2271

    
2272
def CreateFileStorageDir(file_storage_dir):
2273
  """Create file storage directory.
2274

2275
  @type file_storage_dir: str
2276
  @param file_storage_dir: directory to create
2277

2278
  @rtype: tuple
2279
  @return: tuple with first element a boolean indicating wheter dir
2280
      creation was successful or not
2281

2282
  """
2283
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2284
  if os.path.exists(file_storage_dir):
2285
    if not os.path.isdir(file_storage_dir):
2286
      _Fail("Specified storage dir '%s' is not a directory",
2287
            file_storage_dir)
2288
  else:
2289
    try:
2290
      os.makedirs(file_storage_dir, 0750)
2291
    except OSError, err:
2292
      _Fail("Cannot create file storage directory '%s': %s",
2293
            file_storage_dir, err, exc=True)
2294

    
2295

    
2296
def RemoveFileStorageDir(file_storage_dir):
2297
  """Remove file storage directory.
2298

2299
  Remove it only if it's empty. If not log an error and return.
2300

2301
  @type file_storage_dir: str
2302
  @param file_storage_dir: the directory we should cleanup
2303
  @rtype: tuple (success,)
2304
  @return: tuple of one element, C{success}, denoting
2305
      whether the operation was successful
2306

2307
  """
2308
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2309
  if os.path.exists(file_storage_dir):
2310
    if not os.path.isdir(file_storage_dir):
2311
      _Fail("Specified Storage directory '%s' is not a directory",
2312
            file_storage_dir)
2313
    # deletes dir only if empty, otherwise we want to fail the rpc call
2314
    try:
2315
      os.rmdir(file_storage_dir)
2316
    except OSError, err:
2317
      _Fail("Cannot remove file storage directory '%s': %s",
2318
            file_storage_dir, err)
2319

    
2320

    
2321
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2322
  """Rename the file storage directory.
2323

2324
  @type old_file_storage_dir: str
2325
  @param old_file_storage_dir: the current path
2326
  @type new_file_storage_dir: str
2327
  @param new_file_storage_dir: the name we should rename to
2328
  @rtype: tuple (success,)
2329
  @return: tuple of one element, C{success}, denoting
2330
      whether the operation was successful
2331

2332
  """
2333
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2334
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2335
  if not os.path.exists(new_file_storage_dir):
2336
    if os.path.isdir(old_file_storage_dir):
2337
      try:
2338
        os.rename(old_file_storage_dir, new_file_storage_dir)
2339
      except OSError, err:
2340
        _Fail("Cannot rename '%s' to '%s': %s",
2341
              old_file_storage_dir, new_file_storage_dir, err)
2342
    else:
2343
      _Fail("Specified storage dir '%s' is not a directory",
2344
            old_file_storage_dir)
2345
  else:
2346
    if os.path.exists(old_file_storage_dir):
2347
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2348
            old_file_storage_dir, new_file_storage_dir)
2349

    
2350

    
2351
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # FIX: os.path.commonprefix is a character-wise comparison, so a path
  # like "<queue_dir>-evil/job" used to pass the check; normalise the
  # candidate and require either the queue dir itself or a path-component
  # boundary
  norm_name = os.path.normpath(file_name)
  result = (norm_name == queue_dir or
            norm_name.startswith(queue_dir + os.sep))

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
2366

    
2367

    
2368
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents, possibly compressed
  @rtype: None
  @raise RPCFail: if the file is outside the queue directory

  """
  _EnsureJobQueueFile(file_name)
  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
2386

    
2387

    
2388
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raise RPCFail: if either path is outside the queue directory

  """
  # both the source and the target must live inside the queue dir
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)
2405

    
2406

    
2407
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  The flag is represented by the presence of the drain file: setting
  the flag creates it, clearing the flag removes it.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: None

  """
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2423

    
2424

    
2425
def BlockdevClose(instance_name, disks):
2426
  """Closes the given block devices.
2427

2428
  This means they will be switched to secondary mode (in case of
2429
  DRBD).
2430

2431
  @param instance_name: if the argument is not empty, the symlinks
2432
      of this instance will be removed
2433
  @type disks: list of L{objects.Disk}
2434
  @param disks: the list of disks to be closed
2435
  @rtype: tuple (success, message)
2436
  @return: a tuple of success and message, where success
2437
      indicates the succes of the operation, and message
2438
      which will contain the error details in case we
2439
      failed
2440

2441
  """
2442
  bdevs = []
2443
  for cf in disks:
2444
    rd = _RecursiveFindBD(cf)
2445
    if rd is None:
2446
      _Fail("Can't find device %s", cf)
2447
    bdevs.append(rd)
2448

    
2449
  msg = []
2450
  for rd in bdevs:
2451
    try:
2452
      rd.Close()
2453
    except errors.BlockDeviceError, err:
2454
      msg.append(str(err))
2455
  if msg:
2456
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2457
  else:
2458
    if instance_name:
2459
      _RemoveBlockDevLinks(instance_name, disks)
2460

    
2461

    
2462
def ValidateHVParams(hvname, hvparams):
2463
  """Validates the given hypervisor parameters.
2464

2465
  @type hvname: string
2466
  @param hvname: the hypervisor name
2467
  @type hvparams: dict
2468
  @param hvparams: the hypervisor parameters to be validated
2469
  @rtype: None
2470

2471
  """
2472
  try:
2473
    hv_type = hypervisor.GetHypervisor(hvname)
2474
    hv_type.ValidateParameters(hvparams)
2475
  except errors.HypervisorError, err:
2476
    _Fail(str(err), log=False)
2477

    
2478

    
2479
def DemoteFromMC():
2480
  """Demotes the current node from master candidate role.
2481

2482
  """
2483
  # try to ensure we're not the master by mistake
2484
  master, myself = ssconf.GetMasterAndMyself()
2485
  if master == myself:
2486
    _Fail("ssconf status shows I'm the master node, will not demote")
2487

    
2488
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2489
  if not result.failed:
2490
    _Fail("The master daemon is running, will not demote")
2491

    
2492
  try:
2493
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2494
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2495
  except EnvironmentError, err:
2496
    if err.errno != errno.ENOENT:
2497
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2498

    
2499
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2500

    
2501

    
2502
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: mapping used by L{objects.Disk.SetPhysicalID}
  @type disks: list of L{objects.Disk}
  @param disks: the disks to resolve
  @return: list of block devices, in the same order as C{disks}
  @raise RPCFail: if any disk cannot be found

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
2519

    
2520

    
2521
def DrbdDisconnectNet(nodes_ip, disks):
2522
  """Disconnects the network on a list of drbd devices.
2523

2524
  """
2525
  bdevs = _FindDisks(nodes_ip, disks)
2526

    
2527
  # disconnect disks
2528
  for rd in bdevs:
2529
    try:
2530
      rd.DisconnectNet()
2531
    except errors.BlockDeviceError, err:
2532
      _Fail("Can't change network configuration to standalone mode: %s",
2533
            err, exc=True)
2534

    
2535

    
2536
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2537
  """Attaches the network on a list of drbd devices.
2538

2539
  """
2540
  bdevs = _FindDisks(nodes_ip, disks)
2541

    
2542
  if multimaster:
2543
    for idx, rd in enumerate(bdevs):
2544
      try:
2545
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2546
      except EnvironmentError, err:
2547
        _Fail("Can't create symlink: %s", err)
2548
  # reconnect disks, switch to new master configuration and if
2549
  # needed primary mode
2550
  for rd in bdevs:
2551
    try:
2552
      rd.AttachNet(multimaster)
2553
    except errors.BlockDeviceError, err:
2554
      _Fail("Can't change network configuration: %s", err)
2555

    
2556
  # wait until the disks are connected; we need to retry the re-attach
2557
  # if the device becomes standalone, as this might happen if the one
2558
  # node disconnects and reconnects in a different mode before the
2559
  # other node reconnects; in this case, one or both of the nodes will
2560
  # decide it has wrong configuration and switch to standalone
2561

    
2562
  def _Attach():
2563
    all_connected = True
2564

    
2565
    for rd in bdevs:
2566
      stats = rd.GetProcStatus()
2567

    
2568
      all_connected = (all_connected and
2569
                       (stats.is_connected or stats.is_in_resync))
2570

    
2571
      if stats.is_standalone:
2572
        # peer had different config info and this node became
2573
        # standalone, even though this should not happen with the
2574
        # new staged way of changing disk configs
2575
        try:
2576
          rd.AttachNet(multimaster)
2577
        except errors.BlockDeviceError, err:
2578
          _Fail("Can't change network configuration: %s", err)
2579

    
2580
    if not all_connected:
2581
      raise utils.RetryAgain()
2582

    
2583
  try:
2584
    # Start with a delay of 100 miliseconds and go up to 5 seconds
2585
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2586
  except utils.RetryTimeout:
2587
    _Fail("Timeout in disk reconnecting")
2588

    
2589
  if multimaster:
2590
    # change to primary mode
2591
    for rd in bdevs:
2592
      try:
2593
        rd.Open()
2594
      except errors.BlockDeviceError, err:
2595
        _Fail("Can't change to primary mode: %s", err)
2596

    
2597

    
2598
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  Each device is polled (once per second, up to 15 seconds) until it
  reports itself either connected or in resync; a device that is in
  neither state after the polling window causes a failure report.

  @type nodes_ip: list
  @param nodes_ip: the IPs of the DRBD peer nodes, used to locate the
      local block devices
  @type disks: list
  @param disks: the disk objects whose DRBD devices should be polled
  @rtype: tuple
  @return: (alldone, min_resync), where alldone is True iff no device
      is still resyncing and min_resync is the minimum sync percentage
      seen (100 if no device reported one)

  """
  def _CheckConnectivity(dev):
    # retry while the device is neither connected nor resyncing
    proc_stats = dev.GetProcStatus()
    if not (proc_stats.is_connected or proc_stats.is_in_resync):
      raise utils.RetryAgain()
    return proc_stats

  min_resync = 100
  alldone = True

  for dev in _FindDisks(nodes_ip, disks):
    try:
      # poll once per second, for at most 15 seconds
      proc_stats = utils.Retry(_CheckConnectivity, 1, 15, args=[dev])
    except utils.RetryTimeout:
      # one final check before giving up on this device
      proc_stats = dev.GetProcStatus()
      if not (proc_stats.is_connected or proc_stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", dev, proc_stats)
    if proc_stats.is_in_resync:
      alldone = False
    if proc_stats.sync_percent is not None:
      min_resync = min(min_resync, proc_stats.sync_percent)

  return (alldone, min_resync)


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: str
  @param hypervisor_type: the hypervisor whose powercycle
      implementation should be used

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    child_pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    child_pid = 0
  if child_pid:
    # parent: report back immediately; the child does the real work
    return "Reboot scheduled in 5 seconds"
  # child (or fork-less fallback): give the parent time to answer the
  # RPC before the machine goes down
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is not None:
      base_dir = hooks_base_dir
    else:
      base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    hook_results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return hook_results

    for (script, status, proc_result) in utils.RunParts(dir_name, env=env,
                                                        reset_env=True):
      if status == constants.RUNPARTS_SKIP:
        hkr = constants.HKR_SKIP
        output = ""
      elif status == constants.RUNPARTS_ERR:
        hkr = constants.HKR_FAIL
        output = "Hook script execution error: %s" % proc_result
      elif status == constants.RUNPARTS_RUN:
        if proc_result.failed:
          hkr = constants.HKR_FAIL
        else:
          hkr = constants.HKR_SUCCESS
        output = utils.SafeEncode(proc_result.output.strip())
      hook_results.append(("%s/%s" % (subdir, script), hkr, output))

    return hook_results


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script on success; failures
        are reported via L{_Fail} (which raises), not via the return
        value

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      try:
        os.write(fd, idata)
      finally:
        # close the descriptor even if the write fails, otherwise we
        # would leak it (previously os.close was skipped on write errors)
        os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
2770
  """Simple class for managing a cache of block device information.
2771

2772
  """
2773
  _DEV_PREFIX = "/dev/"
2774
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2775

    
2776
  @classmethod
2777
  def _ConvertPath(cls, dev_path):
2778
    """Converts a /dev/name path to the cache file name.
2779

2780
    This replaces slashes with underscores and strips the /dev
2781
    prefix. It then returns the full path to the cache file.
2782

2783
    @type dev_path: str
2784
    @param dev_path: the C{/dev/} path name
2785
    @rtype: str
2786
    @return: the converted path name
2787

2788
    """
2789
    if dev_path.startswith(cls._DEV_PREFIX):
2790
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2791
    dev_path = dev_path.replace("/", "_")
2792
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
2793
    return fpath
2794

    
2795
  @classmethod
2796
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2797
    """Updates the cache information for a given device.
2798

2799
    @type dev_path: str
2800
    @param dev_path: the pathname of the device
2801
    @type owner: str
2802
    @param owner: the owner (instance name) of the device
2803
    @type on_primary: bool
2804
    @param on_primary: whether this is the primary
2805
        node nor not
2806
    @type iv_name: str
2807
    @param iv_name: the instance-visible name of the
2808
        device, as in objects.Disk.iv_name
2809

2810
    @rtype: None
2811

2812
    """
2813
    if dev_path is None:
2814
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2815
      return
2816
    fpath = cls._ConvertPath(dev_path)
2817
    if on_primary:
2818
      state = "primary"
2819
    else:
2820
      state = "secondary"
2821
    if iv_name is None:
2822
      iv_name = "not_visible"
2823
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2824
    try:
2825
      utils.WriteFile(fpath, data=fdata)
2826
    except EnvironmentError, err:
2827
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2828

    
2829
  @classmethod
2830
  def RemoveCache(cls, dev_path):
2831
    """Remove data for a dev_path.
2832

2833
    This is just a wrapper over L{utils.RemoveFile} with a converted
2834
    path name and logging.
2835

2836
    @type dev_path: str
2837
    @param dev_path: the pathname of the device
2838

2839
    @rtype: None
2840

2841
    """
2842
    if dev_path is None:
2843
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2844
      return
2845
    fpath = cls._ConvertPath(dev_path)
2846
    try:
2847
      utils.RemoveFile(fpath)
2848
    except EnvironmentError, err:
2849
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)