Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ a5310c2a

History | View | Annotate | Download (94.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable-msg=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50
import signal
51

    
52
from ganeti import errors
53
from ganeti import utils
54
from ganeti import ssh
55
from ganeti import hypervisor
56
from ganeti import constants
57
from ganeti import bdev
58
from ganeti import objects
59
from ganeti import ssconf
60
from ganeti import serializer
61

    
62

    
63
# Kernel-provided per-boot unique id; read by L{GetNodeInfo} ("bootid")
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# The only directories L{_CleanDirectory} will agree to wipe; any other
# path passed to it results in an RPC failure
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
# Maximum validity of generated SSL certificates, in seconds (one week)
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
# File names for X509 key/certificate material
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
# Per-session file names; NOTE(review): the _IES_ prefix presumably means
# "import/export status" — confirm against the users of these constants
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"
76

    
77

    
78
class RPCFail(Exception):
  """Exception signalling a failed RPC call.

  The single argument carries the error message.

  """
84

    
85

    
86
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception; interpolated with C{args}
      via C{%} when C{args} is non-empty
  @keyword log: whether to log the error (default: True)
  @keyword exc: whether to log the current exception's traceback too
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  # use dict.get instead of the previous "in"+subscript double lookup
  if kwargs.get("log", True): # if we should log this error
    if kwargs.get("exc", False):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
107

    
108

    
109
def _GetConfig():
  """Wrapper building and returning a fresh SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a new SimpleStore instance

  """
  return ssconf.SimpleStore()
117

    
118

    
119
def _GetSshRunner(cluster_name):
  """Wrapper building and returning an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a new SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
130

    
131

    
132
def _Decompress(data):
  """Unpack data compressed by the RPC client.

  @type data: list or tuple
  @param data: an (encoding, content) pair sent by the RPC client
  @rtype: str
  @return: the decompressed payload

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  encoding, content = data
  if encoding == constants.RPC_ENCODING_NONE:
    # plain payload, nothing to do
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
150

    
151

    
152
def _CleanDirectory(path, exclude=None):
  """Remove all regular files from a directory.

  @type path: str
  @param path: the directory to clean; must be listed in
      L{_ALLOWED_CLEAN_DIRS}
  @type exclude: list
  @param exclude: list of files to keep, defaults to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return

  if exclude is None:
    exclude = []
  else:
    # normalize the excluded paths once, up front
    exclude = [os.path.normpath(entry) for entry in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    # only plain files are removed; symlinks and subdirs are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
180

    
181

    
182
def _BuildUploadFileList():
  """Compute the set of files accepted for upload.

  Abstracted into a function so the set is built only once, at module
  import time.

  @rtype: frozenset
  @return: the allowed file paths

  """
  files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    ])

  # each hypervisor may declare additional ancillary files
  for hv_name in constants.HYPER_TYPES:
    files.update(hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles())

  return frozenset(files)
203

    
204

    
205
# Files which L{UploadFile} accepts; computed once at import time
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
206

    
207

    
208
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None
  @return: None (the previous docstring claimed a (True, None) tuple,
      but no value is returned)

  """
  # the queue lock file is kept so the queue stays lockable
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
217

    
218

    
219
def GetMasterInfo():
220
  """Returns master information.
221

222
  This is an utility function to compute master information, either
223
  for consumption here or from the node daemon.
224

225
  @rtype: tuple
226
  @return: master_netdev, master_ip, master_name
227
  @raise RPCFail: in case of errors
228

229
  """
230
  try:
231
    cfg = _GetConfig()
232
    master_netdev = cfg.GetMasterNetdev()
233
    master_ip = cfg.GetMasterIP()
234
    master_node = cfg.GetMasterNode()
235
  except errors.ConfigurationError, err:
236
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
237
  return (master_netdev, master_ip, master_node)
238

    
239

    
240
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None
  @raise RPCFail: if any of the activation steps failed

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # errors are collected and reported at the end so all steps are attempted
  err_msgs = []
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    # add the master IP as a /32 label alias on the master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    # gratuitous ARP so neighbours update their caches
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    if no_voting:
      # --yes-do-it is required by masterd when voting is disabled
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
300

    
301

    
302
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  ip_result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                            "dev", master_netdev])
  if ip_result.failed:
    logging.error("Can't remove the master IP, error: %s", ip_result.output)
    # but otherwise ignore the failure

  if not stop_daemons:
    return

  stop_result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
  if stop_result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  stop_result.cmd, stop_result.exit_code, stop_result.output)
333

    
334

    
335
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
336
  """Joins this node to the cluster.
337

338
  This does the following:
339
      - updates the hostkeys of the machine (rsa and dsa)
340
      - adds the ssh private key to the user
341
      - adds the ssh public key to the users' authorized_keys file
342

343
  @type dsa: str
344
  @param dsa: the DSA private key to write
345
  @type dsapub: str
346
  @param dsapub: the DSA public key to write
347
  @type rsa: str
348
  @param rsa: the RSA private key to write
349
  @type rsapub: str
350
  @param rsapub: the RSA public key to write
351
  @type sshkey: str
352
  @param sshkey: the SSH private key to write
353
  @type sshpub: str
354
  @param sshpub: the SSH public key to write
355
  @rtype: boolean
356
  @return: the success of the operation
357

358
  """
359
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
360
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
361
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
362
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
363
  for name, content, mode in sshd_keys:
364
    utils.WriteFile(name, data=content, mode=mode)
365

    
366
  try:
367
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
368
                                                    mkdir=True)
369
  except errors.OpExecError, err:
370
    _Fail("Error while processing user ssh files: %s", err, exc=True)
371

    
372
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
373
    utils.WriteFile(name, data=content, mode=0600)
374

    
375
  utils.AddAuthorizedKey(auth_keys, sshpub)
376

    
377
  result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
378
  if result.failed:
379
    _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
380
          result.cmd, result.exit_code, result.output)
381

    
382

    
383
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether to also remove the user's ssh keys
  @raise errors.QuitGanetiException: always, on successful processing

  """
  # every step below is best-effort: failures are logged, not raised,
  # so that as much cleanup as possible is performed
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    # deliberate catch-all: secret removal must never abort the cleanup
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
425

    
426

    
427
def GetNodeInfo(vgname, hypervisor_type):
  """Give back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  node_info = {
    "vg_size": vginfo["vg_size"],
    "vg_free": vginfo["vg_free"],
    }

  # merge in the hypervisor's memory data, when available
  hyp_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hyp_info is not None:
    node_info.update(hyp_info)

  node_info["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return node_info
457

    
458

    
459
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name, used for the ssh checks
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = utils.HostInfo().name
  port = utils.GetDaemonPort(constants.NODED)

  # hypervisor sanity checks; errors are reported as strings, not raised
  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  # ssh connectivity; nodes are shuffled to spread the load of the
  # cluster-wide verification
  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  # TCP connectivity to all nodes' primary (and secondary) IPs, using
  # our own IPs from the same list as source addresses
  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      # we are the master: ping over loopback
      source = constants.LOCALHOST_IP_ADDRESS
    else:
      source = None
    result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
                                                  source=source)

  if constants.NV_LVLIST in what:
    try:
      val = GetVolumeList(what[constants.NV_LVLIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  # DRBD minors in use; on error the error string takes the list's place
  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  # basic OS setup: sysfs and procfs must be mounted
  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  return result
602

    
603

    
604
def GetVolumeList(vg_name):
  """Compute the list of logical volumes and their sizes.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  lvs = {}
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for raw_line in result.stdout.splitlines():
    stripped = raw_line.strip()
    match = valid_line_re.match(stripped)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", stripped)
      continue
    name, size, attr = match.groups()
    if attr[0] == 'v':
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    # attr[4]: '-' means inactive; attr[5]: 'o' means open/online
    lvs[name] = (size, attr[4] == '-', attr[5] == 'o')

  return lvs
646

    
647

    
648
def ListVolumeGroups():
  """List the volume groups and their sizes.

  @rtype: dict
  @return: dictionary mapping volume group names to their size

  """
  return utils.ListVolumeGroups()
657

    
658

    
659
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def expand_line(fields):
    # one result entry per physical device backing the LV; the device
    # column looks like "dev(offset)", so strip everything after '('
    fields = [field.strip() for field in fields]
    devices = [dev.split('(')[0] for dev in fields[2].split(",")]
    return [{'name': fields[0], 'size': fields[1],
             'dev': dev, 'vg': fields[3]} for dev in devices]

  volumes = []
  for line in result.stdout.splitlines():
    if line.count('|') >= 3:
      volumes.extend(expand_line(line.split('|')))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return volumes
703

    
704

    
705
def BridgesExist(bridges_list):
  """Check that a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the bridge names to check for
  @rtype: None
  @raise RPCFail: if any of the bridges is missing; contrary to the
      previous docstring, the function never returns C{False} --
      absence is reported via L{_Fail}

  """
  # a comprehension instead of the manual append loop
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
719

    
720

    
721
def GetInstanceList(hypervisor_list):
722
  """Provides a list of instances.
723

724
  @type hypervisor_list: list
725
  @param hypervisor_list: the list of hypervisors to query information
726

727
  @rtype: list
728
  @return: a list of all running instances on the current node
729
    - instance1.example.com
730
    - instance2.example.com
731

732
  """
733
  results = []
734
  for hname in hypervisor_list:
735
    try:
736
      names = hypervisor.GetHypervisor(hname).ListInstances()
737
      results.extend(names)
738
    except errors.HypervisorError, err:
739
      _Fail("Error enumerating instances (hypervisor %s): %s",
740
            hname, err, exc=True)
741

    
742
  return results
743

    
744

    
745
def GetInstanceInfo(instance, hname):
  """Give back information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: empty dict when the instance is unknown; otherwise a
      dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    return {}
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
769

    
770

    
771
def GetInstanceMigratable(instance):
  """Check whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.
  @rtype: None
  @raise RPCFail: if the instance is not migratable; contrary to the
      previous docstring, no (result, description) tuple is returned --
      problems are reported via L{_Fail}

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # every disk must have its symlink in place (created at instance start)
  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
792

    
793

    
794
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      value = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      if name in output:
        # only the static parameters (memory, vcpus) must agree;
        # state and time can legitimately change between the
        # invocations of the different hypervisors
        for key in ('memory', 'vcpus'):
          if value[key] != output[name][key]:
            _Fail("Instance %s is running twice"
                  " with different parameters", name)
      output[name] = value

  return output
835

    
836

    
837
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @rtype: string
  @return: the full log file path under C{constants.LOG_OS_DIR}

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  basename = "%s-%s-%s-%s.log" % (kind, os_name, instance,
                                  utils.TimestampForFilename())
  return utils.PathJoin(constants.LOG_OS_DIR, basename)
855

    
856

    
857
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile)
  if not result.failed:
    return

  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", result.cmd, result.fail_reason, logfile,
                result.output)
  # include the tail of the script's log in the failure report
  lines = [utils.SafeEncode(val)
           for val in utils.TailFile(logfile, lines=20)]
  _Fail("OS create script failed (%s), last lines in the"
        " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
887

    
888

    
889
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None
  @raise RPCFail: if the rename script fails

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # this message used to say "os create command" -- a copy-paste
    # leftover from InstanceOsAdd although the rename script ran
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
920

    
921

    
922
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  # until proven otherwise, report all values as unknown
  vg_info = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  vgs_cmd = ["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
             "--nosuffix", "--units=m", "--separator=:", vg_name]
  vgs_result = utils.RunCmd(vgs_cmd)
  if vgs_result.failed:
    logging.error("volume group %s not present", vg_name)
    return vg_info

  # output looks like " 1234.00:567.00:2:" -> strip and split on ':'
  fields = vgs_result.stdout.strip().rstrip(':').split(':')
  if len(fields) != 3:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(fields))
    return vg_info

  try:
    vg_info = {
      "vg_size": int(round(float(fields[0]), 0)),
      "vg_free": int(round(float(fields[1]), 0)),
      "pv_count": int(fields[2]),
      }
  except (TypeError, ValueError) as err:
    # keep the all-None dict on parse failures
    logging.exception("Fail to parse vgs output: %s", err)

  return vg_info
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for an instance's disk.

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError as err:
    if err.errno != errno.EEXIST:
      raise
    # a link already exists; replace it unless it already points at
    # the right device
    if not (os.path.islink(link_name) and
            os.readlink(link_name) == device_path):
      os.remove(link_name)
      os.symlink(device_path, link_name)

  return link_name
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for disk_idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, disk_idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      # best-effort cleanup: log and keep removing the other links
      logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we shoul assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  linked_devs = []
  for disk_idx, disk in enumerate(instance.disks):
    bd = _RecursiveFindBD(disk)
    if bd is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    # make the device read-write before linking it
    bd.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, bd.dev_path, disk_idx)
    except OSError as err:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    err.strerror)
    linked_devs.append((disk, link_name))

  return linked_devs
def StartInstance(instance):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: None

  """
  if instance.name in GetInstanceList([instance.hypervisor]):
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError as err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError as err:
    # undo the disk links created above so they don't go stale
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  A soft shutdown is attempted repeatedly via L{utils.Retry}; if the
  instance is still listed by the hypervisor after C{timeout} seconds,
  it is stopped forcibly. Finally the hypervisor cleanup step runs and
  the disk symlinks are removed.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  # Stateful callable passed to utils.Retry below; the state lets the
  # second and later attempts tell the hypervisor this is a retry.
  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      # instance gone from the hypervisor's list: shutdown succeeded
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      # still running; ask utils.Retry to call us again
      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    # give the hypervisor a moment to drop the instance from its list
    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  # cleanup is best-effort: failures here should not fail the shutdown
  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  if instance.name not in GetInstanceList([instance.hypervisor]):
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hv = hypervisor.GetHypervisor(instance.hypervisor)

  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    # soft reboot: delegate entirely to the hypervisor
    try:
      hv.RebootInstance(instance)
    except errors.HypervisorError as err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    # hard reboot: full stop followed by a fresh start
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance)
    except errors.HypervisorError as err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)
def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    # _Fail raises, so on error we never reach the return
    return hv.MigrationInfo(instance)
  except errors.HypervisorError as err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hv.AcceptInstance(instance, info, target)
  except errors.HypervisorError as err:
    _Fail("Failed to accept instance: %s", err, exc=True)
def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hv.FinalizeMigration(instance, info, success)
  except errors.HypervisorError as err:
    _Fail("Failed to finalize migration: %s", err, exc=True)
def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - succes is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hv.MigrateInstance(instance, target, live)
  except errors.HypervisorError as err:
    _Fail("Failed to migrate instance: %s", err, exc=True)
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete 'size' argument
  # pylint: disable-msg=W0613
  # assemble (and possibly open) all children first, since the device
  # itself may need them active at creation time
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable-msg=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # record the new device in the cache for later path lookups
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  # push the creation info (e.g. LVM tags) down to the physical device
  device.SetInfo(info)

  return device.unique_id
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  failures = []

  try:
    real_dev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    real_dev = None

  if real_dev is not None:
    dev_path = real_dev.dev_path
    try:
      real_dev.Remove()
    except errors.BlockDeviceError as err:
      failures.append(str(err))
    if not failures:
      # the device is gone, drop it from the cache as well
      DevCacheManager.RemoveCache(dev_path)

  # recurse into the children, accumulating their errors too
  for child in (disk.children or []):
    try:
      BlockdevRemove(child)
    except RPCFail as err:
      failures.append(str(err))

  if failures:
    _Fail("; ".join(failures))
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # Some device types can operate with only a subset of their
    # children assembled; ChildrenNeeded() gives the minimum required,
    # from which we derive how many failed children (appended as None)
    # we may tolerate before re-raising the failure.
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      # make the device read-write where required
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # nothing to assemble on this node; report plain success
    result = True
  return result
def BlockdevAssemble(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    assembled = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(assembled, bdev.BlockDev):
      # translate the device object into its path for the caller
      # pylint: disable-msg=E1103
      assembled = assembled.dev_path
  except errors.BlockDeviceError as err:
    _Fail("Error while assembling disk: %s", err, exc=True)

  return assembled
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  failures = []

  real_dev = _RecursiveFindBD(disk)
  if real_dev is not None:
    dev_path = real_dev.dev_path
    try:
      real_dev.Shutdown()
      DevCacheManager.RemoveCache(dev_path)
    except errors.BlockDeviceError as err:
      failures.append(str(err))

  # shut down the children regardless of the parent's outcome
  for child in (disk.children or []):
    try:
      BlockdevShutdown(child)
    except RPCFail as err:
      failures.append(str(err))

  if failures:
    _Fail("; ".join(failures))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # every requested child must be resolvable to a live device
  children = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in children:
    _Fail("Can't find new device(s) to add: %s:%s", children, new_cdevs)
  parent.AddChildren(children)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)

  child_paths = []
  for child in new_cdevs:
    static_path = child.StaticDevPath()
    if static_path is not None:
      # the device exposes a fixed path; sanity-check it before use
      if not utils.IsNormAbsPath(static_path):
        _Fail("Strange path returned from StaticDevPath: '%s'", static_path)
      child_paths.append(static_path)
      continue
    # no static path: resolve the device dynamically
    child_dev = _RecursiveFindBD(child)
    if child_dev is None:
      _Fail("Can't find device %s while removing children", child)
    child_paths.append(child_dev.dev_path)

  parent.RemoveChildren(child_paths)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  sync_status = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s", disk)
    sync_status.append(found.CombinedSyncStatus())

  return sync_status
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # resolve all children first, so the lookup below can match them
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  bd = _RecursiveFindBD(disk)
  if bd is None:
    _Fail("Block device '%s' is not set up", disk)

  bd.Open()
  return bd
def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise a the current
           information

  """
  try:
    found = _RecursiveFindBD(disk)
  except errors.BlockDeviceError as err:
    _Fail("Failed to find device: %s", err, exc=True)

  if found is None:
    return None

  return found.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  def _OneDiskSize(disk):
    # returns None when the device cannot be found or attached
    try:
      found = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      return None
    if found is None:
      return None
    return found.GetActualSize()

  return [_OneDiskSize(disk) for disk in disks]
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  The device contents are piped over SSH to a C{dd} process on the
  destination node, which writes them to C{dest_path}.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  # wrap the write command in an ssh invocation to the destination node
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  # bash is needed for the pipefail option used in expcmd above
  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # refuse anything outside the whitelist of configuration files
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
def _ErrnoOrStr(err):
1711
  """Format an EnvironmentError exception.
1712

1713
  If the L{err} argument has an errno attribute, it will be looked up
1714
  and converted into a textual C{E...} description. Otherwise the
1715
  string representation of the error will be returned.
1716

1717
  @type err: L{EnvironmentError}
1718
  @param err: the exception to format
1719

1720
  """
1721
  if hasattr(err, 'errno'):
1722
    detail = errno.errorcode[err.errno]
1723
  else:
1724
    detail = str(err)
1725
  return detail
1726

    
1727

    
1728
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the vaid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  # the version file must exist ...
  try:
    st = os.stat(api_file)
  except EnvironmentError as err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  # ... and be a regular file
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    raw_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError as err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  # one integer version per line
  try:
    api_versions = [int(line.strip()) for line in raw_versions]
  except (TypeError, ValueError) as err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
1768
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants)
      for all (potential) OSes under all search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  found = []
  for dir_name in top_dirs:
    if not os.path.isdir(dir_name):
      continue
    try:
      entries = utils.ListVisibleFiles(dir_name)
    except EnvironmentError as err:
      logging.exception("Can't list the OS directory %s: %s", dir_name, err)
      # an unreadable directory aborts the whole scan
      break
    for name in entries:
      os_path = utils.PathJoin(dir_name, name)
      status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
      if status:
        found.append((name, os_path, status, "", os_inst.supported_variants))
      else:
        # on failure os_inst holds the diagnostic message
        found.append((name, os_path, status, os_inst, []))

  return found
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type name: string
  @param name: the OS name to look up (without any "+variant" suffix)
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  # The OS must support at least one API version this node speaks
  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path names
  os_files = dict.fromkeys(constants.OS_SCRIPTS)

  # From API version 15 onwards a variants declaration file is mandatory
  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = ''

  for filename in os_files:
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    # scripts (but not the variants file) must be executable by the owner
    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = None
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      # one variant name per line
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS variants file at %s: %s" %
                     (variants_file, _ErrnoOrStr(err)))
    if not variants:
      return False, ("No supported os variant found")

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      supported_variants=variants,
                      api_versions=api_versions)
  return True, os_obj
1884

    
1885

    
1886
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type name: string
  @param name: the OS name, possibly including a "+variant" suffix
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # Only the plain name (variant suffix stripped) maps to an on-disk OS
  pure_name = name.split("+", 1)[0]

  found, payload = _TryOSFromDisk(pure_name, base_dir)
  if found:
    return payload

  # payload holds the diagnostic message on failure
  _Fail(payload)
1911

    
1912

    
1913
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # use the highest API version both sides support
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  if api_version >= constants.OS_API_V15:
    try:
      # the variant is encoded in the instance OS name as "os+variant"
      variant = instance.os.split('+', 1)[1]
    except IndexError:
      # no explicit variant: fall back to the OS's first supported one
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      # in bridged mode the link parameter is the bridge name
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # export all backend and hypervisor parameters as INSTANCE_BE_*/_HV_*
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
1974

    
1975

    
1976
def BlockdevGrow(disk, amount):
1977
  """Grow a stack of block devices.
1978

1979
  This function is called recursively, with the childrens being the
1980
  first ones to resize.
1981

1982
  @type disk: L{objects.Disk}
1983
  @param disk: the disk to be grown
1984
  @rtype: (status, result)
1985
  @return: a tuple with the status of the operation
1986
      (True/False), and the errors message if status
1987
      is False
1988

1989
  """
1990
  r_dev = _RecursiveFindBD(disk)
1991
  if r_dev is None:
1992
    _Fail("Cannot find block device %s", disk)
1993

    
1994
  try:
1995
    r_dev.Grow(amount)
1996
  except errors.BlockDeviceError, err:
1997
    _Fail("Failed to grow block device: %s", err, exc=True)
1998

    
1999

    
2000
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path
  @raise RPCFail: for DRBD disks without children, missing devices, or
      device types that cannot be snapshotted

  """
  if disk.dev_type == constants.LD_LV:
    device = _RecursiveFindBD(disk)
    if device is None:
      _Fail("Cannot find block device %s", disk)
    # FIXME: choose a saner value for the snapshot size
    # let's stay on the safe side and ask for the full size, for now
    return device.Snapshot(disk.size)

  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    # recurse into the LVM device backing the DRBD disk
    return BlockdevSnapshot(disk.children[0])

  _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
        disk.unique_id, disk.dev_type)
2028

    
2029

    
2030
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  # build the export in "<name>.new" first, then move it into place below
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  # count only the disks that actually have a snapshot; entries may be
  # None/falsy, so disk_count can be smaller than len(snap_disks)
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    # globally-valid hypervisor parameters are not exported
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export with the freshly-written one
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
2104

    
2105

    
2106
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: the serialized contents of the export's config file
  @raise RPCFail: if the export info file lacks the required sections

  """
  config_path = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(config_path)

  # both the export and the instance sections must be present
  if not (config.has_section(constants.INISECT_EXP) and
          config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2127

    
2128

    
2129
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports
  @raise RPCFail: if the exports directory does not exist

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
2140

    
2141

    
2142
def RemoveExport(export):
2143
  """Remove an existing export from the node.
2144

2145
  @type export: str
2146
  @param export: the name of the export to remove
2147
  @rtype: None
2148

2149
  """
2150
  target = utils.PathJoin(constants.EXPORT_DIR, export)
2151

    
2152
  try:
2153
    shutil.rmtree(target)
2154
  except EnvironmentError, err:
2155
    _Fail("Error while removing the export: %s", err, exc=True)
2156

    
2157

    
2158
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the unique_id we rename it to
  @rtype: None
  @raise RPCFail: if any of the renames failed; the message aggregates
      all per-device error messages, separated by "; "

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      # collect the error but keep renaming the remaining devices
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        # the device path changed, so the cached info is now stale
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))
2197

    
2198

    
2199
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only the base directory
  itself and paths under it are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise
  @raise RPCFail: if file storage is disabled or the path is outside
      the base file storage directory

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # Note: os.path.commonprefix compares character by character, so a
  # sibling directory such as "<base>-evil" would wrongly pass such a
  # check; compare path components instead, accepting the base directory
  # itself or anything below it.
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2222

    
2223

    
2224
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: None
  @raise RPCFail: if the path is invalid, exists but is not a directory,
      or cannot be created

  """
  # validates the path and normalizes it; _Fails if outside the base dir
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    # already existing directory is fine (idempotent call)
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)
2246

    
2247

    
2248
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: None
  @raise RPCFail: if the path is invalid, is not a directory, or is
      not empty

  """
  # validates the path and normalizes it; _Fails if outside the base dir
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # a non-existing directory is treated as already removed
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)
2271

    
2272

    
2273
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: None
  @raise RPCFail: if either path is invalid, the source is not a
      directory, the destination already exists, or the rename fails

  """
  # both paths must lie under the cluster's base file storage directory
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    # destination exists; only an error if the source also still exists
    # (otherwise the rename presumably already happened)
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)
2301

    
2302

    
2303
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # Note: os.path.commonprefix compares character by character, so a
  # sibling path such as "<queue_dir>-x/job" would wrongly pass such a
  # check; compare path components instead.
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.sep))

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
2318

    
2319

    
2320
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents, possibly compressed
  @rtype: boolean
  @return: the success of the operation
  @raise RPCFail: if the file is not inside the queue directory

  """
  _EnsureJobQueueFile(file_name)

  # Decompress first; utils.WriteFile then writes and replaces the file
  # atomically
  raw_content = _Decompress(content)
  utils.WriteFile(file_name, data=raw_content)
2338

    
2339

    
2340
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload
  @raise RPCFail: if either path is not inside the queue directory

  """
  # both source and destination must live inside the queue directory
  for path in (old, new):
    _EnsureJobQueueFile(path)

  utils.RenameFile(old, new, mkdir=True)
2357

    
2358

    
2359
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: None

  """
  # the drain state is represented by the presence of the drain file
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2375

    
2376

    
2377
def BlockdevClose(instance_name, disks):
2378
  """Closes the given block devices.
2379

2380
  This means they will be switched to secondary mode (in case of
2381
  DRBD).
2382

2383
  @param instance_name: if the argument is not empty, the symlinks
2384
      of this instance will be removed
2385
  @type disks: list of L{objects.Disk}
2386
  @param disks: the list of disks to be closed
2387
  @rtype: tuple (success, message)
2388
  @return: a tuple of success and message, where success
2389
      indicates the succes of the operation, and message
2390
      which will contain the error details in case we
2391
      failed
2392

2393
  """
2394
  bdevs = []
2395
  for cf in disks:
2396
    rd = _RecursiveFindBD(cf)
2397
    if rd is None:
2398
      _Fail("Can't find device %s", cf)
2399
    bdevs.append(rd)
2400

    
2401
  msg = []
2402
  for rd in bdevs:
2403
    try:
2404
      rd.Close()
2405
    except errors.BlockDeviceError, err:
2406
      msg.append(str(err))
2407
  if msg:
2408
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2409
  else:
2410
    if instance_name:
2411
      _RemoveBlockDevLinks(instance_name, disks)
2412

    
2413

    
2414
def ValidateHVParams(hvname, hvparams):
2415
  """Validates the given hypervisor parameters.
2416

2417
  @type hvname: string
2418
  @param hvname: the hypervisor name
2419
  @type hvparams: dict
2420
  @param hvparams: the hypervisor parameters to be validated
2421
  @rtype: None
2422

2423
  """
2424
  try:
2425
    hv_type = hypervisor.GetHypervisor(hvname)
2426
    hv_type.ValidateParameters(hvparams)
2427
  except errors.HypervisorError, err:
2428
    _Fail(str(err), log=False)
2429

    
2430

    
2431
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  @rtype: None
  @raise RPCFail: if this node is the master, the master daemon is
      running, or backing up the cluster config fails

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  # daemon-util "check" succeeds (not failed) when the daemon is running
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # ENOENT means the file vanished between the check and the backup,
    # which is fine since we're removing it anyway
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2452

    
2453

    
2454
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: string
  @param cryptodir: directory holding per-certificate subdirectories
  @type name: string
  @param name: certificate name (subdirectory name)
  @rtype: tuple; (string, string, string)
  @return: paths of the certificate directory, key file and cert file

  """
  cert_dir = utils.PathJoin(cryptodir, name)
  return (cert_dir,
          utils.PathJoin(cert_dir, _X509_KEY_FILE),
          utils.PathJoin(cert_dir, _X509_CERT_FILE))
2461

    
2462

    
2463
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds (capped at _MAX_SSL_CERT_VALIDITY)
  @type cryptodir: string
  @param cryptodir: directory in which the certificate is stored
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  # the unique temporary directory's basename doubles as the cert name
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    # key material is written owner-read-only
    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    # don't leave a half-written certificate directory behind
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise
2492

    
2493

    
2494
def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2495
  """Removes a X509 certificate.
2496

2497
  @type name: string
2498
  @param name: Certificate name
2499

2500
  """
2501
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2502

    
2503
  utils.RemoveFile(key_file)
2504
  utils.RemoveFile(cert_file)
2505

    
2506
  try:
2507
    os.rmdir(cert_dir)
2508
  except EnvironmentError, err:
2509
    _Fail("Cannot remove certificate directory '%s': %s",
2510
          cert_dir, err)
2511

    
2512

    
2513
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments
  @rtype: tuple; (env, prefix, suffix)
  @return: environment dict (or None) plus shell command fragments to be
      placed before/after the transfer command (either may be None)
  @raise RPCFail: for invalid paths, files outside the exports
      directory, or an unknown I/O mode

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    directory = os.path.normpath(os.path.dirname(filename))

    # NOTE(review): os.path.commonprefix is character-wise, so a sibling
    # path like "<EXPORT_DIR>-x" would pass this check -- verify whether
    # a component-wise check is needed here
    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
        constants.EXPORT_DIR):
      _Fail("File '%s' is not under exports directory '%s'",
            filename, constants.EXPORT_DIR)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    # redirect the transfer stream to/from the file
    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
      # is not already there or we pass a wrong path; we use notrunc to no
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
      # much memory; this means that at best, we flush every 64k, which will
      # not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    # the OS import/export scripts run with the full OS environment
    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    # pipe the transfer stream into/out of the script
    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix)
2609

    
2610

    
2611
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: either "import" or "export", used in the directory name
  @rtype: string
  @return: path of the newly-created status directory

  """
  # include a timestamp so concurrent operations get distinct directories
  dir_prefix = "%s-%s-" % (prefix, utils.TimestampForFilename())
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR, prefix=dir_prefix)
2618

    
2619

    
2620
def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/output mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments
  @rtype: string
  @return: the import/export identifier (the basename of its status
      directory)

  """
  # Validate the mode/host/port combination: imports listen locally,
  # exports must know where to connect to
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  # Either both key name and CA are given (per-operation certificate) or
  # neither (fall back to the cluster certificate)
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  # Build the shell command fragments that feed/consume the actual data
  (cmd_env, cmd_prefix, cmd_suffix) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  # Fail early if the certificate files are missing
  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir(prefix)
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file (read-only for the owner)
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      constants.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    logfile = _InstanceLogName(prefix, instance.os, instance.name)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    # Don't leave a half-initialized status directory behind
    shutil.rmtree(status_dir, ignore_errors=True)
    raise
2722

    
2723

    
2724
def GetImportExportStatus(names):
2725
  """Returns import/export daemon status.
2726

2727
  @type names: sequence
2728
  @param names: List of names
2729
  @rtype: List of dicts
2730
  @return: Returns a list of the state of each named import/export or None if a
2731
           status couldn't be read
2732

2733
  """
2734
  result = []
2735

    
2736
  for name in names:
2737
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2738
                                 _IES_STATUS_FILE)
2739

    
2740
    try:
2741
      data = utils.ReadFile(status_file)
2742
    except EnvironmentError, err:
2743
      if err.errno != errno.ENOENT:
2744
        raise
2745
      data = None
2746

    
2747
    if not data:
2748
      result.append(None)
2749
      continue
2750

    
2751
    result.append(serializer.LoadJson(data))
2752

    
2753
  return result
2754

    
2755

    
2756
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: the import/export identifier (status directory name)

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  daemon_pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir,
                                                      _IES_PID_FILE))

  # No PID file (or stale lock) means there's nothing to signal
  if not daemon_pid:
    return

  logging.info("Import/export %s is running with PID %s, sending SIGTERM",
               name, daemon_pid)
  os.kill(daemon_pid, signal.SIGTERM)
2769

    
2770

    
2771
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: the import/export identifier (status directory name)

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  daemon_pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir,
                                                      _IES_PID_FILE))

  if daemon_pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, daemon_pid)
    # Best-effort kill; we remove the directory regardless
    utils.KillProcess(daemon_pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
2790

    
2791

    
2792
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: mapping used by L{objects.Disk.SetPhysicalID}
  @param disks: list of L{objects.Disk} objects
  @rtype: list
  @return: the block device object for each disk, in the same order

  """
  my_name = utils.HostInfo().name
  # First pass: fill in the physical IDs so the devices can be located
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  # Second pass: resolve each disk to its block device
  for disk in disks:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(dev)
  return bdevs
2809

    
2810

    
2811
def DrbdDisconnectNet(nodes_ip, disks):
2812
  """Disconnects the network on a list of drbd devices.
2813

2814
  """
2815
  bdevs = _FindDisks(nodes_ip, disks)
2816

    
2817
  # disconnect disks
2818
  for rd in bdevs:
2819
    try:
2820
      rd.DisconnectNet()
2821
    except errors.BlockDeviceError, err:
2822
      _Fail("Can't change network configuration to standalone mode: %s",
2823
            err, exc=True)
2824

    
2825

    
2826
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @param nodes_ip: node IP mapping, passed to L{_FindDisks}
  @param disks: list of L{objects.Disk} objects
  @type instance_name: string
  @param instance_name: owning instance, used for the block device symlinks
  @param multimaster: if true, create the device symlinks and switch the
      devices to primary mode after reconnecting

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    # Symlinks are needed so the instance can find its disks on this node
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    # Retry callback: raises RetryAgain until every device is either
    # connected or resyncing
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)
2886

    
2887

    
2888
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @param nodes_ip: node IP mapping, passed to L{_FindDisks}
  @param disks: list of L{objects.Disk} objects
  @rtype: tuple
  @return: (all_done, minimum_sync_percent) over all devices

  """
  def _CheckConnected(rd):
    # Raise RetryAgain while the device is neither connected nor resyncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_CheckConnected, 1, 15, args=[rd])
    except utils.RetryTimeout:
      # last check before giving up on this device
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
2916

    
2917

    
2918
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: string
  @param hypervisor_type: name of the hypervisor whose powercycle
      method is used

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    child_pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    child_pid = 0
  if child_pid > 0:
    # parent: report back immediately, the child does the actual work
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  # give the RPC reply time to reach the caller before going down
  time.sleep(5)
  hyper.PowercycleNode()
2940

    
2941

    
2942
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    if hooks_base_dir is not None:
      self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
    else:
      self._BASE_DIR = constants.HOOKS_BASE_DIR

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    # Map the hook phase to the directory suffix
    phase_suffixes = {
      constants.HOOKS_PHASE_PRE: "pre",
      constants.HOOKS_PHASE_POST: "post",
      }
    try:
      suffix = phase_suffixes[phase]
    except KeyError:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    # Translate each RunParts outcome into a (path, status, output) tuple
    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        (rrval, output) = (constants.HKR_SKIP, "")
      elif relstatus == constants.RUNPARTS_ERR:
        (rrval, output) = (constants.HKR_FAIL,
                           "Hook script execution error: %s" % runresult)
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
3021

    
3022

    
3023
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # The allocator input is passed via a temporary file given as the
    # script's only argument
    (input_fd, input_name) = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(input_fd, idata)
      os.close(input_fd)
      result = utils.RunCmd([alloc_script, input_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(input_name)

    return result.stdout
3062

    
3063

    
3064
class DevCacheManager(object):
3065
  """Simple class for managing a cache of block device information.
3066

3067
  """
3068
  _DEV_PREFIX = "/dev/"
3069
  _ROOT_DIR = constants.BDEV_CACHE_DIR
3070

    
3071
  @classmethod
3072
  def _ConvertPath(cls, dev_path):
3073
    """Converts a /dev/name path to the cache file name.
3074

3075
    This replaces slashes with underscores and strips the /dev
3076
    prefix. It then returns the full path to the cache file.
3077

3078
    @type dev_path: str
3079
    @param dev_path: the C{/dev/} path name
3080
    @rtype: str
3081
    @return: the converted path name
3082

3083
    """
3084
    if dev_path.startswith(cls._DEV_PREFIX):
3085
      dev_path = dev_path[len(cls._DEV_PREFIX):]
3086
    dev_path = dev_path.replace("/", "_")
3087
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3088
    return fpath
3089

    
3090
  @classmethod
3091
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3092
    """Updates the cache information for a given device.
3093

3094
    @type dev_path: str
3095
    @param dev_path: the pathname of the device
3096
    @type owner: str
3097
    @param owner: the owner (instance name) of the device
3098
    @type on_primary: bool
3099
    @param on_primary: whether this is the primary
3100
        node nor not
3101
    @type iv_name: str
3102
    @param iv_name: the instance-visible name of the
3103
        device, as in objects.Disk.iv_name
3104

3105
    @rtype: None
3106

3107
    """
3108
    if dev_path is None:
3109
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
3110
      return
3111
    fpath = cls._ConvertPath(dev_path)
3112
    if on_primary:
3113
      state = "primary"
3114
    else:
3115
      state = "secondary"
3116
    if iv_name is None:
3117
      iv_name = "not_visible"
3118
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3119
    try:
3120
      utils.WriteFile(fpath, data=fdata)
3121
    except EnvironmentError, err:
3122
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3123

    
3124
  @classmethod
3125
  def RemoveCache(cls, dev_path):
3126
    """Remove data for a dev_path.
3127

3128
    This is just a wrapper over L{utils.RemoveFile} with a converted
3129
    path name and logging.
3130

3131
    @type dev_path: str
3132
    @param dev_path: the pathname of the device
3133

3134
    @rtype: None
3135

3136
    """
3137
    if dev_path is None:
3138
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
3139
      return
3140
    fpath = cls._ConvertPath(dev_path)
3141
    try:
3142
      utils.RemoveFile(fpath)
3143
    except EnvironmentError, err:
3144
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)