#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
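

# A typical "lvs --noheadings --separator='|'" line matched by the
# regex above looks like this (an illustrative sample, not output from
# any real node):
#
#   "  xenvg|test1|20.06|-wi-ao"
#
# which yields the groups ("xenvg", "test1", "20.06", "-wi-ao").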


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
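

# An illustrative sketch of the sender side that _Decompress undoes
# (the actual packing happens in the RPC client, not in this module):
#
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("payload")))
#   assert _Decompress(packed) == "payload"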


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family)
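

# An illustrative return value (hypothetical cluster data):
#
#   ("eth0", "192.0.2.1", "node1.example.com", netutils.IP4Address.family)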


def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will either try to activate the IP address of the master
  (unless someone else has it) or start the master daemons, based
  on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to start the master daemons
      (ganeti-masterd and ganeti-rapi), or (if false) activate the
      master ip
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  err_msgs = []
  # either start the master and rapi daemons
  if start_daemons:
    if no_voting:
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)
  # or activate the IP
  else:
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      if netutils.IPAddress.Own(master_ip):
        # we already have the ip:
        logging.debug("Master IP already configured, doing nothing")
      else:
        msg = "Someone else has the master ip, not activating"
        logging.error(msg)
        err_msgs.append(msg)
    else:
      ipcls = netutils.IP4Address
      if family == netutils.IP6Address.family:
        ipcls = netutils.IP6Address

      result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                             "%s/%d" % (master_ip, ipcls.iplen),
                             "dev", master_netdev, "label",
                             "%s:0" % master_netdev])
      if result.failed:
        msg = "Can't activate master IP: %s" % result.output
        logging.error(msg)
        err_msgs.append(msg)

      # we ignore the exit code of the following cmds
      if ipcls == netutils.IP4Address:
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif ipcls == netutils.IP6Address:
        try:
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
        except errors.OpExecError:
          # TODO: Better error reporting
          logging.warning("Can't execute ndisc6, please install if missing")

  if err_msgs:
    _Fail("; ".join(err_msgs))


def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate; either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shut down the node daemon.

  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether to also remove the node's SSH key files

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CACERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) RAM in MiB
      - memory_total is the total amount of RAM in MiB
      - hv_version: the hypervisor version, if available

  """
  outputarray = {}

  if vgname is not None:
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
    vg_free = vg_size = None
    if vginfo:
      vg_free = int(round(vginfo[0][0], 0))
      vg_size = int(round(vginfo[0][1], 0))
    outputarray["vg_size"] = vg_size
    outputarray["vg_free"] = vg_free

  if hypervisor_type is not None:
    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      outputarray.update(hyp_info)

  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray
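

# A sample return value for GetNodeInfo("xenvg", "xen-pvm") might look
# like this (illustrative figures only):
#
#   {"vg_size": 20480, "vg_free": 10240, "memory_dom0": 1024,
#    "memory_free": 3072, "memory_total": 4096, "bootid": "b69b6d95-..."}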


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
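

# For example (illustrative paths and sizes), querying one existing
# 60 GiB block device and one path outside /dev:
#
#   GetBlockDevSizes(["/dev/sda", "/tmp/not-a-device"])
#   => {"/dev/sda": 61440}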


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
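

# Decoding sketch for the lv_attr handling above: given an attribute
# string such as "-wi-ao" (illustrative), attr[0] == "-" means the LV
# is not virtual, attr[4] == "a" means it is active (so inactive is
# False), and attr[5] == "o" means the device is open (online is True).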


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with volume group names as keys and their
      size as values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: None
  @raise RPCFail: if any of the bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for instance
      information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: None
  @raise RPCFail: if the instance is not running on this node

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
888

    
889

    
890
def GetAllInstancesInfo(hypervisor_list):
891
  """Gather data about all instances.
892

893
  This is the equivalent of L{GetInstanceInfo}, except that it
894
  computes data for all instances at once, thus being faster if one
895
  needs data about more than one instance.
896

897
  @type hypervisor_list: list
898
  @param hypervisor_list: list of hypervisors to query for instance data
899

900
  @rtype: dict
901
  @return: dictionary of instance: data, with data having the following keys:
902
      - memory: memory size of instance (int)
903
      - state: xen state of instance (string)
904
      - time: cpu time of instance (float)
905
      - vcpus: the number of vcpus
906

907
  """
908
  output = {}
909

    
910
  for hname in hypervisor_list:
911
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
912
    if iinfo:
913
      for name, _, memory, vcpus, state, times in iinfo:
914
        value = {
915
          "memory": memory,
916
          "vcpus": vcpus,
917
          "state": state,
918
          "time": times,
919
          }
920
        if name in output:
921
          # we only check static parameters, like memory and vcpus,
922
          # and not state and time which can change between the
923
          # invocations of the different hypervisors
924
          for key in "memory", "vcpus":
925
            if value[key] != output[name][key]:
926
              _Fail("Instance %s is running twice"
927
                    " with different parameters", name)
928
        output[name] = value
929

    
930
  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)
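

# With the default build-time LOG_OS_DIR this produces paths such as
# the following (illustrative, the timestamp varies):
#
#   _InstanceLogName("add", "debootstrap", "inst1.example.com", None)
#   => "/var/log/ganeti/os/add-debootstrap-inst1.example.com-<timestamp>.log"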


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
991

    
992

    
993
def RunRenameInstance(instance, old_name, debug):
994
  """Run the OS rename script for an instance.
995

996
  @type instance: L{objects.Instance}
997
  @param instance: Instance whose OS is to be installed
998
  @type old_name: string
999
  @param old_name: previous instance name
1000
  @type debug: integer
1001
  @param debug: debug level, passed to the OS scripts
1002
  @rtype: boolean
1003
  @return: the success of the operation
1004

1005
  """
1006
  inst_os = OSFromDisk(instance.os)
1007

    
1008
  rename_env = OSEnvironment(instance, inst_os, debug)
1009
  rename_env["OLD_INSTANCE_NAME"] = old_name
1010

    
1011
  logfile = _InstanceLogName("rename", instance.os,
1012
                             "%s-%s" % (old_name, instance.name), None)
1013

    
1014
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1015
                        cwd=inst_os.path, output=logfile, reset_env=True)
1016

    
1017
  if result.failed:
1018
    logging.error("os create command '%s' returned error: %s output: %s",
1019
                  result.cmd, result.fail_reason, result.output)
1020
    lines = [utils.SafeEncode(val)
1021
             for val in utils.TailFile(logfile, lines=20)]
1022
    _Fail("OS rename script failed (%s), last lines in the"
1023
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
                        (instance_name, constants.DISK_SEPARATOR, idx))
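

# With the default build-time constants this evaluates to something
# like the following (illustrative):
#
#   _GetBlockDevSymlinkPath("inst1.example.com", 0)
#   => "/var/run/ganeti/instance-disks/inst1.example.com:0"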


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices
1100

    
1101

    
1102
def StartInstance(instance, startup_paused):
1103
  """Start an instance.
1104

1105
  @type instance: L{objects.Instance}
1106
  @param instance: the instance object
1107
  @type startup_paused: bool
1108
  @param instance: pause instance at startup?
1109
  @rtype: None
1110

1111
  """
1112
  running_instances = GetInstanceList([instance.hypervisor])
1113

    
1114
  if instance.name in running_instances:
1115
    logging.info("Instance %s already running, not starting", instance.name)
1116
    return
1117

    
1118
  try:
1119
    block_devices = _GatherAndLinkBlockDevs(instance)
1120
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1121
    hyper.StartInstance(instance, block_devices, startup_paused)
1122
  except errors.BlockDeviceError, err:
1123
    _Fail("Block device error: %s", err, exc=True)
1124
  except errors.HypervisorError, err:
1125
    _RemoveBlockDevLinks(instance.name, instance.disks)
1126
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    # soft shutdown: invoke the helper every five seconds until it
    # succeeds or the timeout expires
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shut down instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance, False)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigration(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - success is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
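

# Assuming the default 1 MiB WIPE_BLOCK_SIZE, a call such as
# _WipeDevice("/dev/xenvg/disk0", 0, 1024) runs (illustrative):
#
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct \
#     of=/dev/xenvg/disk0 count=1024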
1406

    
1407

    
1408
def BlockdevWipe(disk, offset, size):
1409
  """Wipes a block device.
1410

1411
  @type disk: L{objects.Disk}
1412
  @param disk: the disk object we want to wipe
1413
  @type offset: int
1414
  @param offset: The offset in MiB in the file
1415
  @type size: int
1416
  @param size: The size in MiB to write
1417

1418
  """
1419
  try:
1420
    rdev = _RecursiveFindBD(disk)
1421
  except errors.BlockDeviceError:
1422
    rdev = None
1423

    
1424
  if not rdev:
1425
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1426

    
1427
  # Do cross verify some of the parameters
1428
  if offset > rdev.size:
1429
    _Fail("Offset is bigger than device size")
1430
  if (offset + size) > rdev.size:
1431
    _Fail("The provided offset and size to wipe is bigger than device size")
1432

    
1433
  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result
1586

    
1587

    
1588
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children: shutting down one device
  doesn't require the upper device to have been active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shut down
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly be ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use
  # oflag=dsync to not buffer too much memory; this means that at
  # best, we flush every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, "errno"):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
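  # The API file is expected to hold one integer version per line,
  # e.g. a single line "20" (illustrative contents)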

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance (if we find a valid one)
      or an error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, _ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]
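    # (illustrative: a line "dhcp Whether to enable DHCP" becomes
    # ["dhcp", "Whether to enable DHCP"])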

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
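  # (illustrative: an os_params entry {"dhcp": "yes"} is exported to
  # the script as OSP_DHCP=yes)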
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  return result


def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount, dryrun):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()
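  # Illustrative layout of the resulting INI file (section names are
  # taken from constants.INISECT_*):
  #   [export]
  #   version = 0
  #   ...
  #   [instance]
  #   name = ...
  #   ...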

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant; on load we can read nics until one doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: a serialized form of the config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the logical/physical ID we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether a given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
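  # Illustrative check: with a base of /srv/ganeti/file-storage, the
  # path /srv/ganeti/file-storage/inst1 is accepted while /tmp/foo fails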
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, the RPC call fails.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
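  # Illustrative: assuming a queue directory of /var/lib/ganeti/queue,
  # a file name of /var/lib/ganeti/queue/job-42 passes this check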

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      will contain the error details in case we failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, constants.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly be ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc
      # to not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

2969

    
2970

    
2971
def _CreateImportExportStatusDir(prefix):
2972
  """Creates status directory for import/export.
2973

2974
  """
2975
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2976
                          prefix=("%s-%s-" %
2977
                                  (prefix, utils.TimestampForFilename())))
2978

    
2979

    
2980
def StartImportExportDaemon(mode, opts, host, port, instance, component,
2981
                            ieio, ieioargs):
2982
  """Starts an import or export daemon.
2983

2984
  @param mode: Import/output mode
2985
  @type opts: L{objects.ImportExportOptions}
2986
  @param opts: Daemon options
2987
  @type host: string
2988
  @param host: Remote host for export (None for import)
2989
  @type port: int
2990
  @param port: Remote port for export (None for import)
2991
  @type instance: L{objects.Instance}
2992
  @param instance: Instance object
2993
  @type component: string
2994
  @param component: which part of the instance is transferred now,
2995
      e.g. 'disk/0'
2996
  @param ieio: Input/output type
2997
  @param ieioargs: Input/output arguments
2998

2999
  """
3000
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Cannot specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      constants.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


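# Illustrative sketch of the assembled daemon invocation (values are
# hypothetical; the real binary path comes from
# constants.IMPORT_EXPORT_DAEMON and the flag values from "opts"):
#
#   import-export <status_dir>/status export \
#     --key=<key_path> --cert=<cert_path> --ca=<ca_file> \
#     --host=<host> --port=<port> --ipv4 --connect-retries=...

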
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: list of import/export names
  @rtype: List of dicts
  @return: a list with the state of each named import/export, or C{None}
           for any whose status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


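# Illustrative usage sketch (the daemon name is hypothetical; real names
# are returned by StartImportExportDaemon):
#
#   for status in GetImportExportStatus(["export-disk0-abc123"]):
#     if status is None:
#       continue  # status file missing or empty
#     # "status" is the deserialized JSON status structure

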
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running, it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


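# Illustrative lifecycle sketch (the name is whatever
# StartImportExportDaemon returned):
#
#   AbortImportExport(name)    # politely ask the daemon to stop (SIGTERM)
#   CleanupImportExport(name)  # kill it if still running, remove status dir

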
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to the new master configuration and, if
  # needed, primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


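# Simplified sketch of how these two RPCs are typically sequenced when
# switching DRBD disks between single-primary and dual-primary (the exact
# flow lives in the master-side logic and is only hinted at here):
#
#   DrbdDisconnectNet(nodes_ip, disks)                     # to standalone
#   DrbdAttachNet(nodes_ip, disks, instance_name, True)    # dual-primary
#   # ... live migration happens here ...
#   DrbdDisconnectNet(nodes_ip, disks)
#   DrbdAttachNet(nodes_ip, disks, instance_name, False)   # single-primary

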
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


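# Illustrative polling sketch (caller-side; hypothetical loop body):
#
#   (alldone, min_resync) = DrbdWaitSync(nodes_ip, disks)
#   if not alldone:
#     logging.info("Sync in progress, slowest device at %s%%", min_resync)

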
def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper that is currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, one of L{constants.HKR_SUCCESS},
        L{constants.HKR_FAIL} or L{constants.HKR_SKIP}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


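# Illustrative usage sketch (hook path and environment are hypothetical):
#
#   runner = HooksRunner()
#   results = runner.RunHooks("instance-start", constants.HOOKS_PHASE_PRE,
#                             {"GANETI_INSTANCE_NAME": "instance1"})
#   for (script, result, output) in results:
#     logging.debug("Hook %s finished with %s", script, result)

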
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; failures are reported
       by raising L{RPCFail} (via L{_Fail})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


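# Illustrative usage sketch ("hail" is the htools-provided allocator; the
# request object here is hypothetical and would be a serializable dict):
#
#   stdout = IAllocatorRunner.Run("hail", serializer.DumpJson(request))
#   response = serializer.LoadJson(stdout)

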
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

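  # Example conversions (illustrative): "/dev/drbd0" is cached as
  # <_ROOT_DIR>/bdev_drbd0, while "/dev/mapper/xenvg-root" becomes
  # <_ROOT_DIR>/bdev_mapper_xenvg-root.
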
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

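  # Each cache file holds one line of the form "<owner> <state> <iv_name>",
  # e.g. for a hypothetical instance: "instance1.example.com primary disk/0".
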
  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)