Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 196ec587

History | View | Annotate | Download (88.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable-msg=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50

    
51
from ganeti import errors
52
from ganeti import utils
53
from ganeti import ssh
54
from ganeti import hypervisor
55
from ganeti import constants
56
from ganeti import bdev
57
from ganeti import objects
58
from ganeti import ssconf
59

    
60

    
61
# Kernel-provided unique identifier for the current boot; read by
# GetNodeInfo so the master can detect node reboots.
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# Whitelist of directories that _CleanDirectory may wipe; any other
# path passed to it is rejected with an RPC failure.
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  ])
67

    
68

    
69
class RPCFail(Exception):
  """Exception signalling an RPC-level failure.

  The single argument carries the human-readable error message which
  is reported back to the caller (normally the master daemon).

  """
75

    
76

    
77
def _Fail(msg, *args, **kwargs):
  """Log an error and raise an RPCFail exception.

  The exception is handled specially in the ganeti daemon and turned
  into a 'failed' return type, so this is a convenient shortcut for
  logging an error and reporting it back to the master daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  # Logging is on by default; callers pass log=False to suppress it
  # (e.g. when the message was already logged in full elsewhere).
  if kwargs.get("log", True):
    if kwargs.get("exc"):
      # include the current exception's traceback in the log
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
98

    
99

    
100
def _GetConfig():
  """Return a freshly created simple configuration store.

  @rtype: L{ssconf.SimpleStore}
  @return: a new SimpleStore instance

  """
  return ssconf.SimpleStore()
108

    
109

    
110
def _GetSshRunner(cluster_name):
  """Build an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a new SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
121

    
122

    
123
def _Decompress(data):
  """Unpack data compressed by the RPC client.

  @type data: list or tuple
  @param data: (encoding, content) pair sent by the RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # payload was sent uncompressed
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload is zlib-compressed, then base64-armored for transport
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
141

    
142

    
143
def _CleanDirectory(path, exclude=None):
  """Remove all regular files from a directory.

  Symlinks and subdirectories are left untouched; only directories in
  the module-level whitelist may be cleaned.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return

  if exclude is None:
    excluded = []
  else:
    # normalize excluded paths so they compare equal to joined names
    excluded = [os.path.normpath(entry) for entry in exclude]

  for entry in utils.ListVisibleFiles(path):
    full_path = utils.PathJoin(path, entry)
    if full_path in excluded:
      continue
    # only plain files are removed; symlinks are deliberately kept
    if os.path.isfile(full_path) and not os.path.islink(full_path):
      utils.RemoveFile(full_path)
171

    
172

    
173
def _BuildUploadFileList():
  """Build the set of files accepted by L{UploadFile}.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the allowed file paths

  """
  files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    ])

  # each hypervisor contributes its own ancillary files
  for hv_name in constants.HYPER_TYPES:
    files.update(hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles())

  return frozenset(files)
194

    
195

    
196
# Computed once at import time; see _BuildUploadFileList
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
197

    
198

    
199
def JobQueuePurge():
  """Remove job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  # the lock file is kept so the queue remains lockable while purging
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
208

    
209

    
210
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    # _Fail raises RPCFail, so the return below is reached on success only
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node)
229

    
230

    
231
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # errors are collected and reported at the end, so that partial
  # progress (e.g. IP up but daemons down) is still made
  err_msgs = []
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      # the IP answers but is not ours: another node holds it
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    # bring up the master IP as a labelled alias on the master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    # gratuitous ARP so neighbours update their caches quickly
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    if no_voting:
      # --yes-do-it is required by masterd when voting is disabled
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
291

    
292

    
293
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # remove the master IP alias added by StartMaster
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      # failure is logged only; the RPC itself does not fail
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)
324

    
325

    
326
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation

  """
  # host keys: private keys are mode 0600, public keys world-readable
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    # mkdir=True creates the user's .ssh directory if missing
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    _Fail("Error while processing user ssh files: %s", err, exc=True)

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # make sshd pick up the new host keys
  result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
  if result.failed:
    _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
          result.cmd, result.exit_code, result.output)
372

    
373

    
374
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      # drop our own public key from authorized_keys, then the keypair
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      # best-effort: log and continue with the rest of the cleanup
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    # deliberately broad: secret removal must never abort the cleanup
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
415

    
416

    
417
def GetNodeInfo(vgname, hypervisor_type):
  """Collect assorted information about the local node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  info = {}
  vginfo = _GetVGInfo(vgname)
  info['vg_size'] = vginfo['vg_size']
  info['vg_free'] = vginfo['vg_free']

  # memory information comes from the hypervisor layer
  hv_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hv_info is not None:
    info.update(hv_info)

  # boot id lets the master detect that this node has rebooted
  info["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return info
447

    
448

    
449
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = utils.HostInfo().name
  port = utils.GetDaemonPort(constants.NODED)

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        # report the error as the check result instead of failing the RPC
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffle so that not all nodes hammer the same targets in order
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    # find our own primary/secondary IPs in the list first
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP when it differs from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      # we are the master: ping over loopback
      source = constants.LOCALHOST_IP_ADDRESS
    else:
      source = None
    result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
                                                  source=source)

  if constants.NV_LVLIST in what:
    try:
      val = GetVolumeList(what[constants.NV_LVLIST])
    except RPCFail, err:
      # return the error text as the check value, don't fail the RPC
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      # the error string stands in for the minors list
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    # sanity-check that sysfs and procfs are mounted where expected
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  return result
592

    
593

    
594
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  volumes = {}
  # name | size | six-char attribute string, optionally pipe-terminated
  line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for raw_line in result.stdout.splitlines():
    stripped = raw_line.strip()
    match = line_re.match(stripped)
    if match is None:
      logging.error("Invalid line returned from lvs output: '%s'", stripped)
      continue
    name, size, attr = match.groups()
    if attr[0] == 'v':
      # virtual volumes don't really hold data, so don't report them
      continue
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    volumes[name] = (size, inactive, online)

  return volumes
636

    
637

    
638
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  # thin wrapper, kept so that the RPC layer has a stable entry point
  return utils.ListVolumeGroups()
647

    
648

    
649
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  volumes = []
  for raw in result.stdout.splitlines():
    if raw.count('|') < 3:
      logging.warning("Strange line in the output from lvs: '%s'", raw)
      continue
    fields = [field.strip() for field in raw.split('|')]
    # one entry per physical device: an LV spanning several PVs is
    # reported once for each of them
    for dev_spec in fields[2].split(","):
      volumes.append({'name': fields[0], 'size': fields[1],
                      # strip the "(extent)" suffix from the device spec
                      'dev': dev_spec.split('(')[0], 'vg': fields[3]})
  return volumes
693

    
694

    
695
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
709

    
710

    
711
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      # any hypervisor failure aborts the whole listing via RPCFail
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results
733

    
734

    
735
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    # unknown instance: empty dict, matching the documented contract
    return {}
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
759

    
760

    
761
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # every disk must have its symlink; old (pre-2.0) instances that
  # were never restarted lack them and cannot be migrated
  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
782

    
783

    
784
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      value = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      previous = output.get(name)
      if previous is not None:
        # we only check static parameters, like memory and vcpus,
        # and not state and time which can change between the
        # invocations of the different hypervisors
        for key in ('memory', 'vcpus'):
          if value[key] != previous[key]:
            _Fail("Instance %s is running twice"
                  " with different parameters", name)
      output[name] = value

  return output
825

    
826

    
827
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.

  """
  # timestamp keeps successive runs from overwriting each other's logs
  basename = "%s-%s-%s-%s.log" % (kind, os_name, instance,
                                  utils.TimestampForFilename())
  return utils.PathJoin(constants.LOG_OS_DIR, basename)
844

    
845

    
846
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    # tells the create script not to wipe existing data
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  # the script's stdout/stderr go to the logfile
  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # include the tail of the log in the error sent to the master;
    # log=False because we just logged the full details above
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
876

    
877

    
878
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  # the script reads the old name from the environment
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  # the script's stdout/stderr go to the logfile
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # fixed copy-paste error: this message previously said "os create
    # command" although this function runs the rename script
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # include the tail of the log in the error sent to the master;
    # log=False because we just logged the full details above
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
909

    
910

    
911
def _GetVGInfo(vg_name):
912
  """Get information about the volume group.
913

914
  @type vg_name: str
915
  @param vg_name: the volume group which we query
916
  @rtype: dict
917
  @return:
918
    A dictionary with the following keys:
919
      - C{vg_size} is the total size of the volume group in MiB
920
      - C{vg_free} is the free size of the volume group in MiB
921
      - C{pv_count} are the number of physical disks in that VG
922

923
    If an error occurs during gathering of data, we return the same dict
924
    with keys all set to None.
925

926
  """
927
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
928

    
929
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
930
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
931

    
932
  if retval.failed:
933
    logging.error("volume group %s not present", vg_name)
934
    return retdic
935
  valarr = retval.stdout.strip().rstrip(':').split(':')
936
  if len(valarr) == 3:
937
    try:
938
      retdic = {
939
        "vg_size": int(round(float(valarr[0]), 0)),
940
        "vg_free": int(round(float(valarr[1]), 0)),
941
        "pv_count": int(valarr[2]),
942
        }
943
    except (TypeError, ValueError), err:
944
      logging.exception("Fail to parse vgs output: %s", err)
945
  else:
946
    logging.error("vgs output has the wrong number of fields (expected"
947
                  " three): %s", str(valarr))
948
  return retdic
949

    
950

    
951
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for the given instance's disk index."""
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
954

    
955

    
956
def _SymlinkBlockDev(instance_name, device_path, idx):
957
  """Set up symlinks to a instance's block device.
958

959
  This is an auxiliary function run when an instance is start (on the primary
960
  node) or when an instance is migrated (on the target node).
961

962

963
  @param instance_name: the name of the target instance
964
  @param device_path: path of the physical block device, on the node
965
  @param idx: the disk index
966
  @return: absolute path to the disk's symlink
967

968
  """
969
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
970
  try:
971
    os.symlink(device_path, link_name)
972
  except OSError, err:
973
    if err.errno == errno.EEXIST:
974
      if (not os.path.islink(link_name) or
975
          os.readlink(link_name) != device_path):
976
        os.remove(link_name)
977
        os.symlink(device_path, link_name)
978
    else:
979
      raise
980

    
981
  return link_name
982

    
983

    
984
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Removal errors are logged but never propagated; this is best-effort
  cleanup.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
995

    
996

    
997
def _GatherAndLinkBlockDevs(instance):
998
  """Set up an instance's block device(s).
999

1000
  This is run on the primary node at instance startup. The block
1001
  devices must be already assembled.
1002

1003
  @type instance: L{objects.Instance}
1004
  @param instance: the instance whose disks we shoul assemble
1005
  @rtype: list
1006
  @return: list of (disk_object, device_path)
1007

1008
  """
1009
  block_devices = []
1010
  for idx, disk in enumerate(instance.disks):
1011
    device = _RecursiveFindBD(disk)
1012
    if device is None:
1013
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1014
                                    str(disk))
1015
    device.Open()
1016
    try:
1017
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1018
    except OSError, e:
1019
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1020
                                    e.strerror)
1021

    
1022
    block_devices.append((disk, link_name))
1023

    
1024
  return block_devices
1025

    
1026

    
1027
def StartInstance(instance):
1028
  """Start an instance.
1029

1030
  @type instance: L{objects.Instance}
1031
  @param instance: the instance object
1032
  @rtype: None
1033

1034
  """
1035
  running_instances = GetInstanceList([instance.hypervisor])
1036

    
1037
  if instance.name in running_instances:
1038
    logging.info("Instance %s already running, not starting", instance.name)
1039
    return
1040

    
1041
  try:
1042
    block_devices = _GatherAndLinkBlockDevs(instance)
1043
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1044
    hyper.StartInstance(instance, block_devices)
1045
  except errors.BlockDeviceError, err:
1046
    _Fail("Block device error: %s", err, exc=True)
1047
  except errors.HypervisorError, err:
1048
    _RemoveBlockDevLinks(instance.name, instance.disks)
1049
    _Fail("Hypervisor error: %s", err, exc=True)
1050

    
1051

    
1052
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  First tries a soft (ACPI-style) shutdown, retrying every 5 seconds up
  to C{timeout}; if that fails, forcibly destroys the instance, then
  runs the hypervisor cleanup hook and removes the disk symlinks.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  # shutting down a stopped instance is a no-op
  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  # callable retried by utils.Retry below; the first call asks nicely,
  # subsequent calls pass retry=True so the hypervisor can escalate
  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      # instance gone: the shutdown has completed
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      # still running; ask utils.Retry to call us again
      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    # give the hypervisor a moment to drop the instance from its list
    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  # post-shutdown cleanup failures are logged but not fatal
  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
1119

    
1120

    
1121
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1122
  """Reboot an instance.
1123

1124
  @type instance: L{objects.Instance}
1125
  @param instance: the instance object to reboot
1126
  @type reboot_type: str
1127
  @param reboot_type: the type of reboot, one the following
1128
    constants:
1129
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1130
        instance OS, do not recreate the VM
1131
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1132
        restart the VM (at the hypervisor level)
1133
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1134
        not accepted here, since that mode is handled differently, in
1135
        cmdlib, and translates into full stop and start of the
1136
        instance (instead of a call_instance_reboot RPC)
1137
  @type shutdown_timeout: integer
1138
  @param shutdown_timeout: maximum timeout for soft shutdown
1139
  @rtype: None
1140

1141
  """
1142
  running_instances = GetInstanceList([instance.hypervisor])
1143

    
1144
  if instance.name not in running_instances:
1145
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1146

    
1147
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1148
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1149
    try:
1150
      hyper.RebootInstance(instance)
1151
    except errors.HypervisorError, err:
1152
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1153
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1154
    try:
1155
      InstanceShutdown(instance, shutdown_timeout)
1156
      return StartInstance(instance)
1157
    except errors.HypervisorError, err:
1158
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1159
  else:
1160
    _Fail("Invalid reboot_type received: %s", reboot_type)
1161

    
1162

    
1163
def MigrationInfo(instance):
1164
  """Gather information about an instance to be migrated.
1165

1166
  @type instance: L{objects.Instance}
1167
  @param instance: the instance definition
1168

1169
  """
1170
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1171
  try:
1172
    info = hyper.MigrationInfo(instance)
1173
  except errors.HypervisorError, err:
1174
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1175
  return info
1176

    
1177

    
1178
def AcceptInstance(instance, info, target):
1179
  """Prepare the node to accept an instance.
1180

1181
  @type instance: L{objects.Instance}
1182
  @param instance: the instance definition
1183
  @type info: string/data (opaque)
1184
  @param info: migration information, from the source node
1185
  @type target: string
1186
  @param target: target host (usually ip), on this node
1187

1188
  """
1189
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1190
  try:
1191
    hyper.AcceptInstance(instance, info, target)
1192
  except errors.HypervisorError, err:
1193
    _Fail("Failed to accept instance: %s", err, exc=True)
1194

    
1195

    
1196
def FinalizeMigration(instance, info, success):
1197
  """Finalize any preparation to accept an instance.
1198

1199
  @type instance: L{objects.Instance}
1200
  @param instance: the instance definition
1201
  @type info: string/data (opaque)
1202
  @param info: migration information, from the source node
1203
  @type success: boolean
1204
  @param success: whether the migration was a success or a failure
1205

1206
  """
1207
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1208
  try:
1209
    hyper.FinalizeMigration(instance, info, success)
1210
  except errors.HypervisorError, err:
1211
    _Fail("Failed to finalize migration: %s", err, exc=True)
1212

    
1213

    
1214
def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: None
  @raise RPCFail: if the migration fails for any reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)
1236

    
1237

    
1238
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete 'size' argument
  # pylint: disable-msg=W0613
  # assemble (and, where needed, open) all children first, so the
  # device itself can be created on top of them
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable-msg=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # register the new device in the node-local device cache
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
1300

    
1301

    
1302
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  # errors are accumulated here so we remove as much as possible
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      # only drop the cache entry if the removal actually succeeded
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))
1338

    
1339

    
1340
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # mcn is the maximum number of children we may fail to assemble
    # (represented as None entries) before giving up
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # re-raise only once we have exhausted the allowed failures
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # nothing to assemble on this node; signal success to the caller
    result = True
  return result
1392

    
1393

    
1394
def BlockdevAssemble(disk, owner, as_primary):
1395
  """Activate a block device for an instance.
1396

1397
  This is a wrapper over _RecursiveAssembleBD.
1398

1399
  @rtype: str or boolean
1400
  @return: a C{/dev/...} path for primary nodes, and
1401
      C{True} for secondary nodes
1402

1403
  """
1404
  try:
1405
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1406
    if isinstance(result, bdev.BlockDev):
1407
      # pylint: disable-msg=E1103
1408
      result = result.dev_path
1409
  except errors.BlockDeviceError, err:
1410
    _Fail("Error while assembling disk: %s", err, exc=True)
1411

    
1412
  return result
1413

    
1414

    
1415
def BlockdevShutdown(disk):
1416
  """Shut down a block device.
1417

1418
  First, if the device is assembled (Attach() is successful), then
1419
  the device is shutdown. Then the children of the device are
1420
  shutdown.
1421

1422
  This function is called recursively. Note that we don't cache the
1423
  children or such, as oppossed to assemble, shutdown of different
1424
  devices doesn't require that the upper device was active.
1425

1426
  @type disk: L{objects.Disk}
1427
  @param disk: the description of the disk we should
1428
      shutdown
1429
  @rtype: None
1430

1431
  """
1432
  msgs = []
1433
  r_dev = _RecursiveFindBD(disk)
1434
  if r_dev is not None:
1435
    r_path = r_dev.dev_path
1436
    try:
1437
      r_dev.Shutdown()
1438
      DevCacheManager.RemoveCache(r_path)
1439
    except errors.BlockDeviceError, err:
1440
      msgs.append(str(err))
1441

    
1442
  if disk.children:
1443
    for child in disk.children:
1444
      try:
1445
        BlockdevShutdown(child)
1446
      except RPCFail, err:
1447
        msgs.append(str(err))
1448

    
1449
  if msgs:
1450
    _Fail("; ".join(msgs))
1451

    
1452

    
1453
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # every requested child must resolve to an actual device
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
1470

    
1471

    
1472
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is not None:
      # the disk has a fixed device path; sanity-check it before use
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
      continue
    bd = _RecursiveFindBD(disk)
    if bd is None:
      _Fail("Can't find device %s while removing children", disk)
    devs.append(bd.dev_path)
  parent_bdev.RemoveChildren(devs)
1499

    
1500

    
1501
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  sync_status = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s", disk)
    sync_status.append(found.CombinedSyncStatus())

  return sync_status
1523

    
1524

    
1525
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # resolve all children first (recursively), then the device itself
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1543

    
1544

    
1545
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the opened device instance

  """
  bd = _RecursiveFindBD(disk)
  if bd is None:
    _Fail("Block device '%s' is not set up", disk)
  bd.Open()
  return bd
1559

    
1560

    
1561
def BlockdevFind(disk):
1562
  """Check if a device is activated.
1563

1564
  If it is, return information about the real device.
1565

1566
  @type disk: L{objects.Disk}
1567
  @param disk: the disk to find
1568
  @rtype: None or objects.BlockDevStatus
1569
  @return: None if the disk cannot be found, otherwise a the current
1570
           information
1571

1572
  """
1573
  try:
1574
    rbd = _RecursiveFindBD(disk)
1575
  except errors.BlockDeviceError, err:
1576
    _Fail("Failed to find device: %s", err, exc=True)
1577

    
1578
  if rbd is None:
1579
    return None
1580

    
1581
  return rbd.GetSyncStatus()
1582

    
1583

    
1584
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  sizes = []
  for disk in disks:
    try:
      found = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      found = None
    if found is None:
      sizes.append(None)
    else:
      sizes.append(found.GetActualSize())
  return sizes
1608

    
1609

    
1610
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  Streams the device contents over SSH into a file/device on the
  destination node via a dd-over-ssh pipeline.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None
  @raise RPCFail: if the copy pipeline exits with an error

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  # bash is required for "pipefail" above
  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
1652

    
1653

    
1654
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  # the payload may arrive compressed; expand it before writing
  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
1688

    
1689

    
1690
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
1697

    
1698

    
1699
def _ErrnoOrStr(err):
1700
  """Format an EnvironmentError exception.
1701

1702
  If the L{err} argument has an errno attribute, it will be looked up
1703
  and converted into a textual C{E...} description. Otherwise the
1704
  string representation of the error will be returned.
1705

1706
  @type err: L{EnvironmentError}
1707
  @param err: the exception to format
1708

1709
  """
1710
  if hasattr(err, 'errno'):
1711
    detail = errno.errorcode[err.errno]
1712
  else:
1713
    detail = str(err)
1714
  return detail
1715

    
1716

    
1717
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  # reject symlink targets/special files: the API file must be regular
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  # one integer version per line
  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
1755

    
1756

    
1757
def DiagnoseOS(top_dirs=None):
1758
  """Compute the validity for all OSes.
1759

1760
  @type top_dirs: list
1761
  @param top_dirs: the list of directories in which to
1762
      search (if not given defaults to
1763
      L{constants.OS_SEARCH_PATH})
1764
  @rtype: list of L{objects.OS}
1765
  @return: a list of tuples (name, path, status, diagnose, variants)
1766
      for all (potential) OSes under all search paths, where:
1767
          - name is the (potential) OS name
1768
          - path is the full path to the OS
1769
          - status True/False is the validity of the OS
1770
          - diagnose is the error message for an invalid OS, otherwise empty
1771
          - variants is a list of supported OS variants, if any
1772

1773
  """
1774
  if top_dirs is None:
1775
    top_dirs = constants.OS_SEARCH_PATH
1776

    
1777
  result = []
1778
  for dir_name in top_dirs:
1779
    if os.path.isdir(dir_name):
1780
      try:
1781
        f_names = utils.ListVisibleFiles(dir_name)
1782
      except EnvironmentError, err:
1783
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1784
        break
1785
      for name in f_names:
1786
        os_path = utils.PathJoin(dir_name, name)
1787
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1788
        if status:
1789
          diagnose = ""
1790
          variants = os_inst.supported_variants
1791
        else:
1792
          diagnose = os_inst
1793
          variants = []
1794
        result.append((name, os_path, status, diagnose, variants))
1795

    
1796
  return result
1797

    
1798

    
1799
def _TryOSFromDisk(name, base_dir=None):
1800
  """Create an OS instance from disk.
1801

1802
  This function will return an OS instance if the given name is a
1803
  valid OS name.
1804

1805
  @type base_dir: string
1806
  @keyword base_dir: Base directory containing OS installations.
1807
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1808
  @rtype: tuple
1809
  @return: success and either the OS instance if we find a valid one,
1810
      or error message
1811

1812
  """
1813
  if base_dir is None:
1814
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1815
  else:
1816
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1817

    
1818
  if os_dir is None:
1819
    return False, "Directory for OS %s not found in search path" % name
1820

    
1821
  status, api_versions = _OSOndiskAPIVersion(os_dir)
1822
  if not status:
1823
    # push the error up
1824
    return status, api_versions
1825

    
1826
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1827
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1828
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1829

    
1830
  # OS Files dictionary, we will populate it with the absolute path names
1831
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1832

    
1833
  if max(api_versions) >= constants.OS_API_V15:
1834
    os_files[constants.OS_VARIANTS_FILE] = ''
1835

    
1836
  for filename in os_files:
1837
    os_files[filename] = utils.PathJoin(os_dir, filename)
1838

    
1839
    try:
1840
      st = os.stat(os_files[filename])
1841
    except EnvironmentError, err:
1842
      return False, ("File '%s' under path '%s' is missing (%s)" %
1843
                     (filename, os_dir, _ErrnoOrStr(err)))
1844

    
1845
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1846
      return False, ("File '%s' under path '%s' is not a regular file" %
1847
                     (filename, os_dir))
1848

    
1849
    if filename in constants.OS_SCRIPTS:
1850
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1851
        return False, ("File '%s' under path '%s' is not executable" %
1852
                       (filename, os_dir))
1853

    
1854
  variants = None
1855
  if constants.OS_VARIANTS_FILE in os_files:
1856
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1857
    try:
1858
      variants = utils.ReadFile(variants_file).splitlines()
1859
    except EnvironmentError, err:
1860
      return False, ("Error while reading the OS variants file at %s: %s" %
1861
                     (variants_file, _ErrnoOrStr(err)))
1862
    if not variants:
1863
      return False, ("No supported os variant found")
1864

    
1865
  os_obj = objects.OS(name=name, path=os_dir,
1866
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1867
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1868
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1869
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1870
                      supported_variants=variants,
1871
                      api_versions=api_versions)
1872
  return True, os_obj
1873

    
1874

    
1875
def OSFromDisk(name, base_dir=None):
  """Load an OS definition from disk, raising on failure.

  Thin wrapper over L{_TryOSFromDisk}: instead of a status tuple it
  either returns a valid OS instance or raises L{RPCFail} with the
  diagnosis message.

  @type name: string
  @param name: the OS name; a "name+variant" string is reduced to its
      name part before the lookup
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if status:
    return payload

  # payload holds the error string here; _Fail raises RPCFail
  _Fail(payload)
1900

    
1901

    
1902
def OSEnvironment(instance, inst_os, debug=0):
  """Build the environment dict passed to an OS script run.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # highest API version supported by both the cluster and this OS
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug

  if api_version >= constants.OS_API_V15:
    # the variant is encoded in the instance OS name as "os+variant";
    # without one, fall back to the OS' first supported variant
    if '+' in instance.os:
      variant = instance.os.split('+', 1)[1]
    else:
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant

  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # export all backend/hypervisor parameters as INSTANCE_BE_*/INSTANCE_HV_*
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
1963

    
1964

    
1965
def BlockdevGrow(disk, amount):
1966
  """Grow a stack of block devices.
1967

1968
  This function is called recursively, with the childrens being the
1969
  first ones to resize.
1970

1971
  @type disk: L{objects.Disk}
1972
  @param disk: the disk to be grown
1973
  @rtype: (status, result)
1974
  @return: a tuple with the status of the operation
1975
      (True/False), and the errors message if status
1976
      is False
1977

1978
  """
1979
  r_dev = _RecursiveFindBD(disk)
1980
  if r_dev is None:
1981
    _Fail("Cannot find block device %s", disk)
1982

    
1983
  try:
1984
    r_dev.Grow(amount)
1985
  except errors.BlockDeviceError, err:
1986
    _Fail("Failed to grow block device: %s", err, exc=True)
1987

    
1988

    
1989
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path
  @raise RPCFail: for DRBD devices without children, unfindable LVs,
      or unsupported device types

  """
  if disk.dev_type == constants.LD_DRBD8:
    # snapshot the backing storage, not the DRBD layer itself
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])

  if disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is None:
      _Fail("Cannot find block device %s", disk)
    # FIXME: choose a saner value for the snapshot size
    # let's stay on the safe side and ask for the full size, for now
    return r_dev.Snapshot(disk.size)

  _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
        disk.unique_id, disk.dev_type)
2017

    
2018

    
2019
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
  """Export a block device snapshot to a remote node.

  The snapshot is dumped by the OS' export script, compressed with
  gzip and streamed over SSH into a file inside the destination
  node's export directory.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None
  @raise RPCFail: if the export pipeline fails

  """
  inst_os = OSFromDisk(instance.os)
  export_env = OSEnvironment(instance, inst_os, debug)
  export_script = inst_os.export_script

  logfile = _InstanceLogName("export", inst_os.name, instance.name)

  real_disk = _OpenRealBD(disk)
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
                               inst_os.path, export_script, logfile)
  comprcmd = "gzip"
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
                                destdir, utils.PathJoin(destdir, destfile))
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  # pipefail above needs bash, a plain sh won't do
  result = utils.RunCmd(["bash", "-c", command], env=export_env)

  if result.failed:
    _Fail("OS snapshot export command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
2075

    
2076

    
2077
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  The config is written into a temporary "<name>.new" export directory
  which then atomically replaces the final export directory, dropping
  any previous export of the same instance.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % len(instance.nics))

  # only disks with a snapshot are counted and recorded
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export atomically
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
2151

    
2152

    
2153
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: str
  @return: the serialized contents of the export's config file
  @raise RPCFail: if the config file lacks the required sections

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if not (config.has_section(constants.INISECT_EXP) and
          config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2174

    
2175

    
2176
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
  """Import an os image into an instance.

  Each disk image is streamed from the source node over SSH,
  decompressed with gunzip and fed to the OS' import script.

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @type cluster_name: string
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None
  @raise RPCFail: if importing any disk failed; the message aggregates
      the per-disk errors

  """
  inst_os = OSFromDisk(instance.os)
  import_env = OSEnvironment(instance, inst_os, debug)
  import_script = inst_os.import_script

  logfile = _InstanceLogName("import", instance.os, instance.name)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if not image:
      continue
    destcmd = utils.BuildShellCmd('cat %s', image)
    remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                     constants.GANETI_RUNAS,
                                                     destcmd)
    command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
    import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
    import_env['IMPORT_INDEX'] = str(idx)
    result = utils.RunCmd(command, env=import_env)
    if result.failed:
      logging.error("Disk import command '%s' returned error: %s"
                    " output: %s", command, result.fail_reason,
                    result.output)
      # BUGFIX: use a slice ([-100:]) to take the last 100 characters;
      # the previous index form ([-100]) picked a single character and
      # raised IndexError whenever the output was shorter than 100 chars
      final_result.append("error importing disk %d: %s, %s" %
                          (idx, result.fail_reason, result.output[-100:]))

  if final_result:
    _Fail("; ".join(final_result), log=False)
2221

    
2222

    
2223
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports
  @raise RPCFail: if the exports directory does not exist

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
2234

    
2235

    
2236
def RemoveExport(export):
2237
  """Remove an existing export from the node.
2238

2239
  @type export: str
2240
  @param export: the name of the export to remove
2241
  @rtype: None
2242

2243
  """
2244
  target = utils.PathJoin(constants.EXPORT_DIR, export)
2245

    
2246
  try:
2247
    shutil.rmtree(target)
2248
  except EnvironmentError, err:
2249
    _Fail("Error while removing the export: %s", err, exc=True)
2250

    
2251

    
2252
def BlockdevRename(devlist):
2253
  """Rename a list of block devices.
2254

2255
  @type devlist: list of tuples
2256
  @param devlist: list of tuples of the form  (disk,
2257
      new_logical_id, new_physical_id); disk is an
2258
      L{objects.Disk} object describing the current disk,
2259
      and new logical_id/physical_id is the name we
2260
      rename it to
2261
  @rtype: boolean
2262
  @return: True if all renames succeeded, False otherwise
2263

2264
  """
2265
  msgs = []
2266
  result = True
2267
  for disk, unique_id in devlist:
2268
    dev = _RecursiveFindBD(disk)
2269
    if dev is None:
2270
      msgs.append("Can't find device %s in rename" % str(disk))
2271
      result = False
2272
      continue
2273
    try:
2274
      old_rpath = dev.dev_path
2275
      dev.Rename(unique_id)
2276
      new_rpath = dev.dev_path
2277
      if old_rpath != new_rpath:
2278
        DevCacheManager.RemoveCache(old_rpath)
2279
        # FIXME: we should add the new cache information here, like:
2280
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2281
        # but we don't have the owner here - maybe parse from existing
2282
        # cache? for now, we only lose lvm data when we rename, which
2283
        # is less critical than DRBD or MD
2284
    except errors.BlockDeviceError, err:
2285
      msgs.append("Can't rename device '%s' to '%s': %s" %
2286
                  (dev, unique_id, err))
2287
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2288
      result = False
2289
  if not result:
2290
    _Fail("; ".join(msgs))
2291

    
2292

    
2293
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid
  @raise RPCFail: if file storage is disabled or the path lies outside
      the base directory

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # NOTE(review): commonprefix compares character-wise, not per path
  # component, so e.g. "/srv/ganeti2" shares a prefix with "/srv/ganeti";
  # both paths here come from cluster configuration, not arbitrary users
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2316

    
2317

    
2318
def CreateFileStorageDir(file_storage_dir):
2319
  """Create file storage directory.
2320

2321
  @type file_storage_dir: str
2322
  @param file_storage_dir: directory to create
2323

2324
  @rtype: tuple
2325
  @return: tuple with first element a boolean indicating wheter dir
2326
      creation was successful or not
2327

2328
  """
2329
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2330
  if os.path.exists(file_storage_dir):
2331
    if not os.path.isdir(file_storage_dir):
2332
      _Fail("Specified storage dir '%s' is not a directory",
2333
            file_storage_dir)
2334
  else:
2335
    try:
2336
      os.makedirs(file_storage_dir, 0750)
2337
    except OSError, err:
2338
      _Fail("Cannot create file storage directory '%s': %s",
2339
            file_storage_dir, err, exc=True)
2340

    
2341

    
2342
def RemoveFileStorageDir(file_storage_dir):
2343
  """Remove file storage directory.
2344

2345
  Remove it only if it's empty. If not log an error and return.
2346

2347
  @type file_storage_dir: str
2348
  @param file_storage_dir: the directory we should cleanup
2349
  @rtype: tuple (success,)
2350
  @return: tuple of one element, C{success}, denoting
2351
      whether the operation was successful
2352

2353
  """
2354
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2355
  if os.path.exists(file_storage_dir):
2356
    if not os.path.isdir(file_storage_dir):
2357
      _Fail("Specified Storage directory '%s' is not a directory",
2358
            file_storage_dir)
2359
    # deletes dir only if empty, otherwise we want to fail the rpc call
2360
    try:
2361
      os.rmdir(file_storage_dir)
2362
    except OSError, err:
2363
      _Fail("Cannot remove file storage directory '%s': %s",
2364
            file_storage_dir, err)
2365

    
2366

    
2367
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2368
  """Rename the file storage directory.
2369

2370
  @type old_file_storage_dir: str
2371
  @param old_file_storage_dir: the current path
2372
  @type new_file_storage_dir: str
2373
  @param new_file_storage_dir: the name we should rename to
2374
  @rtype: tuple (success,)
2375
  @return: tuple of one element, C{success}, denoting
2376
      whether the operation was successful
2377

2378
  """
2379
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2380
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2381
  if not os.path.exists(new_file_storage_dir):
2382
    if os.path.isdir(old_file_storage_dir):
2383
      try:
2384
        os.rename(old_file_storage_dir, new_file_storage_dir)
2385
      except OSError, err:
2386
        _Fail("Cannot rename '%s' to '%s': %s",
2387
              old_file_storage_dir, new_file_storage_dir, err)
2388
    else:
2389
      _Fail("Specified storage dir '%s' is not a directory",
2390
            old_file_storage_dir)
2391
  else:
2392
    if os.path.exists(old_file_storage_dir):
2393
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2394
            old_file_storage_dir, new_file_storage_dir)
2395

    
2396

    
2397
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)

  if os.path.commonprefix([queue_dir, file_name]) != queue_dir:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
2412

    
2413

    
2414
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents (decoded via L{_Decompress}
      before being written)
  @rtype: None

  """
  # refuse paths outside the queue directory
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
2432

    
2433

    
2434
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None

  """
  # both ends of the rename must live inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)
2451

    
2452

    
2453
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  The flag is represented by an (empty) marker file: it is created to
  drain the queue and removed to undrain it.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: None

  """
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2469

    
2470

    
2471
def BlockdevClose(instance_name, disks):
2472
  """Closes the given block devices.
2473

2474
  This means they will be switched to secondary mode (in case of
2475
  DRBD).
2476

2477
  @param instance_name: if the argument is not empty, the symlinks
2478
      of this instance will be removed
2479
  @type disks: list of L{objects.Disk}
2480
  @param disks: the list of disks to be closed
2481
  @rtype: tuple (success, message)
2482
  @return: a tuple of success and message, where success
2483
      indicates the succes of the operation, and message
2484
      which will contain the error details in case we
2485
      failed
2486

2487
  """
2488
  bdevs = []
2489
  for cf in disks:
2490
    rd = _RecursiveFindBD(cf)
2491
    if rd is None:
2492
      _Fail("Can't find device %s", cf)
2493
    bdevs.append(rd)
2494

    
2495
  msg = []
2496
  for rd in bdevs:
2497
    try:
2498
      rd.Close()
2499
    except errors.BlockDeviceError, err:
2500
      msg.append(str(err))
2501
  if msg:
2502
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2503
  else:
2504
    if instance_name:
2505
      _RemoveBlockDevLinks(instance_name, disks)
2506

    
2507

    
2508
def ValidateHVParams(hvname, hvparams):
2509
  """Validates the given hypervisor parameters.
2510

2511
  @type hvname: string
2512
  @param hvname: the hypervisor name
2513
  @type hvparams: dict
2514
  @param hvparams: the hypervisor parameters to be validated
2515
  @rtype: None
2516

2517
  """
2518
  try:
2519
    hv_type = hypervisor.GetHypervisor(hvname)
2520
    hv_type.ValidateParameters(hvparams)
2521
  except errors.HypervisorError, err:
2522
    _Fail(str(err), log=False)
2523

    
2524

    
2525
def DemoteFromMC():
2526
  """Demotes the current node from master candidate role.
2527

2528
  """
2529
  # try to ensure we're not the master by mistake
2530
  master, myself = ssconf.GetMasterAndMyself()
2531
  if master == myself:
2532
    _Fail("ssconf status shows I'm the master node, will not demote")
2533

    
2534
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2535
  if not result.failed:
2536
    _Fail("The master daemon is running, will not demote")
2537

    
2538
  try:
2539
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2540
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2541
  except EnvironmentError, err:
2542
    if err.errno != errno.ENOENT:
2543
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2544

    
2545
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2546

    
2547

    
2548
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: node/IP information, passed verbatim to
      L{objects.Disk.SetPhysicalID}
  @type disks: list of L{objects.Disk}
  @param disks: the disks to look up
  @rtype: list
  @return: the block device of each disk, in input order
  @raise RPCFail: if any disk cannot be found

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for disk in disks:
    bdev = _RecursiveFindBD(disk)
    if bdev is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(bdev)
  return bdevs
2565

    
2566

    
2567
def DrbdDisconnectNet(nodes_ip, disks):
2568
  """Disconnects the network on a list of drbd devices.
2569

2570
  """
2571
  bdevs = _FindDisks(nodes_ip, disks)
2572

    
2573
  # disconnect disks
2574
  for rd in bdevs:
2575
    try:
2576
      rd.DisconnectNet()
2577
    except errors.BlockDeviceError, err:
2578
      _Fail("Can't change network configuration to standalone mode: %s",
2579
            err, exc=True)
2580

    
2581

    
2582
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2583
  """Attaches the network on a list of drbd devices.
2584

2585
  """
2586
  bdevs = _FindDisks(nodes_ip, disks)
2587

    
2588
  if multimaster:
2589
    for idx, rd in enumerate(bdevs):
2590
      try:
2591
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2592
      except EnvironmentError, err:
2593
        _Fail("Can't create symlink: %s", err)
2594
  # reconnect disks, switch to new master configuration and if
2595
  # needed primary mode
2596
  for rd in bdevs:
2597
    try:
2598
      rd.AttachNet(multimaster)
2599
    except errors.BlockDeviceError, err:
2600
      _Fail("Can't change network configuration: %s", err)
2601

    
2602
  # wait until the disks are connected; we need to retry the re-attach
2603
  # if the device becomes standalone, as this might happen if the one
2604
  # node disconnects and reconnects in a different mode before the
2605
  # other node reconnects; in this case, one or both of the nodes will
2606
  # decide it has wrong configuration and switch to standalone
2607

    
2608
  def _Attach():
2609
    all_connected = True
2610

    
2611
    for rd in bdevs:
2612
      stats = rd.GetProcStatus()
2613

    
2614
      all_connected = (all_connected and
2615
                       (stats.is_connected or stats.is_in_resync))
2616

    
2617
      if stats.is_standalone:
2618
        # peer had different config info and this node became
2619
        # standalone, even though this should not happen with the
2620
        # new staged way of changing disk configs
2621
        try:
2622
          rd.AttachNet(multimaster)
2623
        except errors.BlockDeviceError, err:
2624
          _Fail("Can't change network configuration: %s", err)
2625

    
2626
    if not all_connected:
2627
      raise utils.RetryAgain()
2628

    
2629
  try:
2630
    # Start with a delay of 100 miliseconds and go up to 5 seconds
2631
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2632
  except utils.RetryTimeout:
2633
    _Fail("Timeout in disk reconnecting")
2634

    
2635
  if multimaster:
2636
    # change to primary mode
2637
    for rd in bdevs:
2638
      try:
2639
        rd.Open()
2640
      except errors.BlockDeviceError, err:
2641
        _Fail("Can't change to primary mode: %s", err)
2642

    
2643

    
2644
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _CheckSyncing(rd):
    # Keep retrying while the device is neither connected nor resyncing
    pstat = rd.GetProcStatus()
    if not (pstat.is_connected or pstat.is_in_resync):
      raise utils.RetryAgain()
    return pstat

  bdevs = _FindDisks(nodes_ip, disks)

  alldone = True
  min_resync = 100
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_CheckSyncing, 1, 15, args=[rd])
    except utils.RetryTimeout:
      # one final check before giving up on this device
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  The actual powercycle is scheduled in a forked child so that this
  call can return immediately; as a consequence, failures happening in
  the child cannot be reported back nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    child_pid = os.fork()
  except OSError:
    # fork failed: fall through and act as if we were the child process
    child_pid = 0
  if child_pid > 0:
    # parent: report and return right away
    return "Reboot scheduled in 5 seconds"
  # child: try to lock ourselves into RAM so the powercycle can still
  # proceed even if the rest of the system misbehaves
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    # map the phase constant to the directory suffix
    phase_suffix = {
      constants.HOOKS_PHASE_PRE: "pre",
      constants.HOOKS_PHASE_POST: "post",
      }
    if phase not in phase_suffix:
      _Fail("Unknown hooks phase '%s'", phase)
    suffix = phase_suffix[phase]

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        output = utils.SafeEncode(runresult.output.strip())
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      # close the descriptor even if the write fails, so it's not leaked
      try:
        os.write(fd, idata)
      finally:
        os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
class DevCacheManager(object):
2821
  """Simple class for managing a cache of block device information.
2822

2823
  """
2824
  _DEV_PREFIX = "/dev/"
2825
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2826

    
2827
  @classmethod
2828
  def _ConvertPath(cls, dev_path):
2829
    """Converts a /dev/name path to the cache file name.
2830

2831
    This replaces slashes with underscores and strips the /dev
2832
    prefix. It then returns the full path to the cache file.
2833

2834
    @type dev_path: str
2835
    @param dev_path: the C{/dev/} path name
2836
    @rtype: str
2837
    @return: the converted path name
2838

2839
    """
2840
    if dev_path.startswith(cls._DEV_PREFIX):
2841
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2842
    dev_path = dev_path.replace("/", "_")
2843
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
2844
    return fpath
2845

    
2846
  @classmethod
2847
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2848
    """Updates the cache information for a given device.
2849

2850
    @type dev_path: str
2851
    @param dev_path: the pathname of the device
2852
    @type owner: str
2853
    @param owner: the owner (instance name) of the device
2854
    @type on_primary: bool
2855
    @param on_primary: whether this is the primary
2856
        node nor not
2857
    @type iv_name: str
2858
    @param iv_name: the instance-visible name of the
2859
        device, as in objects.Disk.iv_name
2860

2861
    @rtype: None
2862

2863
    """
2864
    if dev_path is None:
2865
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2866
      return
2867
    fpath = cls._ConvertPath(dev_path)
2868
    if on_primary:
2869
      state = "primary"
2870
    else:
2871
      state = "secondary"
2872
    if iv_name is None:
2873
      iv_name = "not_visible"
2874
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2875
    try:
2876
      utils.WriteFile(fpath, data=fdata)
2877
    except EnvironmentError, err:
2878
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2879

    
2880
  @classmethod
2881
  def RemoveCache(cls, dev_path):
2882
    """Remove data for a dev_path.
2883

2884
    This is just a wrapper over L{utils.RemoveFile} with a converted
2885
    path name and logging.
2886

2887
    @type dev_path: str
2888
    @param dev_path: the pathname of the device
2889

2890
    @rtype: None
2891

2892
    """
2893
    if dev_path is None:
2894
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2895
      return
2896
    fpath = cls._ConvertPath(dev_path)
2897
    try:
2898
      utils.RemoveFile(fpath)
2899
    except EnvironmentError, err:
2900
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)