Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 9888b9e6

History | View | Annotate | Download (108.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50
import signal
51

    
52
from ganeti import errors
53
from ganeti import utils
54
from ganeti import ssh
55
from ganeti import hypervisor
56
from ganeti import constants
57
from ganeti import bdev
58
from ganeti import objects
59
from ganeti import ssconf
60
from ganeti import serializer
61
from ganeti import netutils
62
from ganeti import runtime
63

    
64

    
65
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66
_ALLOWED_CLEAN_DIRS = frozenset([
67
  constants.DATA_DIR,
68
  constants.JOB_QUEUE_ARCHIVE_DIR,
69
  constants.QUEUE_DIR,
70
  constants.CRYPTO_KEYS_DIR,
71
  ])
72
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73
_X509_KEY_FILE = "key"
74
_X509_CERT_FILE = "cert"
75
_IES_STATUS_FILE = "status"
76
_IES_PID_FILE = "pid"
77
_IES_CA_FILE = "ca"
78

    
79
#: Valid LVS output line regex
80
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81

    
82

    
83
class RPCFail(Exception):
84
  """Class denoting RPC failure.
85

86
  Its argument is the error message.
87

88
  """
89

    
90

    
91
def _Fail(msg, *args, **kwargs):
92
  """Log an error and the raise an RPCFail exception.
93

94
  This exception is then handled specially in the ganeti daemon and
95
  turned into a 'failed' return type. As such, this function is a
96
  useful shortcut for logging the error and returning it to the master
97
  daemon.
98

99
  @type msg: string
100
  @param msg: the text of the exception
101
  @raise RPCFail
102

103
  """
104
  if args:
105
    msg = msg % args
106
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
107
    if "exc" in kwargs and kwargs["exc"]:
108
      logging.exception(msg)
109
    else:
110
      logging.error(msg)
111
  raise RPCFail(msg)
112

    
113

    
114
def _GetConfig():
115
  """Simple wrapper to return a SimpleStore.
116

117
  @rtype: L{ssconf.SimpleStore}
118
  @return: a SimpleStore instance
119

120
  """
121
  return ssconf.SimpleStore()
122

    
123

    
124
def _GetSshRunner(cluster_name):
125
  """Simple wrapper to return an SshRunner.
126

127
  @type cluster_name: str
128
  @param cluster_name: the cluster name, which is needed
129
      by the SshRunner constructor
130
  @rtype: L{ssh.SshRunner}
131
  @return: an SshRunner instance
132

133
  """
134
  return ssh.SshRunner(cluster_name)
135

    
136

    
137
def _Decompress(data):
138
  """Unpacks data compressed by the RPC client.
139

140
  @type data: list or tuple
141
  @param data: Data sent by RPC client
142
  @rtype: str
143
  @return: Decompressed data
144

145
  """
146
  assert isinstance(data, (list, tuple))
147
  assert len(data) == 2
148
  (encoding, content) = data
149
  if encoding == constants.RPC_ENCODING_NONE:
150
    return content
151
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152
    return zlib.decompress(base64.b64decode(content))
153
  else:
154
    raise AssertionError("Unknown data encoding")
155

    
156

    
157
def _CleanDirectory(path, exclude=None):
158
  """Removes all regular files in a directory.
159

160
  @type path: str
161
  @param path: the directory to clean
162
  @type exclude: list
163
  @param exclude: list of files to be excluded, defaults
164
      to the empty list
165

166
  """
167
  if path not in _ALLOWED_CLEAN_DIRS:
168
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169
          path)
170

    
171
  if not os.path.isdir(path):
172
    return
173
  if exclude is None:
174
    exclude = []
175
  else:
176
    # Normalize excluded paths
177
    exclude = [os.path.normpath(i) for i in exclude]
178

    
179
  for rel_name in utils.ListVisibleFiles(path):
180
    full_name = utils.PathJoin(path, rel_name)
181
    if full_name in exclude:
182
      continue
183
    if os.path.isfile(full_name) and not os.path.islink(full_name):
184
      utils.RemoveFile(full_name)
185

    
186

    
187
def _BuildUploadFileList():
188
  """Build the list of allowed upload files.
189

190
  This is abstracted so that it's built only once at module import time.
191

192
  """
193
  allowed_files = set([
194
    constants.CLUSTER_CONF_FILE,
195
    constants.ETC_HOSTS,
196
    constants.SSH_KNOWN_HOSTS_FILE,
197
    constants.VNC_PASSWORD_FILE,
198
    constants.RAPI_CERT_FILE,
199
    constants.SPICE_CERT_FILE,
200
    constants.SPICE_CACERT_FILE,
201
    constants.RAPI_USERS_FILE,
202
    constants.CONFD_HMAC_KEY,
203
    constants.CLUSTER_DOMAIN_SECRET_FILE,
204
    ])
205

    
206
  for hv_name in constants.HYPER_TYPES:
207
    hv_class = hypervisor.GetHypervisorClass(hv_name)
208
    allowed_files.update(hv_class.GetAncillaryFiles())
209

    
210
  return frozenset(allowed_files)
211

    
212

    
213
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
214

    
215

    
216
def JobQueuePurge():
217
  """Removes job queue files and archived jobs.
218

219
  @rtype: tuple
220
  @return: True, None
221

222
  """
223
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
224
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
225

    
226

    
227
def GetMasterInfo():
228
  """Returns master information.
229

230
  This is an utility function to compute master information, either
231
  for consumption here or from the node daemon.
232

233
  @rtype: tuple
234
  @return: master_netdev, master_ip, master_name, primary_ip_family,
235
    master_netmask
236
  @raise RPCFail: in case of errors
237

238
  """
239
  try:
240
    cfg = _GetConfig()
241
    master_netdev = cfg.GetMasterNetdev()
242
    master_ip = cfg.GetMasterIP()
243
    master_netmask = cfg.GetMasterNetmask()
244
    master_node = cfg.GetMasterNode()
245
    primary_ip_family = cfg.GetPrimaryIPFamily()
246
  except errors.ConfigurationError, err:
247
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
248
  return (master_netdev, master_ip, master_node, primary_ip_family,
249
      master_netmask)
250

    
251

    
252
def ActivateMasterIp():
253
  """Activate the IP address of the master daemon.
254

255
  """
256
  # GetMasterInfo will raise an exception if not able to return data
257
  master_netdev, master_ip, _, family, master_netmask = GetMasterInfo()
258

    
259
  err_msg = None
260
  if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
261
    if netutils.IPAddress.Own(master_ip):
262
      # we already have the ip:
263
      logging.debug("Master IP already configured, doing nothing")
264
    else:
265
      err_msg = "Someone else has the master ip, not activating"
266
      logging.error(err_msg)
267
  else:
268
    ipcls = netutils.IP4Address
269
    if family == netutils.IP6Address.family:
270
      ipcls = netutils.IP6Address
271

    
272
    result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
273
                           "%s/%s" % (master_ip, master_netmask),
274
                           "dev", master_netdev, "label",
275
                           "%s:0" % master_netdev])
276
    if result.failed:
277
      err_msg = "Can't activate master IP: %s" % result.output
278
      logging.error(err_msg)
279

    
280
    else:
281
      # we ignore the exit code of the following cmds
282
      if ipcls == netutils.IP4Address:
283
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
284
                      master_ip, master_ip])
285
      elif ipcls == netutils.IP6Address:
286
        try:
287
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
288
        except errors.OpExecError:
289
          # TODO: Better error reporting
290
          logging.warning("Can't execute ndisc6, please install if missing")
291

    
292
  if err_msg:
293
    _Fail(err_msg)
294

    
295

    
296
def StartMasterDaemons(no_voting):
297
  """Activate local node as master node.
298

299
  The function will start the master daemons (ganeti-masterd and ganeti-rapi).
300

301
  @type no_voting: boolean
302
  @param no_voting: whether to start ganeti-masterd without a node vote
303
      but still non-interactively
304
  @rtype: None
305

306
  """
307

    
308
  if no_voting:
309
    masterd_args = "--no-voting --yes-do-it"
310
  else:
311
    masterd_args = ""
312

    
313
  env = {
314
    "EXTRA_MASTERD_ARGS": masterd_args,
315
    }
316

    
317
  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
318
  if result.failed:
319
    msg = "Can't start Ganeti master: %s" % result.output
320
    logging.error(msg)
321
    _Fail(msg)
322

    
323

    
324
def DeactivateMasterIp():
325
  """Deactivate the master IP on this node.
326

327
  """
328
  # TODO: log and report back to the caller the error failures; we
329
  # need to decide in which case we fail the RPC for this
330

    
331
  # GetMasterInfo will raise an exception if not able to return data
332
  master_netdev, master_ip, _, _, master_netmask = GetMasterInfo()
333

    
334
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
335
                         "%s/%s" % (master_ip, master_netmask),
336
                         "dev", master_netdev])
337
  if result.failed:
338
    logging.error("Can't remove the master IP, error: %s", result.output)
339
    # but otherwise ignore the failure
340

    
341

    
342
def StopMasterDaemons():
343
  """Stop the master daemons on this node.
344

345
  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
346

347
  @rtype: None
348

349
  """
350
  # TODO: log and report back to the caller the error failures; we
351
  # need to decide in which case we fail the RPC for this
352

    
353
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
354
  if result.failed:
355
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
356
                  " and error %s",
357
                  result.cmd, result.exit_code, result.output)
358

    
359

    
360
def ChangeMasterNetmask(netmask):
361
  """Change the netmask of the master IP.
362

363
  """
364
  master_netdev, master_ip, _, _, old_netmask = GetMasterInfo()
365
  if old_netmask == netmask:
366
    return
367

    
368
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
369
                         "%s/%s" % (master_ip, netmask),
370
                         "dev", master_netdev, "label",
371
                         "%s:0" % master_netdev])
372
  if result.failed:
373
    _Fail("Could not change the master IP netmask")
374

    
375
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
376
                         "%s/%s" % (master_ip, old_netmask),
377
                         "dev", master_netdev, "label",
378
                         "%s:0" % master_netdev])
379
  if result.failed:
380
    _Fail("Could not change the master IP netmask")
381

    
382

    
383
def EtcHostsModify(mode, host, ip):
384
  """Modify a host entry in /etc/hosts.
385

386
  @param mode: The mode to operate. Either add or remove entry
387
  @param host: The host to operate on
388
  @param ip: The ip associated with the entry
389

390
  """
391
  if mode == constants.ETC_HOSTS_ADD:
392
    if not ip:
393
      RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
394
              " present")
395
    utils.AddHostToEtcHosts(host, ip)
396
  elif mode == constants.ETC_HOSTS_REMOVE:
397
    if ip:
398
      RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
399
              " parameter is present")
400
    utils.RemoveHostFromEtcHosts(host)
401
  else:
402
    RPCFail("Mode not supported")
403

    
404

    
405
def LeaveCluster(modify_ssh_setup):
406
  """Cleans up and remove the current node.
407

408
  This function cleans up and prepares the current node to be removed
409
  from the cluster.
410

411
  If processing is successful, then it raises an
412
  L{errors.QuitGanetiException} which is used as a special case to
413
  shutdown the node daemon.
414

415
  @param modify_ssh_setup: boolean
416

417
  """
418
  _CleanDirectory(constants.DATA_DIR)
419
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
420
  JobQueuePurge()
421

    
422
  if modify_ssh_setup:
423
    try:
424
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
425

    
426
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
427

    
428
      utils.RemoveFile(priv_key)
429
      utils.RemoveFile(pub_key)
430
    except errors.OpExecError:
431
      logging.exception("Error while processing ssh files")
432

    
433
  try:
434
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
435
    utils.RemoveFile(constants.RAPI_CERT_FILE)
436
    utils.RemoveFile(constants.SPICE_CERT_FILE)
437
    utils.RemoveFile(constants.SPICE_CACERT_FILE)
438
    utils.RemoveFile(constants.NODED_CERT_FILE)
439
  except: # pylint: disable=W0702
440
    logging.exception("Error while removing cluster secrets")
441

    
442
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
443
  if result.failed:
444
    logging.error("Command %s failed with exitcode %s and error %s",
445
                  result.cmd, result.exit_code, result.output)
446

    
447
  # Raise a custom exception (handled in ganeti-noded)
448
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
449

    
450

    
451
def GetNodeInfo(vgname, hypervisor_type):
452
  """Gives back a hash with different information about the node.
453

454
  @type vgname: C{string}
455
  @param vgname: the name of the volume group to ask for disk space information
456
  @type hypervisor_type: C{str}
457
  @param hypervisor_type: the name of the hypervisor to ask for
458
      memory information
459
  @rtype: C{dict}
460
  @return: dictionary with the following keys:
461
      - vg_size is the size of the configured volume group in MiB
462
      - vg_free is the free size of the volume group in MiB
463
      - memory_dom0 is the memory allocated for domain0 in MiB
464
      - memory_free is the currently available (free) ram in MiB
465
      - memory_total is the total number of ram in MiB
466
      - hv_version: the hypervisor version, if available
467

468
  """
469
  outputarray = {}
470

    
471
  if vgname is not None:
472
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
473
    vg_free = vg_size = None
474
    if vginfo:
475
      vg_free = int(round(vginfo[0][0], 0))
476
      vg_size = int(round(vginfo[0][1], 0))
477
    outputarray["vg_size"] = vg_size
478
    outputarray["vg_free"] = vg_free
479

    
480
  if hypervisor_type is not None:
481
    hyper = hypervisor.GetHypervisor(hypervisor_type)
482
    hyp_info = hyper.GetNodeInfo()
483
    if hyp_info is not None:
484
      outputarray.update(hyp_info)
485

    
486
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
487

    
488
  return outputarray
489

    
490

    
491
def VerifyNode(what, cluster_name):
492
  """Verify the status of the local node.
493

494
  Based on the input L{what} parameter, various checks are done on the
495
  local node.
496

497
  If the I{filelist} key is present, this list of
498
  files is checksummed and the file/checksum pairs are returned.
499

500
  If the I{nodelist} key is present, we check that we have
501
  connectivity via ssh with the target nodes (and check the hostname
502
  report).
503

504
  If the I{node-net-test} key is present, we check that we have
505
  connectivity to the given nodes via both primary IP and, if
506
  applicable, secondary IPs.
507

508
  @type what: C{dict}
509
  @param what: a dictionary of things to check:
510
      - filelist: list of files for which to compute checksums
511
      - nodelist: list of nodes we should check ssh communication with
512
      - node-net-test: list of nodes we should check node daemon port
513
        connectivity with
514
      - hypervisor: list with hypervisors to run the verify for
515
  @rtype: dict
516
  @return: a dictionary with the same keys as the input dict, and
517
      values representing the result of the checks
518

519
  """
520
  result = {}
521
  my_name = netutils.Hostname.GetSysName()
522
  port = netutils.GetDaemonPort(constants.NODED)
523
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
524

    
525
  if constants.NV_HYPERVISOR in what and vm_capable:
526
    result[constants.NV_HYPERVISOR] = tmp = {}
527
    for hv_name in what[constants.NV_HYPERVISOR]:
528
      try:
529
        val = hypervisor.GetHypervisor(hv_name).Verify()
530
      except errors.HypervisorError, err:
531
        val = "Error while checking hypervisor: %s" % str(err)
532
      tmp[hv_name] = val
533

    
534
  if constants.NV_HVPARAMS in what and vm_capable:
535
    result[constants.NV_HVPARAMS] = tmp = []
536
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
537
      try:
538
        logging.info("Validating hv %s, %s", hv_name, hvparms)
539
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
540
      except errors.HypervisorError, err:
541
        tmp.append((source, hv_name, str(err)))
542

    
543
  if constants.NV_FILELIST in what:
544
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
545
      what[constants.NV_FILELIST])
546

    
547
  if constants.NV_NODELIST in what:
548
    (nodes, bynode) = what[constants.NV_NODELIST]
549

    
550
    # Add nodes from other groups (different for each node)
551
    try:
552
      nodes.extend(bynode[my_name])
553
    except KeyError:
554
      pass
555

    
556
    # Use a random order
557
    random.shuffle(nodes)
558

    
559
    # Try to contact all nodes
560
    val = {}
561
    for node in nodes:
562
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
563
      if not success:
564
        val[node] = message
565

    
566
    result[constants.NV_NODELIST] = val
567

    
568
  if constants.NV_NODENETTEST in what:
569
    result[constants.NV_NODENETTEST] = tmp = {}
570
    my_pip = my_sip = None
571
    for name, pip, sip in what[constants.NV_NODENETTEST]:
572
      if name == my_name:
573
        my_pip = pip
574
        my_sip = sip
575
        break
576
    if not my_pip:
577
      tmp[my_name] = ("Can't find my own primary/secondary IP"
578
                      " in the node list")
579
    else:
580
      for name, pip, sip in what[constants.NV_NODENETTEST]:
581
        fail = []
582
        if not netutils.TcpPing(pip, port, source=my_pip):
583
          fail.append("primary")
584
        if sip != pip:
585
          if not netutils.TcpPing(sip, port, source=my_sip):
586
            fail.append("secondary")
587
        if fail:
588
          tmp[name] = ("failure using the %s interface(s)" %
589
                       " and ".join(fail))
590

    
591
  if constants.NV_MASTERIP in what:
592
    # FIXME: add checks on incoming data structures (here and in the
593
    # rest of the function)
594
    master_name, master_ip = what[constants.NV_MASTERIP]
595
    if master_name == my_name:
596
      source = constants.IP4_ADDRESS_LOCALHOST
597
    else:
598
      source = None
599
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
600
                                                  source=source)
601

    
602
  if constants.NV_OOB_PATHS in what:
603
    result[constants.NV_OOB_PATHS] = tmp = []
604
    for path in what[constants.NV_OOB_PATHS]:
605
      try:
606
        st = os.stat(path)
607
      except OSError, err:
608
        tmp.append("error stating out of band helper: %s" % err)
609
      else:
610
        if stat.S_ISREG(st.st_mode):
611
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
612
            tmp.append(None)
613
          else:
614
            tmp.append("out of band helper %s is not executable" % path)
615
        else:
616
          tmp.append("out of band helper %s is not a file" % path)
617

    
618
  if constants.NV_LVLIST in what and vm_capable:
619
    try:
620
      val = GetVolumeList(utils.ListVolumeGroups().keys())
621
    except RPCFail, err:
622
      val = str(err)
623
    result[constants.NV_LVLIST] = val
624

    
625
  if constants.NV_INSTANCELIST in what and vm_capable:
626
    # GetInstanceList can fail
627
    try:
628
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
629
    except RPCFail, err:
630
      val = str(err)
631
    result[constants.NV_INSTANCELIST] = val
632

    
633
  if constants.NV_VGLIST in what and vm_capable:
634
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
635

    
636
  if constants.NV_PVLIST in what and vm_capable:
637
    result[constants.NV_PVLIST] = \
638
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
639
                                   filter_allocatable=False)
640

    
641
  if constants.NV_VERSION in what:
642
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
643
                                    constants.RELEASE_VERSION)
644

    
645
  if constants.NV_HVINFO in what and vm_capable:
646
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
647
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
648

    
649
  if constants.NV_DRBDLIST in what and vm_capable:
650
    try:
651
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
652
    except errors.BlockDeviceError, err:
653
      logging.warning("Can't get used minors list", exc_info=True)
654
      used_minors = str(err)
655
    result[constants.NV_DRBDLIST] = used_minors
656

    
657
  if constants.NV_DRBDHELPER in what and vm_capable:
658
    status = True
659
    try:
660
      payload = bdev.BaseDRBD.GetUsermodeHelper()
661
    except errors.BlockDeviceError, err:
662
      logging.error("Can't get DRBD usermode helper: %s", str(err))
663
      status = False
664
      payload = str(err)
665
    result[constants.NV_DRBDHELPER] = (status, payload)
666

    
667
  if constants.NV_NODESETUP in what:
668
    result[constants.NV_NODESETUP] = tmpr = []
669
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
670
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
671
                  " under /sys, missing required directories /sys/block"
672
                  " and /sys/class/net")
673
    if (not os.path.isdir("/proc/sys") or
674
        not os.path.isfile("/proc/sysrq-trigger")):
675
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
676
                  " under /proc, missing required directory /proc/sys and"
677
                  " the file /proc/sysrq-trigger")
678

    
679
  if constants.NV_TIME in what:
680
    result[constants.NV_TIME] = utils.SplitTime(time.time())
681

    
682
  if constants.NV_OSLIST in what and vm_capable:
683
    result[constants.NV_OSLIST] = DiagnoseOS()
684

    
685
  if constants.NV_BRIDGES in what and vm_capable:
686
    result[constants.NV_BRIDGES] = [bridge
687
                                    for bridge in what[constants.NV_BRIDGES]
688
                                    if not utils.BridgeExists(bridge)]
689
  return result
690

    
691

    
692
def GetBlockDevSizes(devices):
693
  """Return the size of the given block devices
694

695
  @type devices: list
696
  @param devices: list of block device nodes to query
697
  @rtype: dict
698
  @return:
699
    dictionary of all block devices under /dev (key). The value is their
700
    size in MiB.
701

702
    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
703

704
  """
705
  DEV_PREFIX = "/dev/"
706
  blockdevs = {}
707

    
708
  for devpath in devices:
709
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
710
      continue
711

    
712
    try:
713
      st = os.stat(devpath)
714
    except EnvironmentError, err:
715
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
716
      continue
717

    
718
    if stat.S_ISBLK(st.st_mode):
719
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
720
      if result.failed:
721
        # We don't want to fail, just do not list this device as available
722
        logging.warning("Cannot get size for block device %s", devpath)
723
        continue
724

    
725
      size = int(result.stdout) / (1024 * 1024)
726
      blockdevs[devpath] = size
727
  return blockdevs
728

    
729

    
730
def GetVolumeList(vg_names):
731
  """Compute list of logical volumes and their size.
732

733
  @type vg_names: list
734
  @param vg_names: the volume groups whose LVs we should list, or
735
      empty for all volume groups
736
  @rtype: dict
737
  @return:
738
      dictionary of all partions (key) with value being a tuple of
739
      their size (in MiB), inactive and online status::
740

741
        {'xenvg/test1': ('20.06', True, True)}
742

743
      in case of errors, a string is returned with the error
744
      details.
745

746
  """
747
  lvs = {}
748
  sep = "|"
749
  if not vg_names:
750
    vg_names = []
751
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
752
                         "--separator=%s" % sep,
753
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
754
  if result.failed:
755
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
756

    
757
  for line in result.stdout.splitlines():
758
    line = line.strip()
759
    match = _LVSLINE_REGEX.match(line)
760
    if not match:
761
      logging.error("Invalid line returned from lvs output: '%s'", line)
762
      continue
763
    vg_name, name, size, attr = match.groups()
764
    inactive = attr[4] == "-"
765
    online = attr[5] == "o"
766
    virtual = attr[0] == "v"
767
    if virtual:
768
      # we don't want to report such volumes as existing, since they
769
      # don't really hold data
770
      continue
771
    lvs[vg_name + "/" + name] = (size, inactive, online)
772

    
773
  return lvs
774

    
775

    
776
def ListVolumeGroups():
777
  """List the volume groups and their size.
778

779
  @rtype: dict
780
  @return: dictionary with keys volume name and values the
781
      size of the volume
782

783
  """
784
  return utils.ListVolumeGroups()
785

    
786

    
787
def NodeVolumes():
788
  """List all volumes on this node.
789

790
  @rtype: list
791
  @return:
792
    A list of dictionaries, each having four keys:
793
      - name: the logical volume name,
794
      - size: the size of the logical volume
795
      - dev: the physical device on which the LV lives
796
      - vg: the volume group to which it belongs
797

798
    In case of errors, we return an empty list and log the
799
    error.
800

801
    Note that since a logical volume can live on multiple physical
802
    volumes, the resulting list might include a logical volume
803
    multiple times.
804

805
  """
806
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
807
                         "--separator=|",
808
                         "--options=lv_name,lv_size,devices,vg_name"])
809
  if result.failed:
810
    _Fail("Failed to list logical volumes, lvs output: %s",
811
          result.output)
812

    
813
  def parse_dev(dev):
814
    return dev.split("(")[0]
815

    
816
  def handle_dev(dev):
817
    return [parse_dev(x) for x in dev.split(",")]
818

    
819
  def map_line(line):
820
    line = [v.strip() for v in line]
821
    return [{"name": line[0], "size": line[1],
822
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
823

    
824
  all_devs = []
825
  for line in result.stdout.splitlines():
826
    if line.count("|") >= 3:
827
      all_devs.extend(map_line(line.split("|")))
828
    else:
829
      logging.warning("Strange line in the output from lvs: '%s'", line)
830
  return all_devs
831

    
832

    
833
def BridgesExist(bridges_list):
834
  """Check if a list of bridges exist on the current node.
835

836
  @rtype: boolean
837
  @return: C{True} if all of them exist, C{False} otherwise
838

839
  """
840
  missing = []
841
  for bridge in bridges_list:
842
    if not utils.BridgeExists(bridge):
843
      missing.append(bridge)
844

    
845
  if missing:
846
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
847

    
848

    
849
def GetInstanceList(hypervisor_list):
850
  """Provides a list of instances.
851

852
  @type hypervisor_list: list
853
  @param hypervisor_list: the list of hypervisors to query information
854

855
  @rtype: list
856
  @return: a list of all running instances on the current node
857
    - instance1.example.com
858
    - instance2.example.com
859

860
  """
861
  results = []
862
  for hname in hypervisor_list:
863
    try:
864
      names = hypervisor.GetHypervisor(hname).ListInstances()
865
      results.extend(names)
866
    except errors.HypervisorError, err:
867
      _Fail("Error enumerating instances (hypervisor %s): %s",
868
            hname, err, exc=True)
869

    
870
  return results
871

    
872

    
873
def GetInstanceInfo(instance, hname):
874
  """Gives back the information about an instance as a dictionary.
875

876
  @type instance: string
877
  @param instance: the instance name
878
  @type hname: string
879
  @param hname: the hypervisor type of the instance
880

881
  @rtype: dict
882
  @return: dictionary with the following keys:
883
      - memory: memory size of instance (int)
884
      - state: xen state of instance (string)
885
      - time: cpu time of instance (float)
886

887
  """
888
  output = {}
889

    
890
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
891
  if iinfo is not None:
892
    output["memory"] = iinfo[2]
893
    output["state"] = iinfo[4]
894
    output["time"] = iinfo[5]
895

    
896
  return output
897

    
898

    
899
def GetInstanceMigratable(instance):
900
  """Gives whether an instance can be migrated.
901

902
  @type instance: L{objects.Instance}
903
  @param instance: object representing the instance to be checked.
904

905
  @rtype: tuple
906
  @return: tuple of (result, description) where:
907
      - result: whether the instance can be migrated or not
908
      - description: a description of the issue, if relevant
909

910
  """
911
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
912
  iname = instance.name
913
  if iname not in hyper.ListInstances():
914
    _Fail("Instance %s is not running", iname)
915

    
916
  for idx in range(len(instance.disks)):
917
    link_name = _GetBlockDevSymlinkPath(iname, idx)
918
    if not os.path.islink(link_name):
919
      logging.warning("Instance %s is missing symlink %s for disk %d",
920
                      iname, link_name, idx)
921

    
922

    
923
def GetAllInstancesInfo(hypervisor_list):
924
  """Gather data about all instances.
925

926
  This is the equivalent of L{GetInstanceInfo}, except that it
927
  computes data for all instances at once, thus being faster if one
928
  needs data about more than one instance.
929

930
  @type hypervisor_list: list
931
  @param hypervisor_list: list of hypervisors to query for instance data
932

933
  @rtype: dict
934
  @return: dictionary of instance: data, with data having the following keys:
935
      - memory: memory size of instance (int)
936
      - state: xen state of instance (string)
937
      - time: cpu time of instance (float)
938
      - vcpus: the number of vcpus
939

940
  """
941
  output = {}
942

    
943
  for hname in hypervisor_list:
944
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
945
    if iinfo:
946
      for name, _, memory, vcpus, state, times in iinfo:
947
        value = {
948
          "memory": memory,
949
          "vcpus": vcpus,
950
          "state": state,
951
          "time": times,
952
          }
953
        if name in output:
954
          # we only check static parameters, like memory and vcpus,
955
          # and not state and time which can change between the
956
          # invocations of the different hypervisors
957
          for key in "memory", "vcpus":
958
            if value[key] != output[name][key]:
959
              _Fail("Instance %s is running twice"
960
                    " with different parameters", name)
961
        output[name] = value
962

    
963
  return output
964

    
965

    
966
def _InstanceLogName(kind, os_name, instance, component):
967
  """Compute the OS log filename for a given instance and operation.
968

969
  The instance name and os name are passed in as strings since not all
970
  operations have these as part of an instance object.
971

972
  @type kind: string
973
  @param kind: the operation type (e.g. add, import, etc.)
974
  @type os_name: string
975
  @param os_name: the os name
976
  @type instance: string
977
  @param instance: the name of the instance being imported/added/etc.
978
  @type component: string or None
979
  @param component: the name of the component of the instance being
980
      transferred
981

982
  """
983
  # TODO: Use tempfile.mkstemp to create unique filename
984
  if component:
985
    assert "/" not in component
986
    c_msg = "-%s" % component
987
  else:
988
    c_msg = ""
989
  base = ("%s-%s-%s%s-%s.log" %
990
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
991
  return utils.PathJoin(constants.LOG_OS_DIR, base)
992

    
993

    
994
def InstanceOsAdd(instance, reinstall, debug):
995
  """Add an OS to an instance.
996

997
  @type instance: L{objects.Instance}
998
  @param instance: Instance whose OS is to be installed
999
  @type reinstall: boolean
1000
  @param reinstall: whether this is an instance reinstall
1001
  @type debug: integer
1002
  @param debug: debug level, passed to the OS scripts
1003
  @rtype: None
1004

1005
  """
1006
  inst_os = OSFromDisk(instance.os)
1007

    
1008
  create_env = OSEnvironment(instance, inst_os, debug)
1009
  if reinstall:
1010
    create_env["INSTANCE_REINSTALL"] = "1"
1011

    
1012
  logfile = _InstanceLogName("add", instance.os, instance.name, None)
1013

    
1014
  result = utils.RunCmd([inst_os.create_script], env=create_env,
1015
                        cwd=inst_os.path, output=logfile, reset_env=True)
1016
  if result.failed:
1017
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
1018
                  " output: %s", result.cmd, result.fail_reason, logfile,
1019
                  result.output)
1020
    lines = [utils.SafeEncode(val)
1021
             for val in utils.TailFile(logfile, lines=20)]
1022
    _Fail("OS create script failed (%s), last lines in the"
1023
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1024

    
1025

    
1026
def RunRenameInstance(instance, old_name, debug):
1027
  """Run the OS rename script for an instance.
1028

1029
  @type instance: L{objects.Instance}
1030
  @param instance: Instance whose OS is to be installed
1031
  @type old_name: string
1032
  @param old_name: previous instance name
1033
  @type debug: integer
1034
  @param debug: debug level, passed to the OS scripts
1035
  @rtype: boolean
1036
  @return: the success of the operation
1037

1038
  """
1039
  inst_os = OSFromDisk(instance.os)
1040

    
1041
  rename_env = OSEnvironment(instance, inst_os, debug)
1042
  rename_env["OLD_INSTANCE_NAME"] = old_name
1043

    
1044
  logfile = _InstanceLogName("rename", instance.os,
1045
                             "%s-%s" % (old_name, instance.name), None)
1046

    
1047
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1048
                        cwd=inst_os.path, output=logfile, reset_env=True)
1049

    
1050
  if result.failed:
1051
    logging.error("os create command '%s' returned error: %s output: %s",
1052
                  result.cmd, result.fail_reason, result.output)
1053
    lines = [utils.SafeEncode(val)
1054
             for val in utils.TailFile(logfile, lines=20)]
1055
    _Fail("OS rename script failed (%s), last lines in the"
1056
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1057

    
1058

    
1059
def _GetBlockDevSymlinkPath(instance_name, idx):
1060
  return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
1061
                        (instance_name, constants.DISK_SEPARATOR, idx))
1062

    
1063

    
1064
def _SymlinkBlockDev(instance_name, device_path, idx):
1065
  """Set up symlinks to a instance's block device.
1066

1067
  This is an auxiliary function run when an instance is start (on the primary
1068
  node) or when an instance is migrated (on the target node).
1069

1070

1071
  @param instance_name: the name of the target instance
1072
  @param device_path: path of the physical block device, on the node
1073
  @param idx: the disk index
1074
  @return: absolute path to the disk's symlink
1075

1076
  """
1077
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1078
  try:
1079
    os.symlink(device_path, link_name)
1080
  except OSError, err:
1081
    if err.errno == errno.EEXIST:
1082
      if (not os.path.islink(link_name) or
1083
          os.readlink(link_name) != device_path):
1084
        os.remove(link_name)
1085
        os.symlink(device_path, link_name)
1086
    else:
1087
      raise
1088

    
1089
  return link_name
1090

    
1091

    
1092
def _RemoveBlockDevLinks(instance_name, disks):
1093
  """Remove the block device symlinks belonging to the given instance.
1094

1095
  """
1096
  for idx, _ in enumerate(disks):
1097
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1098
    if os.path.islink(link_name):
1099
      try:
1100
        os.remove(link_name)
1101
      except OSError:
1102
        logging.exception("Can't remove symlink '%s'", link_name)
1103

    
1104

    
1105
def _GatherAndLinkBlockDevs(instance):
1106
  """Set up an instance's block device(s).
1107

1108
  This is run on the primary node at instance startup. The block
1109
  devices must be already assembled.
1110

1111
  @type instance: L{objects.Instance}
1112
  @param instance: the instance whose disks we shoul assemble
1113
  @rtype: list
1114
  @return: list of (disk_object, device_path)
1115

1116
  """
1117
  block_devices = []
1118
  for idx, disk in enumerate(instance.disks):
1119
    device = _RecursiveFindBD(disk)
1120
    if device is None:
1121
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1122
                                    str(disk))
1123
    device.Open()
1124
    try:
1125
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1126
    except OSError, e:
1127
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1128
                                    e.strerror)
1129

    
1130
    block_devices.append((disk, link_name))
1131

    
1132
  return block_devices
1133

    
1134

    
1135
def StartInstance(instance, startup_paused):
1136
  """Start an instance.
1137

1138
  @type instance: L{objects.Instance}
1139
  @param instance: the instance object
1140
  @type startup_paused: bool
1141
  @param instance: pause instance at startup?
1142
  @rtype: None
1143

1144
  """
1145
  running_instances = GetInstanceList([instance.hypervisor])
1146

    
1147
  if instance.name in running_instances:
1148
    logging.info("Instance %s already running, not starting", instance.name)
1149
    return
1150

    
1151
  try:
1152
    block_devices = _GatherAndLinkBlockDevs(instance)
1153
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1154
    hyper.StartInstance(instance, block_devices, startup_paused)
1155
  except errors.BlockDeviceError, err:
1156
    _Fail("Block device error: %s", err, exc=True)
1157
  except errors.HypervisorError, err:
1158
    _RemoveBlockDevLinks(instance.name, instance.disks)
1159
    _Fail("Hypervisor error: %s", err, exc=True)
1160

    
1161

    
1162
def InstanceShutdown(instance, timeout):
1163
  """Shut an instance down.
1164

1165
  @note: this functions uses polling with a hardcoded timeout.
1166

1167
  @type instance: L{objects.Instance}
1168
  @param instance: the instance object
1169
  @type timeout: integer
1170
  @param timeout: maximum timeout for soft shutdown
1171
  @rtype: None
1172

1173
  """
1174
  hv_name = instance.hypervisor
1175
  hyper = hypervisor.GetHypervisor(hv_name)
1176
  iname = instance.name
1177

    
1178
  if instance.name not in hyper.ListInstances():
1179
    logging.info("Instance %s not running, doing nothing", iname)
1180
    return
1181

    
1182
  class _TryShutdown:
1183
    def __init__(self):
1184
      self.tried_once = False
1185

    
1186
    def __call__(self):
1187
      if iname not in hyper.ListInstances():
1188
        return
1189

    
1190
      try:
1191
        hyper.StopInstance(instance, retry=self.tried_once)
1192
      except errors.HypervisorError, err:
1193
        if iname not in hyper.ListInstances():
1194
          # if the instance is no longer existing, consider this a
1195
          # success and go to cleanup
1196
          return
1197

    
1198
        _Fail("Failed to stop instance %s: %s", iname, err)
1199

    
1200
      self.tried_once = True
1201

    
1202
      raise utils.RetryAgain()
1203

    
1204
  try:
1205
    utils.Retry(_TryShutdown(), 5, timeout)
1206
  except utils.RetryTimeout:
1207
    # the shutdown did not succeed
1208
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1209

    
1210
    try:
1211
      hyper.StopInstance(instance, force=True)
1212
    except errors.HypervisorError, err:
1213
      if iname in hyper.ListInstances():
1214
        # only raise an error if the instance still exists, otherwise
1215
        # the error could simply be "instance ... unknown"!
1216
        _Fail("Failed to force stop instance %s: %s", iname, err)
1217

    
1218
    time.sleep(1)
1219

    
1220
    if iname in hyper.ListInstances():
1221
      _Fail("Could not shutdown instance %s even by destroy", iname)
1222

    
1223
  try:
1224
    hyper.CleanupInstance(instance.name)
1225
  except errors.HypervisorError, err:
1226
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1227

    
1228
  _RemoveBlockDevLinks(iname, instance.disks)
1229

    
1230

    
1231
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1232
  """Reboot an instance.
1233

1234
  @type instance: L{objects.Instance}
1235
  @param instance: the instance object to reboot
1236
  @type reboot_type: str
1237
  @param reboot_type: the type of reboot, one the following
1238
    constants:
1239
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1240
        instance OS, do not recreate the VM
1241
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1242
        restart the VM (at the hypervisor level)
1243
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1244
        not accepted here, since that mode is handled differently, in
1245
        cmdlib, and translates into full stop and start of the
1246
        instance (instead of a call_instance_reboot RPC)
1247
  @type shutdown_timeout: integer
1248
  @param shutdown_timeout: maximum timeout for soft shutdown
1249
  @rtype: None
1250

1251
  """
1252
  running_instances = GetInstanceList([instance.hypervisor])
1253

    
1254
  if instance.name not in running_instances:
1255
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1256

    
1257
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1258
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1259
    try:
1260
      hyper.RebootInstance(instance)
1261
    except errors.HypervisorError, err:
1262
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1263
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1264
    try:
1265
      InstanceShutdown(instance, shutdown_timeout)
1266
      return StartInstance(instance, False)
1267
    except errors.HypervisorError, err:
1268
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1269
  else:
1270
    _Fail("Invalid reboot_type received: %s", reboot_type)
1271

    
1272

    
1273
def MigrationInfo(instance):
1274
  """Gather information about an instance to be migrated.
1275

1276
  @type instance: L{objects.Instance}
1277
  @param instance: the instance definition
1278

1279
  """
1280
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1281
  try:
1282
    info = hyper.MigrationInfo(instance)
1283
  except errors.HypervisorError, err:
1284
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1285
  return info
1286

    
1287

    
1288
def AcceptInstance(instance, info, target):
1289
  """Prepare the node to accept an instance.
1290

1291
  @type instance: L{objects.Instance}
1292
  @param instance: the instance definition
1293
  @type info: string/data (opaque)
1294
  @param info: migration information, from the source node
1295
  @type target: string
1296
  @param target: target host (usually ip), on this node
1297

1298
  """
1299
  # TODO: why is this required only for DTS_EXT_MIRROR?
1300
  if instance.disk_template in constants.DTS_EXT_MIRROR:
1301
    # Create the symlinks, as the disks are not active
1302
    # in any way
1303
    try:
1304
      _GatherAndLinkBlockDevs(instance)
1305
    except errors.BlockDeviceError, err:
1306
      _Fail("Block device error: %s", err, exc=True)
1307

    
1308
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1309
  try:
1310
    hyper.AcceptInstance(instance, info, target)
1311
  except errors.HypervisorError, err:
1312
    if instance.disk_template in constants.DTS_EXT_MIRROR:
1313
      _RemoveBlockDevLinks(instance.name, instance.disks)
1314
    _Fail("Failed to accept instance: %s", err, exc=True)
1315

    
1316

    
1317
def FinalizeMigrationDst(instance, info, success):
1318
  """Finalize any preparation to accept an instance.
1319

1320
  @type instance: L{objects.Instance}
1321
  @param instance: the instance definition
1322
  @type info: string/data (opaque)
1323
  @param info: migration information, from the source node
1324
  @type success: boolean
1325
  @param success: whether the migration was a success or a failure
1326

1327
  """
1328
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1329
  try:
1330
    hyper.FinalizeMigrationDst(instance, info, success)
1331
  except errors.HypervisorError, err:
1332
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1333

    
1334

    
1335
def MigrateInstance(instance, target, live):
1336
  """Migrates an instance to another node.
1337

1338
  @type instance: L{objects.Instance}
1339
  @param instance: the instance definition
1340
  @type target: string
1341
  @param target: the target node name
1342
  @type live: boolean
1343
  @param live: whether the migration should be done live or not (the
1344
      interpretation of this parameter is left to the hypervisor)
1345
  @raise RPCFail: if migration fails for some reason
1346

1347
  """
1348
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1349

    
1350
  try:
1351
    hyper.MigrateInstance(instance, target, live)
1352
  except errors.HypervisorError, err:
1353
    _Fail("Failed to migrate instance: %s", err, exc=True)
1354

    
1355

    
1356
def FinalizeMigrationSource(instance, success, live):
1357
  """Finalize the instance migration on the source node.
1358

1359
  @type instance: L{objects.Instance}
1360
  @param instance: the instance definition of the migrated instance
1361
  @type success: bool
1362
  @param success: whether the migration succeeded or not
1363
  @type live: bool
1364
  @param live: whether the user requested a live migration or not
1365
  @raise RPCFail: If the execution fails for some reason
1366

1367
  """
1368
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1369

    
1370
  try:
1371
    hyper.FinalizeMigrationSource(instance, success, live)
1372
  except Exception, err:  # pylint: disable=W0703
1373
    _Fail("Failed to finalize the migration on the source node: %s", err,
1374
          exc=True)
1375

    
1376

    
1377
def GetMigrationStatus(instance):
1378
  """Get the migration status
1379

1380
  @type instance: L{objects.Instance}
1381
  @param instance: the instance that is being migrated
1382
  @rtype: L{objects.MigrationStatus}
1383
  @return: the status of the current migration (one of
1384
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1385
           progress info that can be retrieved from the hypervisor
1386
  @raise RPCFail: If the migration status cannot be retrieved
1387

1388
  """
1389
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1390
  try:
1391
    return hyper.GetMigrationStatus(instance)
1392
  except Exception, err:  # pylint: disable=W0703
1393
    _Fail("Failed to get migration status: %s", err, exc=True)
1394

    
1395

    
1396
def BlockdevCreate(disk, size, owner, on_primary, info):
1397
  """Creates a block device for an instance.
1398

1399
  @type disk: L{objects.Disk}
1400
  @param disk: the object describing the disk we should create
1401
  @type size: int
1402
  @param size: the size of the physical underlying device, in MiB
1403
  @type owner: str
1404
  @param owner: the name of the instance for which disk is created,
1405
      used for device cache data
1406
  @type on_primary: boolean
1407
  @param on_primary:  indicates if it is the primary node or not
1408
  @type info: string
1409
  @param info: string that will be sent to the physical device
1410
      creation, used for example to set (LVM) tags on LVs
1411

1412
  @return: the new unique_id of the device (this can sometime be
1413
      computed only after creation), or None. On secondary nodes,
1414
      it's not required to return anything.
1415

1416
  """
1417
  # TODO: remove the obsolete "size" argument
1418
  # pylint: disable=W0613
1419
  clist = []
1420
  if disk.children:
1421
    for child in disk.children:
1422
      try:
1423
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1424
      except errors.BlockDeviceError, err:
1425
        _Fail("Can't assemble device %s: %s", child, err)
1426
      if on_primary or disk.AssembleOnSecondary():
1427
        # we need the children open in case the device itself has to
1428
        # be assembled
1429
        try:
1430
          # pylint: disable=E1103
1431
          crdev.Open()
1432
        except errors.BlockDeviceError, err:
1433
          _Fail("Can't make child '%s' read-write: %s", child, err)
1434
      clist.append(crdev)
1435

    
1436
  try:
1437
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1438
  except errors.BlockDeviceError, err:
1439
    _Fail("Can't create block device: %s", err)
1440

    
1441
  if on_primary or disk.AssembleOnSecondary():
1442
    try:
1443
      device.Assemble()
1444
    except errors.BlockDeviceError, err:
1445
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1446
    device.SetSyncSpeed(constants.SYNC_SPEED)
1447
    if on_primary or disk.OpenOnSecondary():
1448
      try:
1449
        device.Open(force=True)
1450
      except errors.BlockDeviceError, err:
1451
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1452
    DevCacheManager.UpdateCache(device.dev_path, owner,
1453
                                on_primary, disk.iv_name)
1454

    
1455
  device.SetInfo(info)
1456

    
1457
  return device.unique_id
1458

    
1459

    
1460
def _WipeDevice(path, offset, size):
1461
  """This function actually wipes the device.
1462

1463
  @param path: The path to the device to wipe
1464
  @param offset: The offset in MiB in the file
1465
  @param size: The size in MiB to write
1466

1467
  """
1468
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1469
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1470
         "count=%d" % size]
1471
  result = utils.RunCmd(cmd)
1472

    
1473
  if result.failed:
1474
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1475
          result.fail_reason, result.output)
1476

    
1477

    
1478
def BlockdevWipe(disk, offset, size):
1479
  """Wipes a block device.
1480

1481
  @type disk: L{objects.Disk}
1482
  @param disk: the disk object we want to wipe
1483
  @type offset: int
1484
  @param offset: The offset in MiB in the file
1485
  @type size: int
1486
  @param size: The size in MiB to write
1487

1488
  """
1489
  try:
1490
    rdev = _RecursiveFindBD(disk)
1491
  except errors.BlockDeviceError:
1492
    rdev = None
1493

    
1494
  if not rdev:
1495
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1496

    
1497
  # Do cross verify some of the parameters
1498
  if offset > rdev.size:
1499
    _Fail("Offset is bigger than device size")
1500
  if (offset + size) > rdev.size:
1501
    _Fail("The provided offset and size to wipe is bigger than device size")
1502

    
1503
  _WipeDevice(rdev.dev_path, offset, size)
1504

    
1505

    
1506
def BlockdevPauseResumeSync(disks, pause):
1507
  """Pause or resume the sync of the block device.
1508

1509
  @type disks: list of L{objects.Disk}
1510
  @param disks: the disks object we want to pause/resume
1511
  @type pause: bool
1512
  @param pause: Wheater to pause or resume
1513

1514
  """
1515
  success = []
1516
  for disk in disks:
1517
    try:
1518
      rdev = _RecursiveFindBD(disk)
1519
    except errors.BlockDeviceError:
1520
      rdev = None
1521

    
1522
    if not rdev:
1523
      success.append((False, ("Cannot change sync for device %s:"
1524
                              " device not found" % disk.iv_name)))
1525
      continue
1526

    
1527
    result = rdev.PauseResumeSync(pause)
1528

    
1529
    if result:
1530
      success.append((result, None))
1531
    else:
1532
      if pause:
1533
        msg = "Pause"
1534
      else:
1535
        msg = "Resume"
1536
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1537

    
1538
  return success
1539

    
1540

    
1541
def BlockdevRemove(disk):
1542
  """Remove a block device.
1543

1544
  @note: This is intended to be called recursively.
1545

1546
  @type disk: L{objects.Disk}
1547
  @param disk: the disk object we should remove
1548
  @rtype: boolean
1549
  @return: the success of the operation
1550

1551
  """
1552
  msgs = []
1553
  try:
1554
    rdev = _RecursiveFindBD(disk)
1555
  except errors.BlockDeviceError, err:
1556
    # probably can't attach
1557
    logging.info("Can't attach to device %s in remove", disk)
1558
    rdev = None
1559
  if rdev is not None:
1560
    r_path = rdev.dev_path
1561
    try:
1562
      rdev.Remove()
1563
    except errors.BlockDeviceError, err:
1564
      msgs.append(str(err))
1565
    if not msgs:
1566
      DevCacheManager.RemoveCache(r_path)
1567

    
1568
  if disk.children:
1569
    for child in disk.children:
1570
      try:
1571
        BlockdevRemove(child)
1572
      except RPCFail, err:
1573
        msgs.append(str(err))
1574

    
1575
  if msgs:
1576
    _Fail("; ".join(msgs))
1577

    
1578

    
1579
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


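# Illustrative note (not part of the original module): in the logic above,
# "mcn" ends up as the maximum number of children that may fail to assemble.
# For example (hypothetical values), a device with two children whose
# ChildrenNeeded() returns 1 tolerates one None in the children list before
# the activation error is re-raised, while a device whose ChildrenNeeded()
# returns -1 tolerates none.

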
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that we don't cache the
  children or such; as opposed to assemble, shutting down different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


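# Illustrative sketch (not part of the original module): the shell pipeline
# composed by BlockdevExport above looks roughly like the following (device
# path, destination node and size are hypothetical, and the actual remote
# half is the full ssh command built by _GetSshRunner().BuildCmd):
#
#   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576 count=1024 | \
#     ssh node2 'dd of=/tmp/target conv=nocreat,notrunc bs=65536 oflag=dsync'
#
# i.e. the local read side streams whole mebibytes while the remote write
# side uses a smaller, synchronous block size, as explained in the comments.

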
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, "errno"):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary; we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, _ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


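# Illustrative sketch (not part of the original module): the on-disk layout
# that _TryOSFromDisk above expects for an OS definition, using typical
# (assumed) values of the constants - OS_SCRIPTS covering create/import/
# export/rename/verify, OS_API_FILE being "ganeti_api_version", and the
# optional variants/parameters list files; the directory name is
# hypothetical:
#
#   /srv/ganeti/os/debootstrap/
#       ganeti_api_version    (required, one API version number per line)
#       create, import, export, rename, verify    (executable scripts)
#       variants.list         (optional, one variant name per line)
#       parameters.list       (optional, "name description" per line)

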
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS API 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  return result


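# Illustrative sketch (not part of the original module): for a hypothetical
# OS named "debootstrap+default" with os_params {"dhcp": "yes"} and debug=1,
# OSCoreEnv above would produce roughly the following dictionary (the exact
# API version depends on what the OS definition advertises):
#
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap", "DEBUG_LEVEL": "1",
#    "OS_VARIANT": "default", "OSP_DHCP": "yes"}

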
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS API 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


2334
def BlockdevGrow(disk, amount, dryrun):
2335
  """Grow a stack of block devices.
2336

2337
  This function is called recursively, with the childrens being the
2338
  first ones to resize.
2339

2340
  @type disk: L{objects.Disk}
2341
  @param disk: the disk to be grown
2342
  @type amount: integer
2343
  @param amount: the amount (in mebibytes) to grow with
2344
  @type dryrun: boolean
2345
  @param dryrun: whether to execute the operation in simulation mode
2346
      only, without actually increasing the size
2347
  @rtype: (status, result)
2348
  @return: a tuple with the status of the operation (True/False), and
2349
      the errors message if status is False
2350

2351
  """
2352
  r_dev = _RecursiveFindBD(disk)
2353
  if r_dev is None:
2354
    _Fail("Cannot find block device %s", disk)
2355

    
2356
  try:
2357
    r_dev.Grow(amount, dryrun)
2358
  except errors.BlockDeviceError, err:
2359
    _Fail("Failed to grow block device: %s", err, exc=True)
2360

    
2361

    
2362
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


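# Illustrative sketch (not part of the original module): the export
# configuration file written by FinalizeExport above (EXPORT_CONF_FILE) looks
# roughly like this; the section names follow the INISECT_* constants and all
# values below are hypothetical:
#
#   [export]
#   version = 0
#   timestamp = 1325376000
#   source = node1.example.com
#   os = debootstrap
#   compression = none
#
#   [instance]
#   name = instance1.example.com
#   memory = 512
#   vcpus = 1
#   disk_count = 1
#   ...

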
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form  (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the certificate directory, private key and
  certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, constants.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering,
      # more than 64-128k will be mostly ignored; we use nocreat to fail if
      # the device is not already there or we pass a wrong path; we use
      # notrunc to not attempt truncation on an LV device; we use
      # oflag=dsync to not buffer too much memory; this means that at best,
      # we flush every 64k, which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


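# Illustrative sketch (not part of the original module): for a raw-disk
# export, _GetImportExportIoCommand above returns something along the lines
# of the following (device path and size are hypothetical):
#
#   env      = None
#   prefix   = "dd if=/dev/drbd0 bs=1048576 count=2048 |"
#   suffix   = None
#   exp_size = 2048
#
# The import/export daemon then glues the prefix/suffix around its own
# transport command, which is why they are passed as shell fragments.

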
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
3051
                            ieio, ieioargs):
3052
  """Starts an import or export daemon.
3053

3054
  @param mode: Import/output mode
3055
  @type opts: L{objects.ImportExportOptions}
3056
  @param opts: Daemon options
3057
  @type host: string
3058
  @param host: Remote host for export (None for import)
3059
  @type port: int
3060
  @param port: Remote port for export (None for import)
3061
  @type instance: L{objects.Instance}
3062
  @param instance: Instance object
3063
  @type component: string
3064
  @param component: which part of the instance is transferred now,
3065
      e.g. 'disk/0'
3066
  @param ieio: Input/output type
3067
  @param ieioargs: Input/output arguments
3068

3069
  """
3070
  if mode == constants.IEM_IMPORT:
3071
    prefix = "import"
3072

    
3073
    if not (host is None and port is None):
3074
      _Fail("Can not specify host or port on import")
3075

    
3076
  elif mode == constants.IEM_EXPORT:
3077
    prefix = "export"
3078

    
3079
    if host is None or port is None:
3080
      _Fail("Host and port must be specified for an export")
3081

    
3082
  else:
3083
    _Fail("Invalid mode %r", mode)
3084

    
3085
  if (opts.key_name is None) ^ (opts.ca_pem is None):
3086
    _Fail("Cluster certificate can only be used for both key and CA")
3087

    
3088
  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3089
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3090

    
3091
  if opts.key_name is None:
3092
    # Use server.pem
3093
    key_path = constants.NODED_CERT_FILE
3094
    cert_path = constants.NODED_CERT_FILE
3095
    assert opts.ca_pem is None
3096
  else:
3097
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3098
                                                 opts.key_name)
3099
    assert opts.ca_pem is not None
3100

    
3101
  for i in [key_path, cert_path]:
3102
    if not os.path.exists(i):
3103
      _Fail("File '%s' does not exist" % i)
3104

    
3105
  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3106
  try:
3107
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3108
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3109
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3110

    
3111
    if opts.ca_pem is None:
3112
      # Use server.pem
3113
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
3114
    else:
3115
      ca = opts.ca_pem
3116

    
3117
    # Write CA file
3118
    utils.WriteFile(ca_file, data=ca, mode=0400)
3119

    
3120
    cmd = [
3121
      constants.IMPORT_EXPORT_DAEMON,
3122
      status_file, mode,
3123
      "--key=%s" % key_path,
3124
      "--cert=%s" % cert_path,
3125
      "--ca=%s" % ca_file,
3126
      ]
3127

    
3128
    if host:
3129
      cmd.append("--host=%s" % host)
3130

    
3131
    if port:
3132
      cmd.append("--port=%s" % port)
3133

    
3134
    if opts.ipv6:
3135
      cmd.append("--ipv6")
3136
    else:
3137
      cmd.append("--ipv4")
3138

    
3139
    if opts.compress:
3140
      cmd.append("--compress=%s" % opts.compress)
3141

    
3142
    if opts.magic:
3143
      cmd.append("--magic=%s" % opts.magic)
3144

    
3145
    if exp_size is not None:
3146
      cmd.append("--expected-size=%s" % exp_size)
3147

    
3148
    if cmd_prefix:
3149
      cmd.append("--cmd-prefix=%s" % cmd_prefix)
3150

    
3151
    if cmd_suffix:
3152
      cmd.append("--cmd-suffix=%s" % cmd_suffix)
3153

    
3154
    if mode == constants.IEM_EXPORT:
3155
      # Retry connection a few times when connecting to remote peer
3156
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3157
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3158
    elif opts.connect_timeout is not None:
3159
      assert mode == constants.IEM_IMPORT
3160
      # Overall timeout for establishing connection while listening
3161
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3162

    
3163
    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3164

    
3165
    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3166
    # support for receiving a file descriptor for output
3167
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3168
                      output=logfile)
3169

    
3170
    # The import/export name is simply the status directory name
3171
    return os.path.basename(status_dir)
3172

    
3173
  except Exception:
3174
    shutil.rmtree(status_dir, ignore_errors=True)
3175
    raise
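

# Illustrative call sequence (caller-side sketch only; the variable names
# are assumed, not part of this module): the master drives an import on a
# node roughly as
#   name = StartImportExportDaemon(constants.IEM_IMPORT, opts, None, None,
#                                  instance, "disk/0", ieio, ieioargs)
#   [status] = GetImportExportStatus([name])
#   ...
#   CleanupImportExport(name)
# with host/port set (and mode IEM_EXPORT) on the sending side, as enforced
# by the checks at the top of StartImportExportDaemon.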


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: list of dicts
  @return: a list with one entry per name: the deserialized status of that
      import/export, or None if its status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
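

# AbortImportExport above only delivers a SIGTERM to a daemon whose locked
# pid file can still be read; unconditional termination and removal of the
# status directory are left to CleanupImportExport below.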


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running, it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to the new master configuration and, if
  # needed, to primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
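

# DrbdWaitSync above returns a pair: "alldone" is True only if none of the
# devices is still resyncing, and "min_resync" is the lowest sync percentage
# reported by any device (100 when no device reported a percentage).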


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper that is currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
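

# PowercycleNode relies on the fork above: the parent returns the reply
# string immediately so the RPC can complete, while the child (or the
# original process if the fork failed) locks itself in memory, sleeps five
# seconds and then triggers the hypervisor-level powercycle.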


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS},
        L{constants.HKR_FAIL} or L{constants.HKR_SKIP}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
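

# Illustrative layout (the hook path name is assumed for the example): for
# an hpath of "instance-start", RunHooks executes every script found under
#   <hooks_base_dir>/instance-start-pre.d/   (phase "pre")
#   <hooks_base_dir>/instance-start-post.d/  (phase "post")
# via utils.RunParts with only the supplied environment, reporting each
# script as "<subdir>/<script>" together with its HKR_* status and output.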


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; failures are reported by
        raising an L{RPCFail} exception

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
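

# IAllocatorRunner.Run above invokes the allocator as a single command,
# roughly "<alloc_script> <temporary file containing idata>"; its stdout is
# returned verbatim and any failure (missing script or non-zero exit) is
# reported by raising RPCFail through _Fail.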


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
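
  # Worked example (path purely illustrative):
  #   _ConvertPath("/dev/xenvg/disk0") strips the "/dev/" prefix, replaces
  #   the remaining slashes with underscores and returns
  #   "<BDEV_CACHE_DIR>/bdev_xenvg_disk0".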

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)