Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 168c1de2

History | View | Annotate | Download (87 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable-msg=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50

    
51
from ganeti import errors
52
from ganeti import utils
53
from ganeti import ssh
54
from ganeti import hypervisor
55
from ganeti import constants
56
from ganeti import bdev
57
from ganeti import objects
58
from ganeti import ssconf
59

    
60

    
61
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
62
_ALLOWED_CLEAN_DIRS = frozenset([
63
  constants.DATA_DIR,
64
  constants.JOB_QUEUE_ARCHIVE_DIR,
65
  constants.QUEUE_DIR,
66
  ])
67

    
68

    
69
class RPCFail(Exception):
70
  """Class denoting RPC failure.
71

72
  Its argument is the error message.
73

74
  """
75

    
76

    
77
def _Fail(msg, *args, **kwargs):
78
  """Log an error and the raise an RPCFail exception.
79

80
  This exception is then handled specially in the ganeti daemon and
81
  turned into a 'failed' return type. As such, this function is a
82
  useful shortcut for logging the error and returning it to the master
83
  daemon.
84

85
  @type msg: string
86
  @param msg: the text of the exception
87
  @raise RPCFail
88

89
  """
90
  if args:
91
    msg = msg % args
92
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
93
    if "exc" in kwargs and kwargs["exc"]:
94
      logging.exception(msg)
95
    else:
96
      logging.error(msg)
97
  raise RPCFail(msg)
98

    
99

    
100
def _GetConfig():
101
  """Simple wrapper to return a SimpleStore.
102

103
  @rtype: L{ssconf.SimpleStore}
104
  @return: a SimpleStore instance
105

106
  """
107
  return ssconf.SimpleStore()
108

    
109

    
110
def _GetSshRunner(cluster_name):
111
  """Simple wrapper to return an SshRunner.
112

113
  @type cluster_name: str
114
  @param cluster_name: the cluster name, which is needed
115
      by the SshRunner constructor
116
  @rtype: L{ssh.SshRunner}
117
  @return: an SshRunner instance
118

119
  """
120
  return ssh.SshRunner(cluster_name)
121

    
122

    
123
def _Decompress(data):
124
  """Unpacks data compressed by the RPC client.
125

126
  @type data: list or tuple
127
  @param data: Data sent by RPC client
128
  @rtype: str
129
  @return: Decompressed data
130

131
  """
132
  assert isinstance(data, (list, tuple))
133
  assert len(data) == 2
134
  (encoding, content) = data
135
  if encoding == constants.RPC_ENCODING_NONE:
136
    return content
137
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
138
    return zlib.decompress(base64.b64decode(content))
139
  else:
140
    raise AssertionError("Unknown data encoding")
141

    
142

    
143
def _CleanDirectory(path, exclude=None):
144
  """Removes all regular files in a directory.
145

146
  @type path: str
147
  @param path: the directory to clean
148
  @type exclude: list
149
  @param exclude: list of files to be excluded, defaults
150
      to the empty list
151

152
  """
153
  if path not in _ALLOWED_CLEAN_DIRS:
154
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
155
          path)
156

    
157
  if not os.path.isdir(path):
158
    return
159
  if exclude is None:
160
    exclude = []
161
  else:
162
    # Normalize excluded paths
163
    exclude = [os.path.normpath(i) for i in exclude]
164

    
165
  for rel_name in utils.ListVisibleFiles(path):
166
    full_name = utils.PathJoin(path, rel_name)
167
    if full_name in exclude:
168
      continue
169
    if os.path.isfile(full_name) and not os.path.islink(full_name):
170
      utils.RemoveFile(full_name)
171

    
172

    
173
def _BuildUploadFileList():
174
  """Build the list of allowed upload files.
175

176
  This is abstracted so that it's built only once at module import time.
177

178
  """
179
  allowed_files = set([
180
    constants.CLUSTER_CONF_FILE,
181
    constants.ETC_HOSTS,
182
    constants.SSH_KNOWN_HOSTS_FILE,
183
    constants.VNC_PASSWORD_FILE,
184
    constants.RAPI_CERT_FILE,
185
    constants.RAPI_USERS_FILE,
186
    constants.HMAC_CLUSTER_KEY,
187
    ])
188

    
189
  for hv_name in constants.HYPER_TYPES:
190
    hv_class = hypervisor.GetHypervisorClass(hv_name)
191
    allowed_files.update(hv_class.GetAncillaryFiles())
192

    
193
  return frozenset(allowed_files)
194

    
195

    
196
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
197

    
198

    
199
def JobQueuePurge():
200
  """Removes job queue files and archived jobs.
201

202
  @rtype: tuple
203
  @return: True, None
204

205
  """
206
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
207
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
208

    
209

    
210
def GetMasterInfo():
211
  """Returns master information.
212

213
  This is an utility function to compute master information, either
214
  for consumption here or from the node daemon.
215

216
  @rtype: tuple
217
  @return: master_netdev, master_ip, master_name
218
  @raise RPCFail: in case of errors
219

220
  """
221
  try:
222
    cfg = _GetConfig()
223
    master_netdev = cfg.GetMasterNetdev()
224
    master_ip = cfg.GetMasterIP()
225
    master_node = cfg.GetMasterNode()
226
  except errors.ConfigurationError, err:
227
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
228
  return (master_netdev, master_ip, master_node)
229

    
230

    
231
def StartMaster(start_daemons, no_voting):
232
  """Activate local node as master node.
233

234
  The function will always try activate the IP address of the master
235
  (unless someone else has it). It will also start the master daemons,
236
  based on the start_daemons parameter.
237

238
  @type start_daemons: boolean
239
  @param start_daemons: whether to also start the master
240
      daemons (ganeti-masterd and ganeti-rapi)
241
  @type no_voting: boolean
242
  @param no_voting: whether to start ganeti-masterd without a node vote
243
      (if start_daemons is True), but still non-interactively
244
  @rtype: None
245

246
  """
247
  # GetMasterInfo will raise an exception if not able to return data
248
  master_netdev, master_ip, _ = GetMasterInfo()
249

    
250
  err_msgs = []
251
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
252
    if utils.OwnIpAddress(master_ip):
253
      # we already have the ip:
254
      logging.debug("Master IP already configured, doing nothing")
255
    else:
256
      msg = "Someone else has the master ip, not activating"
257
      logging.error(msg)
258
      err_msgs.append(msg)
259
  else:
260
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
261
                           "dev", master_netdev, "label",
262
                           "%s:0" % master_netdev])
263
    if result.failed:
264
      msg = "Can't activate master IP: %s" % result.output
265
      logging.error(msg)
266
      err_msgs.append(msg)
267

    
268
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
269
                           "-s", master_ip, master_ip])
270
    # we'll ignore the exit code of arping
271

    
272
  # and now start the master and rapi daemons
273
  if start_daemons:
274
    if no_voting:
275
      masterd_args = "--no-voting --yes-do-it"
276
    else:
277
      masterd_args = ""
278

    
279
    env = {
280
      "EXTRA_MASTERD_ARGS": masterd_args,
281
      }
282

    
283
    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
284
    if result.failed:
285
      msg = "Can't start Ganeti master: %s" % result.output
286
      logging.error(msg)
287
      err_msgs.append(msg)
288

    
289
  if err_msgs:
290
    _Fail("; ".join(err_msgs))
291

    
292

    
293
def StopMaster(stop_daemons):
294
  """Deactivate this node as master.
295

296
  The function will always try to deactivate the IP address of the
297
  master. It will also stop the master daemons depending on the
298
  stop_daemons parameter.
299

300
  @type stop_daemons: boolean
301
  @param stop_daemons: whether to also stop the master daemons
302
      (ganeti-masterd and ganeti-rapi)
303
  @rtype: None
304

305
  """
306
  # TODO: log and report back to the caller the error failures; we
307
  # need to decide in which case we fail the RPC for this
308

    
309
  # GetMasterInfo will raise an exception if not able to return data
310
  master_netdev, master_ip, _ = GetMasterInfo()
311

    
312
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
313
                         "dev", master_netdev])
314
  if result.failed:
315
    logging.error("Can't remove the master IP, error: %s", result.output)
316
    # but otherwise ignore the failure
317

    
318
  if stop_daemons:
319
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
320
    if result.failed:
321
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
322
                    " and error %s",
323
                    result.cmd, result.exit_code, result.output)
324

    
325

    
326
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
327
  """Joins this node to the cluster.
328

329
  This does the following:
330
      - updates the hostkeys of the machine (rsa and dsa)
331
      - adds the ssh private key to the user
332
      - adds the ssh public key to the users' authorized_keys file
333

334
  @type dsa: str
335
  @param dsa: the DSA private key to write
336
  @type dsapub: str
337
  @param dsapub: the DSA public key to write
338
  @type rsa: str
339
  @param rsa: the RSA private key to write
340
  @type rsapub: str
341
  @param rsapub: the RSA public key to write
342
  @type sshkey: str
343
  @param sshkey: the SSH private key to write
344
  @type sshpub: str
345
  @param sshpub: the SSH public key to write
346
  @rtype: boolean
347
  @return: the success of the operation
348

349
  """
350
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
351
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
352
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
353
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
354
  for name, content, mode in sshd_keys:
355
    utils.WriteFile(name, data=content, mode=mode)
356

    
357
  try:
358
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
359
                                                    mkdir=True)
360
  except errors.OpExecError, err:
361
    _Fail("Error while processing user ssh files: %s", err, exc=True)
362

    
363
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
364
    utils.WriteFile(name, data=content, mode=0600)
365

    
366
  utils.AddAuthorizedKey(auth_keys, sshpub)
367

    
368
  result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
369
  if result.failed:
370
    _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
371
          result.cmd, result.exit_code, result.output)
372

    
373

    
374
def LeaveCluster(modify_ssh_setup):
375
  """Cleans up and remove the current node.
376

377
  This function cleans up and prepares the current node to be removed
378
  from the cluster.
379

380
  If processing is successful, then it raises an
381
  L{errors.QuitGanetiException} which is used as a special case to
382
  shutdown the node daemon.
383

384
  @param modify_ssh_setup: boolean
385

386
  """
387
  _CleanDirectory(constants.DATA_DIR)
388
  JobQueuePurge()
389

    
390
  if modify_ssh_setup:
391
    try:
392
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
393

    
394
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
395

    
396
      utils.RemoveFile(priv_key)
397
      utils.RemoveFile(pub_key)
398
    except errors.OpExecError:
399
      logging.exception("Error while processing ssh files")
400

    
401
  try:
402
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
403
    utils.RemoveFile(constants.RAPI_CERT_FILE)
404
    utils.RemoveFile(constants.NODED_CERT_FILE)
405
  except: # pylint: disable-msg=W0702
406
    logging.exception("Error while removing cluster secrets")
407

    
408
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
409
  if result.failed:
410
    logging.error("Command %s failed with exitcode %s and error %s",
411
                  result.cmd, result.exit_code, result.output)
412

    
413
  # Raise a custom exception (handled in ganeti-noded)
414
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
415

    
416

    
417
def GetNodeInfo(vgname, hypervisor_type):
418
  """Gives back a hash with different information about the node.
419

420
  @type vgname: C{string}
421
  @param vgname: the name of the volume group to ask for disk space information
422
  @type hypervisor_type: C{str}
423
  @param hypervisor_type: the name of the hypervisor to ask for
424
      memory information
425
  @rtype: C{dict}
426
  @return: dictionary with the following keys:
427
      - vg_size is the size of the configured volume group in MiB
428
      - vg_free is the free size of the volume group in MiB
429
      - memory_dom0 is the memory allocated for domain0 in MiB
430
      - memory_free is the currently available (free) ram in MiB
431
      - memory_total is the total number of ram in MiB
432

433
  """
434
  outputarray = {}
435
  vginfo = _GetVGInfo(vgname)
436
  outputarray['vg_size'] = vginfo['vg_size']
437
  outputarray['vg_free'] = vginfo['vg_free']
438

    
439
  hyper = hypervisor.GetHypervisor(hypervisor_type)
440
  hyp_info = hyper.GetNodeInfo()
441
  if hyp_info is not None:
442
    outputarray.update(hyp_info)
443

    
444
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
445

    
446
  return outputarray
447

    
448

    
449
def VerifyNode(what, cluster_name):
450
  """Verify the status of the local node.
451

452
  Based on the input L{what} parameter, various checks are done on the
453
  local node.
454

455
  If the I{filelist} key is present, this list of
456
  files is checksummed and the file/checksum pairs are returned.
457

458
  If the I{nodelist} key is present, we check that we have
459
  connectivity via ssh with the target nodes (and check the hostname
460
  report).
461

462
  If the I{node-net-test} key is present, we check that we have
463
  connectivity to the given nodes via both primary IP and, if
464
  applicable, secondary IPs.
465

466
  @type what: C{dict}
467
  @param what: a dictionary of things to check:
468
      - filelist: list of files for which to compute checksums
469
      - nodelist: list of nodes we should check ssh communication with
470
      - node-net-test: list of nodes we should check node daemon port
471
        connectivity with
472
      - hypervisor: list with hypervisors to run the verify for
473
  @rtype: dict
474
  @return: a dictionary with the same keys as the input dict, and
475
      values representing the result of the checks
476

477
  """
478
  result = {}
479

    
480
  if constants.NV_HYPERVISOR in what:
481
    result[constants.NV_HYPERVISOR] = tmp = {}
482
    for hv_name in what[constants.NV_HYPERVISOR]:
483
      try:
484
        val = hypervisor.GetHypervisor(hv_name).Verify()
485
      except errors.HypervisorError, err:
486
        val = "Error while checking hypervisor: %s" % str(err)
487
      tmp[hv_name] = val
488

    
489
  if constants.NV_FILELIST in what:
490
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
491
      what[constants.NV_FILELIST])
492

    
493
  if constants.NV_NODELIST in what:
494
    result[constants.NV_NODELIST] = tmp = {}
495
    random.shuffle(what[constants.NV_NODELIST])
496
    for node in what[constants.NV_NODELIST]:
497
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
498
      if not success:
499
        tmp[node] = message
500

    
501
  if constants.NV_NODENETTEST in what:
502
    result[constants.NV_NODENETTEST] = tmp = {}
503
    my_name = utils.HostInfo().name
504
    my_pip = my_sip = None
505
    for name, pip, sip in what[constants.NV_NODENETTEST]:
506
      if name == my_name:
507
        my_pip = pip
508
        my_sip = sip
509
        break
510
    if not my_pip:
511
      tmp[my_name] = ("Can't find my own primary/secondary IP"
512
                      " in the node list")
513
    else:
514
      port = utils.GetDaemonPort(constants.NODED)
515
      for name, pip, sip in what[constants.NV_NODENETTEST]:
516
        fail = []
517
        if not utils.TcpPing(pip, port, source=my_pip):
518
          fail.append("primary")
519
        if sip != pip:
520
          if not utils.TcpPing(sip, port, source=my_sip):
521
            fail.append("secondary")
522
        if fail:
523
          tmp[name] = ("failure using the %s interface(s)" %
524
                       " and ".join(fail))
525

    
526
  if constants.NV_LVLIST in what:
527
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
528

    
529
  if constants.NV_INSTANCELIST in what:
530
    # GetInstanceList can fail
531
    try:
532
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
533
    except RPCFail, err:
534
      val = str(err)
535
    result[constants.NV_INSTANCELIST] = val
536

    
537
  if constants.NV_VGLIST in what:
538
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
539

    
540
  if constants.NV_PVLIST in what:
541
    result[constants.NV_PVLIST] = \
542
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
543
                                   filter_allocatable=False)
544

    
545
  if constants.NV_VERSION in what:
546
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
547
                                    constants.RELEASE_VERSION)
548

    
549
  if constants.NV_HVINFO in what:
550
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
551
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
552

    
553
  if constants.NV_DRBDLIST in what:
554
    try:
555
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
556
    except errors.BlockDeviceError, err:
557
      logging.warning("Can't get used minors list", exc_info=True)
558
      used_minors = str(err)
559
    result[constants.NV_DRBDLIST] = used_minors
560

    
561
  if constants.NV_NODESETUP in what:
562
    result[constants.NV_NODESETUP] = tmpr = []
563
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
564
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
565
                  " under /sys, missing required directories /sys/block"
566
                  " and /sys/class/net")
567
    if (not os.path.isdir("/proc/sys") or
568
        not os.path.isfile("/proc/sysrq-trigger")):
569
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
570
                  " under /proc, missing required directory /proc/sys and"
571
                  " the file /proc/sysrq-trigger")
572

    
573
  if constants.NV_TIME in what:
574
    result[constants.NV_TIME] = utils.SplitTime(time.time())
575

    
576
  return result
577

    
578

    
579
def GetVolumeList(vg_name):
580
  """Compute list of logical volumes and their size.
581

582
  @type vg_name: str
583
  @param vg_name: the volume group whose LVs we should list
584
  @rtype: dict
585
  @return:
586
      dictionary of all partions (key) with value being a tuple of
587
      their size (in MiB), inactive and online status::
588

589
        {'test1': ('20.06', True, True)}
590

591
      in case of errors, a string is returned with the error
592
      details.
593

594
  """
595
  lvs = {}
596
  sep = '|'
597
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
598
                         "--separator=%s" % sep,
599
                         "-olv_name,lv_size,lv_attr", vg_name])
600
  if result.failed:
601
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
602

    
603
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
604
  for line in result.stdout.splitlines():
605
    line = line.strip()
606
    match = valid_line_re.match(line)
607
    if not match:
608
      logging.error("Invalid line returned from lvs output: '%s'", line)
609
      continue
610
    name, size, attr = match.groups()
611
    inactive = attr[4] == '-'
612
    online = attr[5] == 'o'
613
    virtual = attr[0] == 'v'
614
    if virtual:
615
      # we don't want to report such volumes as existing, since they
616
      # don't really hold data
617
      continue
618
    lvs[name] = (size, inactive, online)
619

    
620
  return lvs
621

    
622

    
623
def ListVolumeGroups():
624
  """List the volume groups and their size.
625

626
  @rtype: dict
627
  @return: dictionary with keys volume name and values the
628
      size of the volume
629

630
  """
631
  return utils.ListVolumeGroups()
632

    
633

    
634
def NodeVolumes():
635
  """List all volumes on this node.
636

637
  @rtype: list
638
  @return:
639
    A list of dictionaries, each having four keys:
640
      - name: the logical volume name,
641
      - size: the size of the logical volume
642
      - dev: the physical device on which the LV lives
643
      - vg: the volume group to which it belongs
644

645
    In case of errors, we return an empty list and log the
646
    error.
647

648
    Note that since a logical volume can live on multiple physical
649
    volumes, the resulting list might include a logical volume
650
    multiple times.
651

652
  """
653
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
654
                         "--separator=|",
655
                         "--options=lv_name,lv_size,devices,vg_name"])
656
  if result.failed:
657
    _Fail("Failed to list logical volumes, lvs output: %s",
658
          result.output)
659

    
660
  def parse_dev(dev):
661
    return dev.split('(')[0]
662

    
663
  def handle_dev(dev):
664
    return [parse_dev(x) for x in dev.split(",")]
665

    
666
  def map_line(line):
667
    line = [v.strip() for v in line]
668
    return [{'name': line[0], 'size': line[1],
669
             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
670

    
671
  all_devs = []
672
  for line in result.stdout.splitlines():
673
    if line.count('|') >= 3:
674
      all_devs.extend(map_line(line.split('|')))
675
    else:
676
      logging.warning("Strange line in the output from lvs: '%s'", line)
677
  return all_devs
678

    
679

    
680
def BridgesExist(bridges_list):
681
  """Check if a list of bridges exist on the current node.
682

683
  @rtype: boolean
684
  @return: C{True} if all of them exist, C{False} otherwise
685

686
  """
687
  missing = []
688
  for bridge in bridges_list:
689
    if not utils.BridgeExists(bridge):
690
      missing.append(bridge)
691

    
692
  if missing:
693
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
694

    
695

    
696
def GetInstanceList(hypervisor_list):
697
  """Provides a list of instances.
698

699
  @type hypervisor_list: list
700
  @param hypervisor_list: the list of hypervisors to query information
701

702
  @rtype: list
703
  @return: a list of all running instances on the current node
704
    - instance1.example.com
705
    - instance2.example.com
706

707
  """
708
  results = []
709
  for hname in hypervisor_list:
710
    try:
711
      names = hypervisor.GetHypervisor(hname).ListInstances()
712
      results.extend(names)
713
    except errors.HypervisorError, err:
714
      _Fail("Error enumerating instances (hypervisor %s): %s",
715
            hname, err, exc=True)
716

    
717
  return results
718

    
719

    
720
def GetInstanceInfo(instance, hname):
721
  """Gives back the information about an instance as a dictionary.
722

723
  @type instance: string
724
  @param instance: the instance name
725
  @type hname: string
726
  @param hname: the hypervisor type of the instance
727

728
  @rtype: dict
729
  @return: dictionary with the following keys:
730
      - memory: memory size of instance (int)
731
      - state: xen state of instance (string)
732
      - time: cpu time of instance (float)
733

734
  """
735
  output = {}
736

    
737
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
738
  if iinfo is not None:
739
    output['memory'] = iinfo[2]
740
    output['state'] = iinfo[4]
741
    output['time'] = iinfo[5]
742

    
743
  return output
744

    
745

    
746
def GetInstanceMigratable(instance):
747
  """Gives whether an instance can be migrated.
748

749
  @type instance: L{objects.Instance}
750
  @param instance: object representing the instance to be checked.
751

752
  @rtype: tuple
753
  @return: tuple of (result, description) where:
754
      - result: whether the instance can be migrated or not
755
      - description: a description of the issue, if relevant
756

757
  """
758
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
759
  iname = instance.name
760
  if iname not in hyper.ListInstances():
761
    _Fail("Instance %s is not running", iname)
762

    
763
  for idx in range(len(instance.disks)):
764
    link_name = _GetBlockDevSymlinkPath(iname, idx)
765
    if not os.path.islink(link_name):
766
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
767

    
768

    
769
def GetAllInstancesInfo(hypervisor_list):
770
  """Gather data about all instances.
771

772
  This is the equivalent of L{GetInstanceInfo}, except that it
773
  computes data for all instances at once, thus being faster if one
774
  needs data about more than one instance.
775

776
  @type hypervisor_list: list
777
  @param hypervisor_list: list of hypervisors to query for instance data
778

779
  @rtype: dict
780
  @return: dictionary of instance: data, with data having the following keys:
781
      - memory: memory size of instance (int)
782
      - state: xen state of instance (string)
783
      - time: cpu time of instance (float)
784
      - vcpus: the number of vcpus
785

786
  """
787
  output = {}
788

    
789
  for hname in hypervisor_list:
790
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
791
    if iinfo:
792
      for name, _, memory, vcpus, state, times in iinfo:
793
        value = {
794
          'memory': memory,
795
          'vcpus': vcpus,
796
          'state': state,
797
          'time': times,
798
          }
799
        if name in output:
800
          # we only check static parameters, like memory and vcpus,
801
          # and not state and time which can change between the
802
          # invocations of the different hypervisors
803
          for key in 'memory', 'vcpus':
804
            if value[key] != output[name][key]:
805
              _Fail("Instance %s is running twice"
806
                    " with different parameters", name)
807
        output[name] = value
808

    
809
  return output
810

    
811

    
812
def _InstanceLogName(kind, os_name, instance):
813
  """Compute the OS log filename for a given instance and operation.
814

815
  The instance name and os name are passed in as strings since not all
816
  operations have these as part of an instance object.
817

818
  @type kind: string
819
  @param kind: the operation type (e.g. add, import, etc.)
820
  @type os_name: string
821
  @param os_name: the os name
822
  @type instance: string
823
  @param instance: the name of the instance being imported/added/etc.
824

825
  """
826
  base = ("%s-%s-%s-%s.log" %
827
          (kind, os_name, instance, utils.TimestampForFilename()))
828
  return utils.PathJoin(constants.LOG_OS_DIR, base)
829

    
830

    
831
def InstanceOsAdd(instance, reinstall, debug):
832
  """Add an OS to an instance.
833

834
  @type instance: L{objects.Instance}
835
  @param instance: Instance whose OS is to be installed
836
  @type reinstall: boolean
837
  @param reinstall: whether this is an instance reinstall
838
  @type debug: integer
839
  @param debug: debug level, passed to the OS scripts
840
  @rtype: None
841

842
  """
843
  inst_os = OSFromDisk(instance.os)
844

    
845
  create_env = OSEnvironment(instance, inst_os, debug)
846
  if reinstall:
847
    create_env['INSTANCE_REINSTALL'] = "1"
848

    
849
  logfile = _InstanceLogName("add", instance.os, instance.name)
850

    
851
  result = utils.RunCmd([inst_os.create_script], env=create_env,
852
                        cwd=inst_os.path, output=logfile,)
853
  if result.failed:
854
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
855
                  " output: %s", result.cmd, result.fail_reason, logfile,
856
                  result.output)
857
    lines = [utils.SafeEncode(val)
858
             for val in utils.TailFile(logfile, lines=20)]
859
    _Fail("OS create script failed (%s), last lines in the"
860
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
861

    
862

    
863
def RunRenameInstance(instance, old_name, debug):
864
  """Run the OS rename script for an instance.
865

866
  @type instance: L{objects.Instance}
867
  @param instance: Instance whose OS is to be installed
868
  @type old_name: string
869
  @param old_name: previous instance name
870
  @type debug: integer
871
  @param debug: debug level, passed to the OS scripts
872
  @rtype: boolean
873
  @return: the success of the operation
874

875
  """
876
  inst_os = OSFromDisk(instance.os)
877

    
878
  rename_env = OSEnvironment(instance, inst_os, debug)
879
  rename_env['OLD_INSTANCE_NAME'] = old_name
880

    
881
  logfile = _InstanceLogName("rename", instance.os,
882
                             "%s-%s" % (old_name, instance.name))
883

    
884
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
885
                        cwd=inst_os.path, output=logfile)
886

    
887
  if result.failed:
888
    logging.error("os create command '%s' returned error: %s output: %s",
889
                  result.cmd, result.fail_reason, result.output)
890
    lines = [utils.SafeEncode(val)
891
             for val in utils.TailFile(logfile, lines=20)]
892
    _Fail("OS rename script failed (%s), last lines in the"
893
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
894

    
895

    
896
def _GetVGInfo(vg_name):
897
  """Get information about the volume group.
898

899
  @type vg_name: str
900
  @param vg_name: the volume group which we query
901
  @rtype: dict
902
  @return:
903
    A dictionary with the following keys:
904
      - C{vg_size} is the total size of the volume group in MiB
905
      - C{vg_free} is the free size of the volume group in MiB
906
      - C{pv_count} are the number of physical disks in that VG
907

908
    If an error occurs during gathering of data, we return the same dict
909
    with keys all set to None.
910

911
  """
912
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
913

    
914
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
915
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
916

    
917
  if retval.failed:
918
    logging.error("volume group %s not present", vg_name)
919
    return retdic
920
  valarr = retval.stdout.strip().rstrip(':').split(':')
921
  if len(valarr) == 3:
922
    try:
923
      retdic = {
924
        "vg_size": int(round(float(valarr[0]), 0)),
925
        "vg_free": int(round(float(valarr[1]), 0)),
926
        "pv_count": int(valarr[2]),
927
        }
928
    except (TypeError, ValueError), err:
929
      logging.exception("Fail to parse vgs output: %s", err)
930
  else:
931
    logging.error("vgs output has the wrong number of fields (expected"
932
                  " three): %s", str(valarr))
933
  return retdic
934

    
935

    
936
def _GetBlockDevSymlinkPath(instance_name, idx):
937
  return utils.PathJoin(constants.DISK_LINKS_DIR,
938
                        "%s:%d" % (instance_name, idx))
939

    
940

    
941
def _SymlinkBlockDev(instance_name, device_path, idx):
942
  """Set up symlinks to a instance's block device.
943

944
  This is an auxiliary function run when an instance is start (on the primary
945
  node) or when an instance is migrated (on the target node).
946

947

948
  @param instance_name: the name of the target instance
949
  @param device_path: path of the physical block device, on the node
950
  @param idx: the disk index
951
  @return: absolute path to the disk's symlink
952

953
  """
954
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
955
  try:
956
    os.symlink(device_path, link_name)
957
  except OSError, err:
958
    if err.errno == errno.EEXIST:
959
      if (not os.path.islink(link_name) or
960
          os.readlink(link_name) != device_path):
961
        os.remove(link_name)
962
        os.symlink(device_path, link_name)
963
    else:
964
      raise
965

    
966
  return link_name
967

    
968

    
969
def _RemoveBlockDevLinks(instance_name, disks):
970
  """Remove the block device symlinks belonging to the given instance.
971

972
  """
973
  for idx, _ in enumerate(disks):
974
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
975
    if os.path.islink(link_name):
976
      try:
977
        os.remove(link_name)
978
      except OSError:
979
        logging.exception("Can't remove symlink '%s'", link_name)
980

    
981

    
982
def _GatherAndLinkBlockDevs(instance):
983
  """Set up an instance's block device(s).
984

985
  This is run on the primary node at instance startup. The block
986
  devices must be already assembled.
987

988
  @type instance: L{objects.Instance}
989
  @param instance: the instance whose disks we shoul assemble
990
  @rtype: list
991
  @return: list of (disk_object, device_path)
992

993
  """
994
  block_devices = []
995
  for idx, disk in enumerate(instance.disks):
996
    device = _RecursiveFindBD(disk)
997
    if device is None:
998
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
999
                                    str(disk))
1000
    device.Open()
1001
    try:
1002
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1003
    except OSError, e:
1004
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1005
                                    e.strerror)
1006

    
1007
    block_devices.append((disk, link_name))
1008

    
1009
  return block_devices
1010

    
1011

    
1012
def StartInstance(instance):
1013
  """Start an instance.
1014

1015
  @type instance: L{objects.Instance}
1016
  @param instance: the instance object
1017
  @rtype: None
1018

1019
  """
1020
  running_instances = GetInstanceList([instance.hypervisor])
1021

    
1022
  if instance.name in running_instances:
1023
    logging.info("Instance %s already running, not starting", instance.name)
1024
    return
1025

    
1026
  try:
1027
    block_devices = _GatherAndLinkBlockDevs(instance)
1028
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1029
    hyper.StartInstance(instance, block_devices)
1030
  except errors.BlockDeviceError, err:
1031
    _Fail("Block device error: %s", err, exc=True)
1032
  except errors.HypervisorError, err:
1033
    _RemoveBlockDevLinks(instance.name, instance.disks)
1034
    _Fail("Hypervisor error: %s", err, exc=True)
1035

    
1036

    
1037
def InstanceShutdown(instance, timeout):
1038
  """Shut an instance down.
1039

1040
  @note: this functions uses polling with a hardcoded timeout.
1041

1042
  @type instance: L{objects.Instance}
1043
  @param instance: the instance object
1044
  @type timeout: integer
1045
  @param timeout: maximum timeout for soft shutdown
1046
  @rtype: None
1047

1048
  """
1049
  hv_name = instance.hypervisor
1050
  hyper = hypervisor.GetHypervisor(hv_name)
1051
  iname = instance.name
1052

    
1053
  if instance.name not in hyper.ListInstances():
1054
    logging.info("Instance %s not running, doing nothing", iname)
1055
    return
1056

    
1057
  class _TryShutdown:
1058
    def __init__(self):
1059
      self.tried_once = False
1060

    
1061
    def __call__(self):
1062
      if iname not in hyper.ListInstances():
1063
        return
1064

    
1065
      try:
1066
        hyper.StopInstance(instance, retry=self.tried_once)
1067
      except errors.HypervisorError, err:
1068
        if iname not in hyper.ListInstances():
1069
          # if the instance is no longer existing, consider this a
1070
          # success and go to cleanup
1071
          return
1072

    
1073
        _Fail("Failed to stop instance %s: %s", iname, err)
1074

    
1075
      self.tried_once = True
1076

    
1077
      raise utils.RetryAgain()
1078

    
1079
  try:
1080
    utils.Retry(_TryShutdown(), 5, timeout)
1081
  except utils.RetryTimeout:
1082
    # the shutdown did not succeed
1083
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1084

    
1085
    try:
1086
      hyper.StopInstance(instance, force=True)
1087
    except errors.HypervisorError, err:
1088
      if iname in hyper.ListInstances():
1089
        # only raise an error if the instance still exists, otherwise
1090
        # the error could simply be "instance ... unknown"!
1091
        _Fail("Failed to force stop instance %s: %s", iname, err)
1092

    
1093
    time.sleep(1)
1094

    
1095
    if iname in hyper.ListInstances():
1096
      _Fail("Could not shutdown instance %s even by destroy", iname)
1097

    
1098
  _RemoveBlockDevLinks(iname, instance.disks)
1099

    
1100

    
1101
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1102
  """Reboot an instance.
1103

1104
  @type instance: L{objects.Instance}
1105
  @param instance: the instance object to reboot
1106
  @type reboot_type: str
1107
  @param reboot_type: the type of reboot, one the following
1108
    constants:
1109
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1110
        instance OS, do not recreate the VM
1111
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1112
        restart the VM (at the hypervisor level)
1113
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1114
        not accepted here, since that mode is handled differently, in
1115
        cmdlib, and translates into full stop and start of the
1116
        instance (instead of a call_instance_reboot RPC)
1117
  @type shutdown_timeout: integer
1118
  @param shutdown_timeout: maximum timeout for soft shutdown
1119
  @rtype: None
1120

1121
  """
1122
  running_instances = GetInstanceList([instance.hypervisor])
1123

    
1124
  if instance.name not in running_instances:
1125
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1126

    
1127
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1128
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1129
    try:
1130
      hyper.RebootInstance(instance)
1131
    except errors.HypervisorError, err:
1132
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1133
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1134
    try:
1135
      InstanceShutdown(instance, shutdown_timeout)
1136
      return StartInstance(instance)
1137
    except errors.HypervisorError, err:
1138
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1139
  else:
1140
    _Fail("Invalid reboot_type received: %s", reboot_type)
1141

    
1142

    
1143
def MigrationInfo(instance):
1144
  """Gather information about an instance to be migrated.
1145

1146
  @type instance: L{objects.Instance}
1147
  @param instance: the instance definition
1148

1149
  """
1150
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1151
  try:
1152
    info = hyper.MigrationInfo(instance)
1153
  except errors.HypervisorError, err:
1154
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1155
  return info
1156

    
1157

    
1158
def AcceptInstance(instance, info, target):
1159
  """Prepare the node to accept an instance.
1160

1161
  @type instance: L{objects.Instance}
1162
  @param instance: the instance definition
1163
  @type info: string/data (opaque)
1164
  @param info: migration information, from the source node
1165
  @type target: string
1166
  @param target: target host (usually ip), on this node
1167

1168
  """
1169
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1170
  try:
1171
    hyper.AcceptInstance(instance, info, target)
1172
  except errors.HypervisorError, err:
1173
    _Fail("Failed to accept instance: %s", err, exc=True)
1174

    
1175

    
1176
def FinalizeMigration(instance, info, success):
1177
  """Finalize any preparation to accept an instance.
1178

1179
  @type instance: L{objects.Instance}
1180
  @param instance: the instance definition
1181
  @type info: string/data (opaque)
1182
  @param info: migration information, from the source node
1183
  @type success: boolean
1184
  @param success: whether the migration was a success or a failure
1185

1186
  """
1187
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1188
  try:
1189
    hyper.FinalizeMigration(instance, info, success)
1190
  except errors.HypervisorError, err:
1191
    _Fail("Failed to finalize migration: %s", err, exc=True)
1192

    
1193

    
1194
def MigrateInstance(instance, target, live):
1195
  """Migrates an instance to another node.
1196

1197
  @type instance: L{objects.Instance}
1198
  @param instance: the instance definition
1199
  @type target: string
1200
  @param target: the target node name
1201
  @type live: boolean
1202
  @param live: whether the migration should be done live or not (the
1203
      interpretation of this parameter is left to the hypervisor)
1204
  @rtype: tuple
1205
  @return: a tuple of (success, msg) where:
1206
      - succes is a boolean denoting the success/failure of the operation
1207
      - msg is a string with details in case of failure
1208

1209
  """
1210
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1211

    
1212
  try:
1213
    hyper.MigrateInstance(instance, target, live)
1214
  except errors.HypervisorError, err:
1215
    _Fail("Failed to migrate instance: %s", err, exc=True)
1216

    
1217

    
1218
def BlockdevCreate(disk, size, owner, on_primary, info):
1219
  """Creates a block device for an instance.
1220

1221
  @type disk: L{objects.Disk}
1222
  @param disk: the object describing the disk we should create
1223
  @type size: int
1224
  @param size: the size of the physical underlying device, in MiB
1225
  @type owner: str
1226
  @param owner: the name of the instance for which disk is created,
1227
      used for device cache data
1228
  @type on_primary: boolean
1229
  @param on_primary:  indicates if it is the primary node or not
1230
  @type info: string
1231
  @param info: string that will be sent to the physical device
1232
      creation, used for example to set (LVM) tags on LVs
1233

1234
  @return: the new unique_id of the device (this can sometime be
1235
      computed only after creation), or None. On secondary nodes,
1236
      it's not required to return anything.
1237

1238
  """
1239
  # TODO: remove the obsolete 'size' argument
1240
  # pylint: disable-msg=W0613
1241
  clist = []
1242
  if disk.children:
1243
    for child in disk.children:
1244
      try:
1245
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1246
      except errors.BlockDeviceError, err:
1247
        _Fail("Can't assemble device %s: %s", child, err)
1248
      if on_primary or disk.AssembleOnSecondary():
1249
        # we need the children open in case the device itself has to
1250
        # be assembled
1251
        try:
1252
          # pylint: disable-msg=E1103
1253
          crdev.Open()
1254
        except errors.BlockDeviceError, err:
1255
          _Fail("Can't make child '%s' read-write: %s", child, err)
1256
      clist.append(crdev)
1257

    
1258
  try:
1259
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1260
  except errors.BlockDeviceError, err:
1261
    _Fail("Can't create block device: %s", err)
1262

    
1263
  if on_primary or disk.AssembleOnSecondary():
1264
    try:
1265
      device.Assemble()
1266
    except errors.BlockDeviceError, err:
1267
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1268
    device.SetSyncSpeed(constants.SYNC_SPEED)
1269
    if on_primary or disk.OpenOnSecondary():
1270
      try:
1271
        device.Open(force=True)
1272
      except errors.BlockDeviceError, err:
1273
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1274
    DevCacheManager.UpdateCache(device.dev_path, owner,
1275
                                on_primary, disk.iv_name)
1276

    
1277
  device.SetInfo(info)
1278

    
1279
  return device.unique_id
1280

    
1281

    
1282
def BlockdevRemove(disk):
1283
  """Remove a block device.
1284

1285
  @note: This is intended to be called recursively.
1286

1287
  @type disk: L{objects.Disk}
1288
  @param disk: the disk object we should remove
1289
  @rtype: boolean
1290
  @return: the success of the operation
1291

1292
  """
1293
  msgs = []
1294
  try:
1295
    rdev = _RecursiveFindBD(disk)
1296
  except errors.BlockDeviceError, err:
1297
    # probably can't attach
1298
    logging.info("Can't attach to device %s in remove", disk)
1299
    rdev = None
1300
  if rdev is not None:
1301
    r_path = rdev.dev_path
1302
    try:
1303
      rdev.Remove()
1304
    except errors.BlockDeviceError, err:
1305
      msgs.append(str(err))
1306
    if not msgs:
1307
      DevCacheManager.RemoveCache(r_path)
1308

    
1309
  if disk.children:
1310
    for child in disk.children:
1311
      try:
1312
        BlockdevRemove(child)
1313
      except RPCFail, err:
1314
        msgs.append(str(err))
1315

    
1316
  if msgs:
1317
    _Fail("; ".join(msgs))
1318

    
1319

    
1320
def _RecursiveAssembleBD(disk, owner, as_primary):
1321
  """Activate a block device for an instance.
1322

1323
  This is run on the primary and secondary nodes for an instance.
1324

1325
  @note: this function is called recursively.
1326

1327
  @type disk: L{objects.Disk}
1328
  @param disk: the disk we try to assemble
1329
  @type owner: str
1330
  @param owner: the name of the instance which owns the disk
1331
  @type as_primary: boolean
1332
  @param as_primary: if we should make the block device
1333
      read/write
1334

1335
  @return: the assembled device or None (in case no device
1336
      was assembled)
1337
  @raise errors.BlockDeviceError: in case there is an error
1338
      during the activation of the children or the device
1339
      itself
1340

1341
  """
1342
  children = []
1343
  if disk.children:
1344
    mcn = disk.ChildrenNeeded()
1345
    if mcn == -1:
1346
      mcn = 0 # max number of Nones allowed
1347
    else:
1348
      mcn = len(disk.children) - mcn # max number of Nones
1349
    for chld_disk in disk.children:
1350
      try:
1351
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1352
      except errors.BlockDeviceError, err:
1353
        if children.count(None) >= mcn:
1354
          raise
1355
        cdev = None
1356
        logging.error("Error in child activation (but continuing): %s",
1357
                      str(err))
1358
      children.append(cdev)
1359

    
1360
  if as_primary or disk.AssembleOnSecondary():
1361
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1362
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1363
    result = r_dev
1364
    if as_primary or disk.OpenOnSecondary():
1365
      r_dev.Open()
1366
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1367
                                as_primary, disk.iv_name)
1368

    
1369
  else:
1370
    result = True
1371
  return result
1372

    
1373

    
1374
def BlockdevAssemble(disk, owner, as_primary):
1375
  """Activate a block device for an instance.
1376

1377
  This is a wrapper over _RecursiveAssembleBD.
1378

1379
  @rtype: str or boolean
1380
  @return: a C{/dev/...} path for primary nodes, and
1381
      C{True} for secondary nodes
1382

1383
  """
1384
  try:
1385
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1386
    if isinstance(result, bdev.BlockDev):
1387
      # pylint: disable-msg=E1103
1388
      result = result.dev_path
1389
  except errors.BlockDeviceError, err:
1390
    _Fail("Error while assembling disk: %s", err, exc=True)
1391

    
1392
  return result
1393

    
1394

    
1395
def BlockdevShutdown(disk):
1396
  """Shut down a block device.
1397

1398
  First, if the device is assembled (Attach() is successful), then
1399
  the device is shutdown. Then the children of the device are
1400
  shutdown.
1401

1402
  This function is called recursively. Note that we don't cache the
1403
  children or such, as oppossed to assemble, shutdown of different
1404
  devices doesn't require that the upper device was active.
1405

1406
  @type disk: L{objects.Disk}
1407
  @param disk: the description of the disk we should
1408
      shutdown
1409
  @rtype: None
1410

1411
  """
1412
  msgs = []
1413
  r_dev = _RecursiveFindBD(disk)
1414
  if r_dev is not None:
1415
    r_path = r_dev.dev_path
1416
    try:
1417
      r_dev.Shutdown()
1418
      DevCacheManager.RemoveCache(r_path)
1419
    except errors.BlockDeviceError, err:
1420
      msgs.append(str(err))
1421

    
1422
  if disk.children:
1423
    for child in disk.children:
1424
      try:
1425
        BlockdevShutdown(child)
1426
      except RPCFail, err:
1427
        msgs.append(str(err))
1428

    
1429
  if msgs:
1430
    _Fail("; ".join(msgs))
1431

    
1432

    
1433
def BlockdevAddchildren(parent_cdev, new_cdevs):
1434
  """Extend a mirrored block device.
1435

1436
  @type parent_cdev: L{objects.Disk}
1437
  @param parent_cdev: the disk to which we should add children
1438
  @type new_cdevs: list of L{objects.Disk}
1439
  @param new_cdevs: the list of children which we should add
1440
  @rtype: None
1441

1442
  """
1443
  parent_bdev = _RecursiveFindBD(parent_cdev)
1444
  if parent_bdev is None:
1445
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1446
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1447
  if new_bdevs.count(None) > 0:
1448
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1449
  parent_bdev.AddChildren(new_bdevs)
1450

    
1451

    
1452
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1453
  """Shrink a mirrored block device.
1454

1455
  @type parent_cdev: L{objects.Disk}
1456
  @param parent_cdev: the disk from which we should remove children
1457
  @type new_cdevs: list of L{objects.Disk}
1458
  @param new_cdevs: the list of children which we should remove
1459
  @rtype: None
1460

1461
  """
1462
  parent_bdev = _RecursiveFindBD(parent_cdev)
1463
  if parent_bdev is None:
1464
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1465
  devs = []
1466
  for disk in new_cdevs:
1467
    rpath = disk.StaticDevPath()
1468
    if rpath is None:
1469
      bd = _RecursiveFindBD(disk)
1470
      if bd is None:
1471
        _Fail("Can't find device %s while removing children", disk)
1472
      else:
1473
        devs.append(bd.dev_path)
1474
    else:
1475
      if not utils.IsNormAbsPath(rpath):
1476
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1477
      devs.append(rpath)
1478
  parent_bdev.RemoveChildren(devs)
1479

    
1480

    
1481
def BlockdevGetmirrorstatus(disks):
1482
  """Get the mirroring status of a list of devices.
1483

1484
  @type disks: list of L{objects.Disk}
1485
  @param disks: the list of disks which we should query
1486
  @rtype: disk
1487
  @return:
1488
      a list of (mirror_done, estimated_time) tuples, which
1489
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1490
  @raise errors.BlockDeviceError: if any of the disks cannot be
1491
      found
1492

1493
  """
1494
  stats = []
1495
  for dsk in disks:
1496
    rbd = _RecursiveFindBD(dsk)
1497
    if rbd is None:
1498
      _Fail("Can't find device %s", dsk)
1499

    
1500
    stats.append(rbd.CombinedSyncStatus())
1501

    
1502
  return stats
1503

    
1504

    
1505
def _RecursiveFindBD(disk):
1506
  """Check if a device is activated.
1507

1508
  If so, return information about the real device.
1509

1510
  @type disk: L{objects.Disk}
1511
  @param disk: the disk object we need to find
1512

1513
  @return: None if the device can't be found,
1514
      otherwise the device instance
1515

1516
  """
1517
  children = []
1518
  if disk.children:
1519
    for chdisk in disk.children:
1520
      children.append(_RecursiveFindBD(chdisk))
1521

    
1522
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1523

    
1524

    
1525
def BlockdevFind(disk):
1526
  """Check if a device is activated.
1527

1528
  If it is, return information about the real device.
1529

1530
  @type disk: L{objects.Disk}
1531
  @param disk: the disk to find
1532
  @rtype: None or objects.BlockDevStatus
1533
  @return: None if the disk cannot be found, otherwise a the current
1534
           information
1535

1536
  """
1537
  try:
1538
    rbd = _RecursiveFindBD(disk)
1539
  except errors.BlockDeviceError, err:
1540
    _Fail("Failed to find device: %s", err, exc=True)
1541

    
1542
  if rbd is None:
1543
    return None
1544

    
1545
  return rbd.GetSyncStatus()
1546

    
1547

    
1548
def BlockdevGetsize(disks):
1549
  """Computes the size of the given disks.
1550

1551
  If a disk is not found, returns None instead.
1552

1553
  @type disks: list of L{objects.Disk}
1554
  @param disks: the list of disk to compute the size for
1555
  @rtype: list
1556
  @return: list with elements None if the disk cannot be found,
1557
      otherwise the size
1558

1559
  """
1560
  result = []
1561
  for cf in disks:
1562
    try:
1563
      rbd = _RecursiveFindBD(cf)
1564
    except errors.BlockDeviceError:
1565
      result.append(None)
1566
      continue
1567
    if rbd is None:
1568
      result.append(None)
1569
    else:
1570
      result.append(rbd.GetActualSize())
1571
  return result
1572

    
1573

    
1574
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1575
  """Export a block device to a remote node.
1576

1577
  @type disk: L{objects.Disk}
1578
  @param disk: the description of the disk to export
1579
  @type dest_node: str
1580
  @param dest_node: the destination node to export to
1581
  @type dest_path: str
1582
  @param dest_path: the destination path on the target node
1583
  @type cluster_name: str
1584
  @param cluster_name: the cluster name, needed for SSH hostalias
1585
  @rtype: None
1586

1587
  """
1588
  real_disk = _RecursiveFindBD(disk)
1589
  if real_disk is None:
1590
    _Fail("Block device '%s' is not set up", disk)
1591

    
1592
  real_disk.Open()
1593

    
1594
  # the block size on the read dd is 1MiB to match our units
1595
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1596
                               "dd if=%s bs=1048576 count=%s",
1597
                               real_disk.dev_path, str(disk.size))
1598

    
1599
  # we set here a smaller block size as, due to ssh buffering, more
1600
  # than 64-128k will mostly ignored; we use nocreat to fail if the
1601
  # device is not already there or we pass a wrong path; we use
1602
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1603
  # to not buffer too much memory; this means that at best, we flush
1604
  # every 64k, which will not be very fast
1605
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1606
                                " oflag=dsync", dest_path)
1607

    
1608
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1609
                                                   constants.GANETI_RUNAS,
1610
                                                   destcmd)
1611

    
1612
  # all commands have been checked, so we're safe to combine them
1613
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1614

    
1615
  result = utils.RunCmd(["bash", "-c", command])
1616

    
1617
  if result.failed:
1618
    _Fail("Disk copy command '%s' returned error: %s"
1619
          " output: %s", command, result.fail_reason, result.output)
1620

    
1621

    
1622
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1623
  """Write a file to the filesystem.
1624

1625
  This allows the master to overwrite(!) a file. It will only perform
1626
  the operation if the file belongs to a list of configuration files.
1627

1628
  @type file_name: str
1629
  @param file_name: the target file name
1630
  @type data: str
1631
  @param data: the new contents of the file
1632
  @type mode: int
1633
  @param mode: the mode to give the file (can be None)
1634
  @type uid: int
1635
  @param uid: the owner of the file (can be -1 for default)
1636
  @type gid: int
1637
  @param gid: the group of the file (can be -1 for default)
1638
  @type atime: float
1639
  @param atime: the atime to set on the file (can be None)
1640
  @type mtime: float
1641
  @param mtime: the mtime to set on the file (can be None)
1642
  @rtype: None
1643

1644
  """
1645
  if not os.path.isabs(file_name):
1646
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1647

    
1648
  if file_name not in _ALLOWED_UPLOAD_FILES:
1649
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1650
          file_name)
1651

    
1652
  raw_data = _Decompress(data)
1653

    
1654
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1655
                  atime=atime, mtime=mtime)
1656

    
1657

    
1658
def WriteSsconfFiles(values):
1659
  """Update all ssconf files.
1660

1661
  Wrapper around the SimpleStore.WriteFiles.
1662

1663
  """
1664
  ssconf.SimpleStore().WriteFiles(values)
1665

    
1666

    
1667
def _ErrnoOrStr(err):
1668
  """Format an EnvironmentError exception.
1669

1670
  If the L{err} argument has an errno attribute, it will be looked up
1671
  and converted into a textual C{E...} description. Otherwise the
1672
  string representation of the error will be returned.
1673

1674
  @type err: L{EnvironmentError}
1675
  @param err: the exception to format
1676

1677
  """
1678
  if hasattr(err, 'errno'):
1679
    detail = errno.errorcode[err.errno]
1680
  else:
1681
    detail = str(err)
1682
  return detail
1683

    
1684

    
1685
def _OSOndiskAPIVersion(os_dir):
1686
  """Compute and return the API version of a given OS.
1687

1688
  This function will try to read the API version of the OS residing in
1689
  the 'os_dir' directory.
1690

1691
  @type os_dir: str
1692
  @param os_dir: the directory in which we should look for the OS
1693
  @rtype: tuple
1694
  @return: tuple (status, data) with status denoting the validity and
1695
      data holding either the vaid versions or an error message
1696

1697
  """
1698
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1699

    
1700
  try:
1701
    st = os.stat(api_file)
1702
  except EnvironmentError, err:
1703
    return False, ("Required file '%s' not found under path %s: %s" %
1704
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1705

    
1706
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1707
    return False, ("File '%s' in %s is not a regular file" %
1708
                   (constants.OS_API_FILE, os_dir))
1709

    
1710
  try:
1711
    api_versions = utils.ReadFile(api_file).splitlines()
1712
  except EnvironmentError, err:
1713
    return False, ("Error while reading the API version file at %s: %s" %
1714
                   (api_file, _ErrnoOrStr(err)))
1715

    
1716
  try:
1717
    api_versions = [int(version.strip()) for version in api_versions]
1718
  except (TypeError, ValueError), err:
1719
    return False, ("API version(s) can't be converted to integer: %s" %
1720
                   str(err))
1721

    
1722
  return True, api_versions
1723

    
1724

    
1725
def DiagnoseOS(top_dirs=None):
1726
  """Compute the validity for all OSes.
1727

1728
  @type top_dirs: list
1729
  @param top_dirs: the list of directories in which to
1730
      search (if not given defaults to
1731
      L{constants.OS_SEARCH_PATH})
1732
  @rtype: list of L{objects.OS}
1733
  @return: a list of tuples (name, path, status, diagnose, variants)
1734
      for all (potential) OSes under all search paths, where:
1735
          - name is the (potential) OS name
1736
          - path is the full path to the OS
1737
          - status True/False is the validity of the OS
1738
          - diagnose is the error message for an invalid OS, otherwise empty
1739
          - variants is a list of supported OS variants, if any
1740

1741
  """
1742
  if top_dirs is None:
1743
    top_dirs = constants.OS_SEARCH_PATH
1744

    
1745
  result = []
1746
  for dir_name in top_dirs:
1747
    if os.path.isdir(dir_name):
1748
      try:
1749
        f_names = utils.ListVisibleFiles(dir_name)
1750
      except EnvironmentError, err:
1751
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1752
        break
1753
      for name in f_names:
1754
        os_path = utils.PathJoin(dir_name, name)
1755
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1756
        if status:
1757
          diagnose = ""
1758
          variants = os_inst.supported_variants
1759
        else:
1760
          diagnose = os_inst
1761
          variants = []
1762
        result.append((name, os_path, status, diagnose, variants))
1763

    
1764
  return result
1765

    
1766

    
1767
def _TryOSFromDisk(name, base_dir=None):
1768
  """Create an OS instance from disk.
1769

1770
  This function will return an OS instance if the given name is a
1771
  valid OS name.
1772

1773
  @type base_dir: string
1774
  @keyword base_dir: Base directory containing OS installations.
1775
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1776
  @rtype: tuple
1777
  @return: success and either the OS instance if we find a valid one,
1778
      or error message
1779

1780
  """
1781
  if base_dir is None:
1782
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1783
  else:
1784
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1785

    
1786
  if os_dir is None:
1787
    return False, "Directory for OS %s not found in search path" % name
1788

    
1789
  status, api_versions = _OSOndiskAPIVersion(os_dir)
1790
  if not status:
1791
    # push the error up
1792
    return status, api_versions
1793

    
1794
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1795
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1796
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1797

    
1798
  # OS Files dictionary, we will populate it with the absolute path names
1799
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1800

    
1801
  if max(api_versions) >= constants.OS_API_V15:
1802
    os_files[constants.OS_VARIANTS_FILE] = ''
1803

    
1804
  for filename in os_files:
1805
    os_files[filename] = utils.PathJoin(os_dir, filename)
1806

    
1807
    try:
1808
      st = os.stat(os_files[filename])
1809
    except EnvironmentError, err:
1810
      return False, ("File '%s' under path '%s' is missing (%s)" %
1811
                     (filename, os_dir, _ErrnoOrStr(err)))
1812

    
1813
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1814
      return False, ("File '%s' under path '%s' is not a regular file" %
1815
                     (filename, os_dir))
1816

    
1817
    if filename in constants.OS_SCRIPTS:
1818
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1819
        return False, ("File '%s' under path '%s' is not executable" %
1820
                       (filename, os_dir))
1821

    
1822
  variants = None
1823
  if constants.OS_VARIANTS_FILE in os_files:
1824
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1825
    try:
1826
      variants = utils.ReadFile(variants_file).splitlines()
1827
    except EnvironmentError, err:
1828
      return False, ("Error while reading the OS variants file at %s: %s" %
1829
                     (variants_file, _ErrnoOrStr(err)))
1830
    if not variants:
1831
      return False, ("No supported os variant found")
1832

    
1833
  os_obj = objects.OS(name=name, path=os_dir,
1834
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1835
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1836
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1837
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1838
                      supported_variants=variants,
1839
                      api_versions=api_versions)
1840
  return True, os_obj
1841

    
1842

    
1843
def OSFromDisk(name, base_dir=None):
1844
  """Create an OS instance from disk.
1845

1846
  This function will return an OS instance if the given name is a
1847
  valid OS name. Otherwise, it will raise an appropriate
1848
  L{RPCFail} exception, detailing why this is not a valid OS.
1849

1850
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1851
  an exception but returns true/false status data.
1852

1853
  @type base_dir: string
1854
  @keyword base_dir: Base directory containing OS installations.
1855
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1856
  @rtype: L{objects.OS}
1857
  @return: the OS instance if we find a valid one
1858
  @raise RPCFail: if we don't find a valid OS
1859

1860
  """
1861
  name_only = name.split("+", 1)[0]
1862
  status, payload = _TryOSFromDisk(name_only, base_dir)
1863

    
1864
  if not status:
1865
    _Fail(payload)
1866

    
1867
  return payload
1868

    
1869

    
1870
def OSEnvironment(instance, inst_os, debug=0):
1871
  """Calculate the environment for an os script.
1872

1873
  @type instance: L{objects.Instance}
1874
  @param instance: target instance for the os script run
1875
  @type inst_os: L{objects.OS}
1876
  @param inst_os: operating system for which the environment is being built
1877
  @type debug: integer
1878
  @param debug: debug level (0 or 1, for OS Api 10)
1879
  @rtype: dict
1880
  @return: dict of environment variables
1881
  @raise errors.BlockDeviceError: if the block device
1882
      cannot be found
1883

1884
  """
1885
  result = {}
1886
  api_version = \
1887
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1888
  result['OS_API_VERSION'] = '%d' % api_version
1889
  result['INSTANCE_NAME'] = instance.name
1890
  result['INSTANCE_OS'] = instance.os
1891
  result['HYPERVISOR'] = instance.hypervisor
1892
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1893
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1894
  result['DEBUG_LEVEL'] = '%d' % debug
1895
  if api_version >= constants.OS_API_V15:
1896
    try:
1897
      variant = instance.os.split('+', 1)[1]
1898
    except IndexError:
1899
      variant = inst_os.supported_variants[0]
1900
    result['OS_VARIANT'] = variant
1901
  for idx, disk in enumerate(instance.disks):
1902
    real_disk = _RecursiveFindBD(disk)
1903
    if real_disk is None:
1904
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1905
                                    str(disk))
1906
    real_disk.Open()
1907
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1908
    result['DISK_%d_ACCESS' % idx] = disk.mode
1909
    if constants.HV_DISK_TYPE in instance.hvparams:
1910
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1911
        instance.hvparams[constants.HV_DISK_TYPE]
1912
    if disk.dev_type in constants.LDS_BLOCK:
1913
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1914
    elif disk.dev_type == constants.LD_FILE:
1915
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1916
        'file:%s' % disk.physical_id[0]
1917
  for idx, nic in enumerate(instance.nics):
1918
    result['NIC_%d_MAC' % idx] = nic.mac
1919
    if nic.ip:
1920
      result['NIC_%d_IP' % idx] = nic.ip
1921
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1922
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1923
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1924
    if nic.nicparams[constants.NIC_LINK]:
1925
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1926
    if constants.HV_NIC_TYPE in instance.hvparams:
1927
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1928
        instance.hvparams[constants.HV_NIC_TYPE]
1929

    
1930
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1931
    for key, value in source.items():
1932
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1933

    
1934
  return result
1935

    
1936
def BlockdevGrow(disk, amount):
1937
  """Grow a stack of block devices.
1938

1939
  This function is called recursively, with the childrens being the
1940
  first ones to resize.
1941

1942
  @type disk: L{objects.Disk}
1943
  @param disk: the disk to be grown
1944
  @rtype: (status, result)
1945
  @return: a tuple with the status of the operation
1946
      (True/False), and the errors message if status
1947
      is False
1948

1949
  """
1950
  r_dev = _RecursiveFindBD(disk)
1951
  if r_dev is None:
1952
    _Fail("Cannot find block device %s", disk)
1953

    
1954
  try:
1955
    r_dev.Grow(amount)
1956
  except errors.BlockDeviceError, err:
1957
    _Fail("Failed to grow block device: %s", err, exc=True)
1958

    
1959

    
1960
def BlockdevSnapshot(disk):
1961
  """Create a snapshot copy of a block device.
1962

1963
  This function is called recursively, and the snapshot is actually created
1964
  just for the leaf lvm backend device.
1965

1966
  @type disk: L{objects.Disk}
1967
  @param disk: the disk to be snapshotted
1968
  @rtype: string
1969
  @return: snapshot disk path
1970

1971
  """
1972
  if disk.dev_type == constants.LD_DRBD8:
1973
    if not disk.children:
1974
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
1975
            disk.unique_id)
1976
    return BlockdevSnapshot(disk.children[0])
1977
  elif disk.dev_type == constants.LD_LV:
1978
    r_dev = _RecursiveFindBD(disk)
1979
    if r_dev is not None:
1980
      # FIXME: choose a saner value for the snapshot size
1981
      # let's stay on the safe side and ask for the full size, for now
1982
      return r_dev.Snapshot(disk.size)
1983
    else:
1984
      _Fail("Cannot find block device %s", disk)
1985
  else:
1986
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1987
          disk.unique_id, disk.dev_type)
1988

    
1989

    
1990
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
1991
  """Export a block device snapshot to a remote node.
1992

1993
  @type disk: L{objects.Disk}
1994
  @param disk: the description of the disk to export
1995
  @type dest_node: str
1996
  @param dest_node: the destination node to export to
1997
  @type instance: L{objects.Instance}
1998
  @param instance: the instance object to whom the disk belongs
1999
  @type cluster_name: str
2000
  @param cluster_name: the cluster name, needed for SSH hostalias
2001
  @type idx: int
2002
  @param idx: the index of the disk in the instance's disk list,
2003
      used to export to the OS scripts environment
2004
  @type debug: integer
2005
  @param debug: debug level, passed to the OS scripts
2006
  @rtype: None
2007

2008
  """
2009
  inst_os = OSFromDisk(instance.os)
2010
  export_env = OSEnvironment(instance, inst_os, debug)
2011

    
2012
  export_script = inst_os.export_script
2013

    
2014
  logfile = _InstanceLogName("export", inst_os.name, instance.name)
2015
  if not os.path.exists(constants.LOG_OS_DIR):
2016
    os.mkdir(constants.LOG_OS_DIR, 0750)
2017
  real_disk = _RecursiveFindBD(disk)
2018
  if real_disk is None:
2019
    _Fail("Block device '%s' is not set up", disk)
2020

    
2021
  real_disk.Open()
2022

    
2023
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
2024
  export_env['EXPORT_INDEX'] = str(idx)
2025

    
2026
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2027
  destfile = disk.physical_id[1]
2028

    
2029
  # the target command is built out of three individual commands,
2030
  # which are joined by pipes; we check each individual command for
2031
  # valid parameters
2032
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
2033
                               inst_os.path, export_script, logfile)
2034

    
2035
  comprcmd = "gzip"
2036

    
2037
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
2038
                                destdir, utils.PathJoin(destdir, destfile))
2039
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2040
                                                   constants.GANETI_RUNAS,
2041
                                                   destcmd)
2042

    
2043
  # all commands have been checked, so we're safe to combine them
2044
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
2045

    
2046
  result = utils.RunCmd(["bash", "-c", command], env=export_env)
2047

    
2048
  if result.failed:
2049
    _Fail("OS snapshot export command '%s' returned error: %s"
2050
          " output: %s", command, result.fail_reason, result.output)
2051

    
2052

    
2053
def FinalizeExport(instance, snap_disks):
2054
  """Write out the export configuration information.
2055

2056
  @type instance: L{objects.Instance}
2057
  @param instance: the instance which we export, used for
2058
      saving configuration
2059
  @type snap_disks: list of L{objects.Disk}
2060
  @param snap_disks: list of snapshot block devices, which
2061
      will be used to get the actual name of the dump file
2062

2063
  @rtype: None
2064

2065
  """
2066
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2067
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2068

    
2069
  config = objects.SerializableConfigParser()
2070

    
2071
  config.add_section(constants.INISECT_EXP)
2072
  config.set(constants.INISECT_EXP, 'version', '0')
2073
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2074
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2075
  config.set(constants.INISECT_EXP, 'os', instance.os)
2076
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
2077

    
2078
  config.add_section(constants.INISECT_INS)
2079
  config.set(constants.INISECT_INS, 'name', instance.name)
2080
  config.set(constants.INISECT_INS, 'memory', '%d' %
2081
             instance.beparams[constants.BE_MEMORY])
2082
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
2083
             instance.beparams[constants.BE_VCPUS])
2084
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2085

    
2086
  nic_total = 0
2087
  for nic_count, nic in enumerate(instance.nics):
2088
    nic_total += 1
2089
    config.set(constants.INISECT_INS, 'nic%d_mac' %
2090
               nic_count, '%s' % nic.mac)
2091
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2092
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
2093
               '%s' % nic.bridge)
2094
  # TODO: redundant: on load can read nics until it doesn't exist
2095
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2096

    
2097
  disk_total = 0
2098
  for disk_count, disk in enumerate(snap_disks):
2099
    if disk:
2100
      disk_total += 1
2101
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2102
                 ('%s' % disk.iv_name))
2103
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2104
                 ('%s' % disk.physical_id[1]))
2105
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2106
                 ('%d' % disk.size))
2107

    
2108
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2109

    
2110
  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2111
                  data=config.Dumps())
2112
  shutil.rmtree(finaldestdir, True)
2113
  shutil.move(destdir, finaldestdir)
2114

    
2115

    
2116
def ExportInfo(dest):
2117
  """Get export configuration information.
2118

2119
  @type dest: str
2120
  @param dest: directory containing the export
2121

2122
  @rtype: L{objects.SerializableConfigParser}
2123
  @return: a serializable config file containing the
2124
      export info
2125

2126
  """
2127
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2128

    
2129
  config = objects.SerializableConfigParser()
2130
  config.read(cff)
2131

    
2132
  if (not config.has_section(constants.INISECT_EXP) or
2133
      not config.has_section(constants.INISECT_INS)):
2134
    _Fail("Export info file doesn't have the required fields")
2135

    
2136
  return config.Dumps()
2137

    
2138

    
2139
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
2140
  """Import an os image into an instance.
2141

2142
  @type instance: L{objects.Instance}
2143
  @param instance: instance to import the disks into
2144
  @type src_node: string
2145
  @param src_node: source node for the disk images
2146
  @type src_images: list of string
2147
  @param src_images: absolute paths of the disk images
2148
  @type debug: integer
2149
  @param debug: debug level, passed to the OS scripts
2150
  @rtype: list of boolean
2151
  @return: each boolean represent the success of importing the n-th disk
2152

2153
  """
2154
  inst_os = OSFromDisk(instance.os)
2155
  import_env = OSEnvironment(instance, inst_os, debug)
2156
  import_script = inst_os.import_script
2157

    
2158
  logfile = _InstanceLogName("import", instance.os, instance.name)
2159
  if not os.path.exists(constants.LOG_OS_DIR):
2160
    os.mkdir(constants.LOG_OS_DIR, 0750)
2161

    
2162
  comprcmd = "gunzip"
2163
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2164
                               import_script, logfile)
2165

    
2166
  final_result = []
2167
  for idx, image in enumerate(src_images):
2168
    if image:
2169
      destcmd = utils.BuildShellCmd('cat %s', image)
2170
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2171
                                                       constants.GANETI_RUNAS,
2172
                                                       destcmd)
2173
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2174
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2175
      import_env['IMPORT_INDEX'] = str(idx)
2176
      result = utils.RunCmd(command, env=import_env)
2177
      if result.failed:
2178
        logging.error("Disk import command '%s' returned error: %s"
2179
                      " output: %s", command, result.fail_reason,
2180
                      result.output)
2181
        final_result.append("error importing disk %d: %s, %s" %
2182
                            (idx, result.fail_reason, result.output[-100]))
2183

    
2184
  if final_result:
2185
    _Fail("; ".join(final_result), log=False)
2186

    
2187

    
2188
def ListExports():
2189
  """Return a list of exports currently available on this machine.
2190

2191
  @rtype: list
2192
  @return: list of the exports
2193

2194
  """
2195
  if os.path.isdir(constants.EXPORT_DIR):
2196
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
2197
  else:
2198
    _Fail("No exports directory")
2199

    
2200

    
2201
def RemoveExport(export):
2202
  """Remove an existing export from the node.
2203

2204
  @type export: str
2205
  @param export: the name of the export to remove
2206
  @rtype: None
2207

2208
  """
2209
  target = utils.PathJoin(constants.EXPORT_DIR, export)
2210

    
2211
  try:
2212
    shutil.rmtree(target)
2213
  except EnvironmentError, err:
2214
    _Fail("Error while removing the export: %s", err, exc=True)
2215

    
2216

    
2217
def BlockdevRename(devlist):
2218
  """Rename a list of block devices.
2219

2220
  @type devlist: list of tuples
2221
  @param devlist: list of tuples of the form  (disk,
2222
      new_logical_id, new_physical_id); disk is an
2223
      L{objects.Disk} object describing the current disk,
2224
      and new logical_id/physical_id is the name we
2225
      rename it to
2226
  @rtype: boolean
2227
  @return: True if all renames succeeded, False otherwise
2228

2229
  """
2230
  msgs = []
2231
  result = True
2232
  for disk, unique_id in devlist:
2233
    dev = _RecursiveFindBD(disk)
2234
    if dev is None:
2235
      msgs.append("Can't find device %s in rename" % str(disk))
2236
      result = False
2237
      continue
2238
    try:
2239
      old_rpath = dev.dev_path
2240
      dev.Rename(unique_id)
2241
      new_rpath = dev.dev_path
2242
      if old_rpath != new_rpath:
2243
        DevCacheManager.RemoveCache(old_rpath)
2244
        # FIXME: we should add the new cache information here, like:
2245
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2246
        # but we don't have the owner here - maybe parse from existing
2247
        # cache? for now, we only lose lvm data when we rename, which
2248
        # is less critical than DRBD or MD
2249
    except errors.BlockDeviceError, err:
2250
      msgs.append("Can't rename device '%s' to '%s': %s" %
2251
                  (dev, unique_id, err))
2252
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2253
      result = False
2254
  if not result:
2255
    _Fail("; ".join(msgs))
2256

    
2257

    
2258
def _TransformFileStorageDir(file_storage_dir):
2259
  """Checks whether given file_storage_dir is valid.
2260

2261
  Checks wheter the given file_storage_dir is within the cluster-wide
2262
  default file_storage_dir stored in SimpleStore. Only paths under that
2263
  directory are allowed.
2264

2265
  @type file_storage_dir: str
2266
  @param file_storage_dir: the path to check
2267

2268
  @return: the normalized path if valid, None otherwise
2269

2270
  """
2271
  if not constants.ENABLE_FILE_STORAGE:
2272
    _Fail("File storage disabled at configure time")
2273
  cfg = _GetConfig()
2274
  file_storage_dir = os.path.normpath(file_storage_dir)
2275
  base_file_storage_dir = cfg.GetFileStorageDir()
2276
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2277
      base_file_storage_dir):
2278
    _Fail("File storage directory '%s' is not under base file"
2279
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2280
  return file_storage_dir
2281

    
2282

    
2283
def CreateFileStorageDir(file_storage_dir):
2284
  """Create file storage directory.
2285

2286
  @type file_storage_dir: str
2287
  @param file_storage_dir: directory to create
2288

2289
  @rtype: tuple
2290
  @return: tuple with first element a boolean indicating wheter dir
2291
      creation was successful or not
2292

2293
  """
2294
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2295
  if os.path.exists(file_storage_dir):
2296
    if not os.path.isdir(file_storage_dir):
2297
      _Fail("Specified storage dir '%s' is not a directory",
2298
            file_storage_dir)
2299
  else:
2300
    try:
2301
      os.makedirs(file_storage_dir, 0750)
2302
    except OSError, err:
2303
      _Fail("Cannot create file storage directory '%s': %s",
2304
            file_storage_dir, err, exc=True)
2305

    
2306

    
2307
def RemoveFileStorageDir(file_storage_dir):
2308
  """Remove file storage directory.
2309

2310
  Remove it only if it's empty. If not log an error and return.
2311

2312
  @type file_storage_dir: str
2313
  @param file_storage_dir: the directory we should cleanup
2314
  @rtype: tuple (success,)
2315
  @return: tuple of one element, C{success}, denoting
2316
      whether the operation was successful
2317

2318
  """
2319
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2320
  if os.path.exists(file_storage_dir):
2321
    if not os.path.isdir(file_storage_dir):
2322
      _Fail("Specified Storage directory '%s' is not a directory",
2323
            file_storage_dir)
2324
    # deletes dir only if empty, otherwise we want to fail the rpc call
2325
    try:
2326
      os.rmdir(file_storage_dir)
2327
    except OSError, err:
2328
      _Fail("Cannot remove file storage directory '%s': %s",
2329
            file_storage_dir, err)
2330

    
2331

    
2332
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2333
  """Rename the file storage directory.
2334

2335
  @type old_file_storage_dir: str
2336
  @param old_file_storage_dir: the current path
2337
  @type new_file_storage_dir: str
2338
  @param new_file_storage_dir: the name we should rename to
2339
  @rtype: tuple (success,)
2340
  @return: tuple of one element, C{success}, denoting
2341
      whether the operation was successful
2342

2343
  """
2344
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2345
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2346
  if not os.path.exists(new_file_storage_dir):
2347
    if os.path.isdir(old_file_storage_dir):
2348
      try:
2349
        os.rename(old_file_storage_dir, new_file_storage_dir)
2350
      except OSError, err:
2351
        _Fail("Cannot rename '%s' to '%s': %s",
2352
              old_file_storage_dir, new_file_storage_dir, err)
2353
    else:
2354
      _Fail("Specified storage dir '%s' is not a directory",
2355
            old_file_storage_dir)
2356
  else:
2357
    if os.path.exists(old_file_storage_dir):
2358
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2359
            old_file_storage_dir, new_file_storage_dir)
2360

    
2361

    
2362
def _EnsureJobQueueFile(file_name):
2363
  """Checks whether the given filename is in the queue directory.
2364

2365
  @type file_name: str
2366
  @param file_name: the file name we should check
2367
  @rtype: None
2368
  @raises RPCFail: if the file is not valid
2369

2370
  """
2371
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2372
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2373

    
2374
  if not result:
2375
    _Fail("Passed job queue file '%s' does not belong to"
2376
          " the queue directory '%s'", file_name, queue_dir)
2377

    
2378

    
2379
def JobQueueUpdate(file_name, content):
2380
  """Updates a file in the queue directory.
2381

2382
  This is just a wrapper over L{utils.WriteFile}, with proper
2383
  checking.
2384

2385
  @type file_name: str
2386
  @param file_name: the job file name
2387
  @type content: str
2388
  @param content: the new job contents
2389
  @rtype: boolean
2390
  @return: the success of the operation
2391

2392
  """
2393
  _EnsureJobQueueFile(file_name)
2394

    
2395
  # Write and replace the file atomically
2396
  utils.WriteFile(file_name, data=_Decompress(content))
2397

    
2398

    
2399
def JobQueueRename(old, new):
2400
  """Renames a job queue file.
2401

2402
  This is just a wrapper over os.rename with proper checking.
2403

2404
  @type old: str
2405
  @param old: the old (actual) file name
2406
  @type new: str
2407
  @param new: the desired file name
2408
  @rtype: tuple
2409
  @return: the success of the operation and payload
2410

2411
  """
2412
  _EnsureJobQueueFile(old)
2413
  _EnsureJobQueueFile(new)
2414

    
2415
  utils.RenameFile(old, new, mkdir=True)
2416

    
2417

    
2418
def JobQueueSetDrainFlag(drain_flag):
2419
  """Set the drain flag for the queue.
2420

2421
  This will set or unset the queue drain flag.
2422

2423
  @type drain_flag: boolean
2424
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
2425
  @rtype: truple
2426
  @return: always True, None
2427
  @warning: the function always returns True
2428

2429
  """
2430
  if drain_flag:
2431
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2432
  else:
2433
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2434

    
2435

    
2436
def BlockdevClose(instance_name, disks):
2437
  """Closes the given block devices.
2438

2439
  This means they will be switched to secondary mode (in case of
2440
  DRBD).
2441

2442
  @param instance_name: if the argument is not empty, the symlinks
2443
      of this instance will be removed
2444
  @type disks: list of L{objects.Disk}
2445
  @param disks: the list of disks to be closed
2446
  @rtype: tuple (success, message)
2447
  @return: a tuple of success and message, where success
2448
      indicates the succes of the operation, and message
2449
      which will contain the error details in case we
2450
      failed
2451

2452
  """
2453
  bdevs = []
2454
  for cf in disks:
2455
    rd = _RecursiveFindBD(cf)
2456
    if rd is None:
2457
      _Fail("Can't find device %s", cf)
2458
    bdevs.append(rd)
2459

    
2460
  msg = []
2461
  for rd in bdevs:
2462
    try:
2463
      rd.Close()
2464
    except errors.BlockDeviceError, err:
2465
      msg.append(str(err))
2466
  if msg:
2467
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2468
  else:
2469
    if instance_name:
2470
      _RemoveBlockDevLinks(instance_name, disks)
2471

    
2472

    
2473
def ValidateHVParams(hvname, hvparams):
2474
  """Validates the given hypervisor parameters.
2475

2476
  @type hvname: string
2477
  @param hvname: the hypervisor name
2478
  @type hvparams: dict
2479
  @param hvparams: the hypervisor parameters to be validated
2480
  @rtype: None
2481

2482
  """
2483
  try:
2484
    hv_type = hypervisor.GetHypervisor(hvname)
2485
    hv_type.ValidateParameters(hvparams)
2486
  except errors.HypervisorError, err:
2487
    _Fail(str(err), log=False)
2488

    
2489

    
2490
def DemoteFromMC():
2491
  """Demotes the current node from master candidate role.
2492

2493
  """
2494
  # try to ensure we're not the master by mistake
2495
  master, myself = ssconf.GetMasterAndMyself()
2496
  if master == myself:
2497
    _Fail("ssconf status shows I'm the master node, will not demote")
2498

    
2499
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2500
  if not result.failed:
2501
    _Fail("The master daemon is running, will not demote")
2502

    
2503
  try:
2504
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2505
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2506
  except EnvironmentError, err:
2507
    if err.errno != errno.ENOENT:
2508
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2509

    
2510
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2511

    
2512

    
2513
def _FindDisks(nodes_ip, disks):
2514
  """Sets the physical ID on disks and returns the block devices.
2515

2516
  """
2517
  # set the correct physical ID
2518
  my_name = utils.HostInfo().name
2519
  for cf in disks:
2520
    cf.SetPhysicalID(my_name, nodes_ip)
2521

    
2522
  bdevs = []
2523

    
2524
  for cf in disks:
2525
    rd = _RecursiveFindBD(cf)
2526
    if rd is None:
2527
      _Fail("Can't find device %s", cf)
2528
    bdevs.append(rd)
2529
  return bdevs
2530

    
2531

    
2532
def DrbdDisconnectNet(nodes_ip, disks):
2533
  """Disconnects the network on a list of drbd devices.
2534

2535
  """
2536
  bdevs = _FindDisks(nodes_ip, disks)
2537

    
2538
  # disconnect disks
2539
  for rd in bdevs:
2540
    try:
2541
      rd.DisconnectNet()
2542
    except errors.BlockDeviceError, err:
2543
      _Fail("Can't change network configuration to standalone mode: %s",
2544
            err, exc=True)
2545

    
2546

    
2547
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2548
  """Attaches the network on a list of drbd devices.
2549

2550
  """
2551
  bdevs = _FindDisks(nodes_ip, disks)
2552

    
2553
  if multimaster:
2554
    for idx, rd in enumerate(bdevs):
2555
      try:
2556
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2557
      except EnvironmentError, err:
2558
        _Fail("Can't create symlink: %s", err)
2559
  # reconnect disks, switch to new master configuration and if
2560
  # needed primary mode
2561
  for rd in bdevs:
2562
    try:
2563
      rd.AttachNet(multimaster)
2564
    except errors.BlockDeviceError, err:
2565
      _Fail("Can't change network configuration: %s", err)
2566

    
2567
  # wait until the disks are connected; we need to retry the re-attach
2568
  # if the device becomes standalone, as this might happen if the one
2569
  # node disconnects and reconnects in a different mode before the
2570
  # other node reconnects; in this case, one or both of the nodes will
2571
  # decide it has wrong configuration and switch to standalone
2572

    
2573
  def _Attach():
2574
    all_connected = True
2575

    
2576
    for rd in bdevs:
2577
      stats = rd.GetProcStatus()
2578

    
2579
      all_connected = (all_connected and
2580
                       (stats.is_connected or stats.is_in_resync))
2581

    
2582
      if stats.is_standalone:
2583
        # peer had different config info and this node became
2584
        # standalone, even though this should not happen with the
2585
        # new staged way of changing disk configs
2586
        try:
2587
          rd.AttachNet(multimaster)
2588
        except errors.BlockDeviceError, err:
2589
          _Fail("Can't change network configuration: %s", err)
2590

    
2591
    if not all_connected:
2592
      raise utils.RetryAgain()
2593

    
2594
  try:
2595
    # Start with a delay of 100 miliseconds and go up to 5 seconds
2596
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2597
  except utils.RetryTimeout:
2598
    _Fail("Timeout in disk reconnecting")
2599

    
2600
  if multimaster:
2601
    # change to primary mode
2602
    for rd in bdevs:
2603
      try:
2604
        rd.Open()
2605
      except errors.BlockDeviceError, err:
2606
        _Fail("Can't change to primary mode: %s", err)
2607

    
2608

    
2609
def DrbdWaitSync(nodes_ip, disks):
2610
  """Wait until DRBDs have synchronized.
2611

2612
  """
2613
  def _helper(rd):
2614
    stats = rd.GetProcStatus()
2615
    if not (stats.is_connected or stats.is_in_resync):
2616
      raise utils.RetryAgain()
2617
    return stats
2618

    
2619
  bdevs = _FindDisks(nodes_ip, disks)
2620

    
2621
  min_resync = 100
2622
  alldone = True
2623
  for rd in bdevs:
2624
    try:
2625
      # poll each second for 15 seconds
2626
      stats = utils.Retry(_helper, 1, 15, args=[rd])
2627
    except utils.RetryTimeout:
2628
      stats = rd.GetProcStatus()
2629
      # last check
2630
      if not (stats.is_connected or stats.is_in_resync):
2631
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2632
    alldone = alldone and (not stats.is_in_resync)
2633
    if stats.sync_percent is not None:
2634
      min_resync = min(min_resync, stats.sync_percent)
2635

    
2636
  return (alldone, min_resync)
2637

    
2638

    
2639
def PowercycleNode(hypervisor_type):
2640
  """Hard-powercycle the node.
2641

2642
  Because we need to return first, and schedule the powercycle in the
2643
  background, we won't be able to report failures nicely.
2644

2645
  """
2646
  hyper = hypervisor.GetHypervisor(hypervisor_type)
2647
  try:
2648
    pid = os.fork()
2649
  except OSError:
2650
    # if we can't fork, we'll pretend that we're in the child process
2651
    pid = 0
2652
  if pid > 0:
2653
    return "Reboot scheduled in 5 seconds"
2654
  time.sleep(5)
2655
  hyper.PowercycleNode()
2656

    
2657

    
2658
class HooksRunner(object):
2659
  """Hook runner.
2660

2661
  This class is instantiated on the node side (ganeti-noded) and not
2662
  on the master side.
2663

2664
  """
2665
  def __init__(self, hooks_base_dir=None):
2666
    """Constructor for hooks runner.
2667

2668
    @type hooks_base_dir: str or None
2669
    @param hooks_base_dir: if not None, this overrides the
2670
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2671

2672
    """
2673
    if hooks_base_dir is None:
2674
      hooks_base_dir = constants.HOOKS_BASE_DIR
2675
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
2676
    # constant
2677
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
2678

    
2679
  def RunHooks(self, hpath, phase, env):
2680
    """Run the scripts in the hooks directory.
2681

2682
    @type hpath: str
2683
    @param hpath: the path to the hooks directory which
2684
        holds the scripts
2685
    @type phase: str
2686
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2687
        L{constants.HOOKS_PHASE_POST}
2688
    @type env: dict
2689
    @param env: dictionary with the environment for the hook
2690
    @rtype: list
2691
    @return: list of 3-element tuples:
2692
      - script path
2693
      - script result, either L{constants.HKR_SUCCESS} or
2694
        L{constants.HKR_FAIL}
2695
      - output of the script
2696

2697
    @raise errors.ProgrammerError: for invalid input
2698
        parameters
2699

2700
    """
2701
    if phase == constants.HOOKS_PHASE_PRE:
2702
      suffix = "pre"
2703
    elif phase == constants.HOOKS_PHASE_POST:
2704
      suffix = "post"
2705
    else:
2706
      _Fail("Unknown hooks phase '%s'", phase)
2707

    
2708

    
2709
    subdir = "%s-%s.d" % (hpath, suffix)
2710
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
2711

    
2712
    results = []
2713

    
2714
    if not os.path.isdir(dir_name):
2715
      # for non-existing/non-dirs, we simply exit instead of logging a
2716
      # warning at every operation
2717
      return results
2718

    
2719
    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
2720

    
2721
    for (relname, relstatus, runresult)  in runparts_results:
2722
      if relstatus == constants.RUNPARTS_SKIP:
2723
        rrval = constants.HKR_SKIP
2724
        output = ""
2725
      elif relstatus == constants.RUNPARTS_ERR:
2726
        rrval = constants.HKR_FAIL
2727
        output = "Hook script execution error: %s" % runresult
2728
      elif relstatus == constants.RUNPARTS_RUN:
2729
        if runresult.failed:
2730
          rrval = constants.HKR_FAIL
2731
        else:
2732
          rrval = constants.HKR_SUCCESS
2733
        output = utils.SafeEncode(runresult.output.strip())
2734
      results.append(("%s/%s" % (subdir, relname), rrval, output))
2735

    
2736
    return results
2737

    
2738

    
2739
class IAllocatorRunner(object):
2740
  """IAllocator runner.
2741

2742
  This class is instantiated on the node side (ganeti-noded) and not on
2743
  the master side.
2744

2745
  """
2746
  @staticmethod
2747
  def Run(name, idata):
2748
    """Run an iallocator script.
2749

2750
    @type name: str
2751
    @param name: the iallocator script name
2752
    @type idata: str
2753
    @param idata: the allocator input data
2754

2755
    @rtype: tuple
2756
    @return: two element tuple of:
2757
       - status
2758
       - either error message or stdout of allocator (for success)
2759

2760
    """
2761
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2762
                                  os.path.isfile)
2763
    if alloc_script is None:
2764
      _Fail("iallocator module '%s' not found in the search path", name)
2765

    
2766
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2767
    try:
2768
      os.write(fd, idata)
2769
      os.close(fd)
2770
      result = utils.RunCmd([alloc_script, fin_name])
2771
      if result.failed:
2772
        _Fail("iallocator module '%s' failed: %s, output '%s'",
2773
              name, result.fail_reason, result.output)
2774
    finally:
2775
      os.unlink(fin_name)
2776

    
2777
    return result.stdout
2778

    
2779

    
2780
class DevCacheManager(object):
2781
  """Simple class for managing a cache of block device information.
2782

2783
  """
2784
  _DEV_PREFIX = "/dev/"
2785
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2786

    
2787
  @classmethod
2788
  def _ConvertPath(cls, dev_path):
2789
    """Converts a /dev/name path to the cache file name.
2790

2791
    This replaces slashes with underscores and strips the /dev
2792
    prefix. It then returns the full path to the cache file.
2793

2794
    @type dev_path: str
2795
    @param dev_path: the C{/dev/} path name
2796
    @rtype: str
2797
    @return: the converted path name
2798

2799
    """
2800
    if dev_path.startswith(cls._DEV_PREFIX):
2801
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2802
    dev_path = dev_path.replace("/", "_")
2803
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
2804
    return fpath
2805

    
2806
  @classmethod
2807
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2808
    """Updates the cache information for a given device.
2809

2810
    @type dev_path: str
2811
    @param dev_path: the pathname of the device
2812
    @type owner: str
2813
    @param owner: the owner (instance name) of the device
2814
    @type on_primary: bool
2815
    @param on_primary: whether this is the primary
2816
        node nor not
2817
    @type iv_name: str
2818
    @param iv_name: the instance-visible name of the
2819
        device, as in objects.Disk.iv_name
2820

2821
    @rtype: None
2822

2823
    """
2824
    if dev_path is None:
2825
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2826
      return
2827
    fpath = cls._ConvertPath(dev_path)
2828
    if on_primary:
2829
      state = "primary"
2830
    else:
2831
      state = "secondary"
2832
    if iv_name is None:
2833
      iv_name = "not_visible"
2834
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2835
    try:
2836
      utils.WriteFile(fpath, data=fdata)
2837
    except EnvironmentError, err:
2838
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2839

    
2840
  @classmethod
2841
  def RemoveCache(cls, dev_path):
2842
    """Remove data for a dev_path.
2843

2844
    This is just a wrapper over L{utils.RemoveFile} with a converted
2845
    path name and logging.
2846

2847
    @type dev_path: str
2848
    @param dev_path: the pathname of the device
2849

2850
    @rtype: None
2851

2852
    """
2853
    if dev_path is None:
2854
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2855
      return
2856
    fpath = cls._ConvertPath(dev_path)
2857
    try:
2858
      utils.RemoveFile(fpath)
2859
    except EnvironmentError, err:
2860
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)