#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """

def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)

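# Illustrative use of _Fail (values below are hypothetical): positional
# arguments are %-interpolated into the message; "exc=True" additionally
# logs the current traceback, "log=False" skips logging entirely, and the
# RPCFail exception is raised either way:
#
#   _Fail("Can't read file %s: %s", "/var/lib/ganeti/config.data", err,
#         exc=True)
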
def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)

def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")

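# Illustrative round-trip for _Decompress (the RPC client is what normally
# builds these tuples; the payload here is hypothetical):
#
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("payload")))
#   assert _Decompress(packed) == "payload"
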
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)

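# Illustrative call of _CleanDirectory (the directory must be one of
# _ALLOWED_CLEAN_DIRS, everything else is rejected via _Fail); this is
# exactly how JobQueuePurge below uses it:
#
#   _CleanDirectory(constants.QUEUE_DIR,
#                   exclude=[constants.JOB_QUEUE_LOCK_FILE])
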
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()

def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)

def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family)

def ActivateMasterIp():
  """Activate the IP address of the master daemon.

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  err_msg = None
  if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if netutils.IPAddress.Own(master_ip):
      # we already have the IP:
      logging.debug("Master IP already configured, doing nothing")
    else:
      err_msg = "Someone else has the master IP, not activating"
      logging.error(err_msg)
  else:
    ipcls = netutils.IP4Address
    if family == netutils.IP6Address.family:
      ipcls = netutils.IP6Address

    result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                           "%s/%d" % (master_ip, ipcls.iplen),
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      err_msg = "Can't activate master IP: %s" % result.output
      logging.error(err_msg)

    # we ignore the exit code of the following cmds
    if ipcls == netutils.IP4Address:
      utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                    master_ip, master_ip])
    elif ipcls == netutils.IP6Address:
      try:
        utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
      except errors.OpExecError:
        # TODO: Better error reporting
        logging.warning("Can't execute ndisc6, please install if missing")

  if err_msg:
    _Fail(err_msg)

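# For reference, on an IPv4 cluster the commands built above are roughly
# equivalent to the following shell invocations (device name and address
# are hypothetical; the prefix length comes from ipcls.iplen):
#
#   ip address add 192.0.2.1/32 dev eth0 label eth0:0
#   arping -q -U -c 3 -I eth0 -s 192.0.2.1 192.0.2.1
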
def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)

def DeactivateMasterIp():
  """Deactivate the master IP on this node.

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)

def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The IP address associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but parameter"
            " is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")

def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CACERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")

def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total amount of ram in MiB

  """
  outputarray = {}

  if vgname is not None:
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
    vg_free = vg_size = None
    if vginfo:
      vg_free = int(round(vginfo[0][0], 0))
      vg_size = int(round(vginfo[0][1], 0))
    outputarray["vg_size"] = vg_size
    outputarray["vg_free"] = vg_free

  if hypervisor_type is not None:
    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      outputarray.update(hyp_info)

  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray

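# Illustrative return value of GetNodeInfo (all numbers and the boot id
# are hypothetical; the keys come from the docstring above):
#
#   {"vg_size": 20480, "vg_free": 10240,
#    "memory_dom0": 1024, "memory_free": 3072, "memory_total": 4096,
#    "bootid": "b14f2f7f-4f2e-4a8f-9c63-1a5b4f0e6f3a"}
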
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result

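# Illustrative VerifyNode input (keys are the NV_* constants used above;
# the node and cluster names are hypothetical):
#
#   VerifyNode({constants.NV_FILELIST: [constants.CLUSTER_CONF_FILE],
#               constants.NV_NODELIST: ["node2.example.com"],
#               constants.NV_VERSION: None}, "cluster.example.com")
#
# returns a dict with the same keys holding the per-check results.
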
def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs

def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs

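# For reference, the lv_attr field parsed above is a fixed-width string
# such as "-wi-ao" (value is hypothetical): position 0 is the volume type
# ("v" for virtual volumes), position 4 the activation state and
# position 5 the open flag, which is what the attr[...] tests in
# GetVolumeList rely on.
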
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with volume group names as keys and their
      size as values

  """
  return utils.ListVolumeGroups()

def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, the call fails with an RPCFail exception; strange
    lines in the lvs output are logged and skipped.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs

def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @raise RPCFail: if any of the bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))

def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results

def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output

def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @raise RPCFail: if the instance is not running

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)

def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output

def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)

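# Illustrative result of _InstanceLogName (the OS and instance names are
# hypothetical; the exact timestamp format comes from
# utils.TimestampForFilename):
#
#   _InstanceLogName("add", "debootstrap", "inst1.example.com", None)
#   -> "<LOG_OS_DIR>/add-debootstrap-inst1.example.com-<timestamp>.log"
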
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)

def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)

def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
                        (instance_name, constants.DISK_SEPARATOR, idx))

def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name

def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)

def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices

def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)

def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)

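# Shutdown flow of InstanceShutdown for reference: utils.Retry invokes the
# _TryShutdown callable roughly every 5 seconds until "timeout" expires;
# the first call asks the hypervisor for a clean stop, later calls pass
# retry=True, and only after utils.RetryTimeout does the code above fall
# back to a forced stop (and finally checks the instance really went away).
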
def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance, False)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)

def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info

def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)

def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)

def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)

def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)

def GetMigrationStatus(instance):
  """Get the migration status

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)

def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id

def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)

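# For reference, the dd invocation built above expands to something like
# (the device path and sizes are hypothetical, and a 1 MiB
# constants.WIPE_BLOCK_SIZE is assumed, which is what the MiB-based
# offset/size parameters imply):
#
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/xenvg/disk0 \
#     count=1024
#
# i.e. seek and count are expressed in blocks of WIPE_BLOCK_SIZE bytes.
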
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)

def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success

def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))

1542
def _RecursiveAssembleBD(disk, owner, as_primary):
1543
  """Activate a block device for an instance.
1544

1545
  This is run on the primary and secondary nodes for an instance.
1546

1547
  @note: this function is called recursively.
1548

1549
  @type disk: L{objects.Disk}
1550
  @param disk: the disk we try to assemble
1551
  @type owner: str
1552
  @param owner: the name of the instance which owns the disk
1553
  @type as_primary: boolean
1554
  @param as_primary: if we should make the block device
1555
      read/write
1556

1557
  @return: the assembled device or None (in case no device
1558
      was assembled)
1559
  @raise errors.BlockDeviceError: in case there is an error
1560
      during the activation of the children or the device
1561
      itself
1562

1563
  """
1564
  children = []
1565
  if disk.children:
1566
    mcn = disk.ChildrenNeeded()
1567
    if mcn == -1:
1568
      mcn = 0 # max number of Nones allowed
1569
    else:
1570
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble: shutting down different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list of L{objects.BlockDevStatus}
  @return: one status object for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list of tuples
  @return: list of tuples (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly be ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use
  # oflag=dsync to not buffer too much memory; this means that at
  # best, we flush every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, "errno"):
    detail = errno.errorcode[err.errno]
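    # e.g. (hypothetical): for an IOError raised on a missing file this
    # yields the string "ENOENT"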
  else:
    detail = str(err)
  return detail


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
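
  # At this point os_files maps each script name to the "required" flag,
  # e.g. (hypothetical) {"create": True, "export": True, ...}; the loop
  # further down replaces the values with the scripts' absolute paths.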

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, _ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  return result


def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount, dryrun):
  """Grow a stack of block devices.

  This function is called recursively, with the children being
  resized first.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))
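
  # For illustration, the file written below is plain INI text; a
  # hypothetical fragment (section names depend on the constants) could be:
  #
  #   [export]
  #   version = 0
  #   compression = none
  #
  #   [instance]
  #   name = inst1.example.com
  #   disk_count = 1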

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: str
  @return: a serialized form of the config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the unique id we rename it to
  @rtype: None
  @return: nothing; failures are reported by raising L{RPCFail}

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether a given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
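  # For example (hypothetical paths): with a base directory of
  # /srv/ganeti/file-storage, "/srv/ganeti/file-storage/inst1" is
  # accepted above, while "/tmp/inst1" or a path that normalizes to
  # something outside the base directory is rejected.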
  return fs_dir


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If it contains any files the
  operation fails.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
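  # For example (hypothetical paths): with a queue directory of
  # /var/lib/ganeti/queue, "/var/lib/ganeti/queue/job-42" passes the
  # check above, while "/etc/passwd" does not.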

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: None
  @return: nothing; failures are reported by raising L{RPCFail}

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @return: nothing; failures are reported by raising L{RPCFail}

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
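  # For illustration (hypothetical OS path): for checks == ["parameters"]
  # the call below runs something like
  #   /srv/ganeti/os/debootstrap/verify parameters
  # with the OSP_* variables from OSCoreEnv in its environment.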
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, constants.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly be ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc
      # to not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
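
    # For illustration (hypothetical OS path and script name): for an
    # export this yields a prefix such as
    #   ( cd /srv/ganeti/os/debootstrap && export; ) |
    # i.e. the OS script's output is piped into the transfer daemon.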

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      constants.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]
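
    # For illustration (hypothetical paths and values): after the
    # optional arguments below are appended, the final invocation may
    # look roughly like
    #   <daemon> <status_dir>/status export --key=... --cert=... \
    #     --ca=<status_dir>/ca --host=node2.example.com --port=1234 --ipv4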

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise
3139

    
3140

    
3141
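# Editor's sketch (illustrative, not part of the upstream module): for an
# export using the cluster certificate (opts.key_name is None), the command
# assembled above boils down to roughly the following, with the host/port
# values being hypothetical:
#
#   <IMPORT_EXPORT_DAEMON> <status_dir>/status export \
#     --key=server.pem --cert=server.pem --ca=<status_dir>/ca \
#     --host=203.0.113.5 --port=1234 --ipv4
#
# The returned name is simply the basename of the status directory; callers
# pass it back to GetImportExportStatus, AbortImportExport and
# CleanupImportExport below.

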
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: list with the state of each named import/export; an entry is
           None if its status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


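# Editor's sketch (illustrative, hypothetical daemon names): the status files
# are JSON documents written by the import/export daemon, so a call might
# return something like
#
#   GetImportExportStatus(["import-disk0-x1y2z3", "export-disk0-a4b5c6"])
#   => [{...parsed status...}, None]
#
# where None marks a daemon whose status file does not exist (yet).

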
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running, it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


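# Editor's sketch (illustrative): the usual life cycle of an import/export,
# as driven by the master through these functions, is:
#
#   name = StartImportExportDaemon(...)   # spawn daemon, get its name
#   GetImportExportStatus([name])         # poll until finished
#   AbortImportExport(name)               # optional, on cancellation
#   CleanupImportExport(name)             # kill leftover daemon, remove dir

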
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


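# Editor's note (an assumption about caller behaviour, not stated in this
# module): DrbdDisconnectNet and DrbdAttachNet are used as a pair when the
# master re-wires DRBD between two nodes, e.g. during live migration: both
# sides are first put into standalone mode, then re-attached, with
# multimaster=True for the phase in which both sides must be writable.

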
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


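# Editor's sketch (illustrative): DrbdWaitSync's return value combines a
# "done" flag with the lowest sync percentage seen across all devices, e.g.
# (True, 100) when everything is connected and fully synced, or
# (False, 37.5) while the slowest device is still resyncing at 37.5%.

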
def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


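# Editor's sketch (illustrative, hypothetical script names): with
# hpath="instance-start" and the "pre" phase, the scripts in
# <hooks_base_dir>/instance-start-pre.d/ are executed, and the result could
# look like:
#
#   [("instance-start-pre.d/00-check", constants.HKR_SUCCESS, "ok"),
#    ("instance-start-pre.d/99-broken", constants.HKR_FAIL, "exited with 1")]

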
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: string
    @return: the stdout of the allocator script on success; failures are
       reported by raising L{RPCFail}

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


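# Editor's sketch (illustrative): an iallocator plugin is an executable that
# receives the path of a temporary file containing the request data as its
# only argument and prints its response on stdout, i.e. it is run roughly as
#
#   <plugin-dir>/<plugin-name> /tmp/ganeti-iallocator.XXXXXX
#
# (paths here are hypothetical; the real search path comes from
# constants.IALLOCATOR_SEARCH_PATH).

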
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

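  # Editor's sketch (illustrative): _ConvertPath("/dev/drbd0") yields
  # "<BDEV_CACHE_DIR>/bdev_drbd0", and a nested path such as
  # "/dev/disk/by-id/x" becomes "<BDEV_CACHE_DIR>/bdev_disk_by-id_x".
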
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

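  # Editor's sketch (illustrative, hypothetical instance name): after
  # UpdateCache("/dev/drbd0", "instance1.example.com", True, "disk/0") the
  # cache file bdev_drbd0 contains the single line:
  #
  #   instance1.example.com primary disk/0
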
  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache entry for %s: %s",
                        dev_path, err)