lib/backend.py @ 57c7bc57

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import mcpu
from ganeti import compat


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
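# A line the regex above is meant to match, as produced by
# "lvs --noheadings --units=m --nosuffix --separator=|
#  -ovg_name,lv_name,lv_size,lv_attr" (illustrative sample):
#   "  xenvg|test1|20.06|-wi-ao"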


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
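
# Illustrative payload shapes handled by _Decompress:
#   (constants.RPC_ENCODING_NONE, "raw data")
#   (constants.RPC_ENCODING_ZLIB_BASE64,
#    base64.b64encode(zlib.compress("raw data")))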


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
                            None, env_fn, logging.warning, cfg.GetClusterName(),
                            cfg.GetMasterNode())

      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    L{RunLocalHooks} decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": master_params.netmask,
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script

  """
  # pylint: disable=W0613
  err_msg = None
  if netutils.TcpPing(master_params.ip, constants.DEFAULT_NODED_PORT):
    if netutils.IPAddress.Own(master_params.ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      err_msg = "Someone else has the master ip, not activating"
      logging.error(err_msg)
  else:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(master_params.ip_family)

    result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                           "%s/%s" % (master_params.ip, master_params.netmask),
                           "dev", master_params.netdev, "label",
                           "%s:0" % master_params.netdev])
    if result.failed:
      err_msg = "Can't activate master IP: %s" % result.output
      logging.error(err_msg)

    else:
      # we ignore the exit code of the following cmds
      if ipcls == netutils.IP4Address:
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_params.netdev,
                      "-s", master_params.ip, master_params.ip])
      elif ipcls == netutils.IP6Address:
        try:
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_params.ip,
                        master_params.netdev])
        except errors.OpExecError:
          # TODO: Better error reporting
          logging.warning("Can't execute ndisc6, please install if missing")

  if err_msg:
    _Fail(err_msg)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script

  """
  # pylint: disable=W0613
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_params.ip, master_params.netmask),
                         "dev", master_params.netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CACERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total amount of ram in MiB
      - hv_version: the hypervisor version, if available

  """
  outputarray = {}

  if vgname is not None:
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
    vg_free = vg_size = None
    if vginfo:
      vg_free = int(round(vginfo[0][0], 0))
      vg_size = int(round(vginfo[0][1], 0))
    outputarray["vg_size"] = vg_size
    outputarray["vg_free"] = vg_free

  if hypervisor_type is not None:
    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      outputarray.update(hyp_info)

  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not (os.path.exists(script) and os.access(script, os.X_OK))]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result


def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

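      # blockdev --getsize64 prints the device size in bytes; convert to MiB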
      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @raise RPCFail: if the instance is not running; missing disk
      symlinks are only logged as warnings

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)
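
# Illustrative example: _InstanceLogName("add", "debian-etch",
# "inst1.example.com", None) returns a path of the form
#   <LOG_OS_DIR>/add-debian-etch-inst1.example.com-<timestamp>.log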


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
                        (instance_name, constants.DISK_SEPARATOR, idx))
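
# Illustrative result (assuming constants.DISK_SEPARATOR is ":"):
#   _GetBlockDevSymlinkPath("inst1.example.com", 0)
#   -> <DISK_LINKS_DIR>/inst1.example.com:0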


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: whether to pause the instance at startup
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
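    # Poll _TryShutdown, which raises utils.RetryAgain while the instance
    # is still up, with a 5 second delay, until `timeout` expires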
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance, False)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
1561

    
1562

    
1563
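# Illustrative example (not executed): with a hypothetical LV path and
# assuming constants.WIPE_BLOCK_SIZE is 1 MiB, a call like
#   _WipeDevice("/dev/xenvg/disk0", 512, 1024)
# runs the equivalent of:
#   dd if=/dev/zero seek=512 bs=1048576 oflag=direct \
#      of=/dev/xenvg/disk0 count=1024
# i.e. it zeroes 1024 MiB starting 512 MiB into the device; dd's seek and
# count are expressed in block-size units, which is why the MiB offset and
# size line up with a 1 MiB block size.

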
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe are bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


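# Illustrative note (not executed): the "max number of Nones" logic above
# tolerates partial child failures. For example, with a hypothetical
# mirrored device that has 2 children and needs only 1 of them
# (ChildrenNeeded() == 1), mcn becomes 2 - 1 = 1: the first failing child
# is recorded as None and assembly continues, while a second failure
# (children already containing one None) re-raises the exception.

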
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such; as opposed to assemble, the shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise its current
           sync status

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use
  # oflag=dsync to not buffer too much memory; this means that at
  # best, we flush every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


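# Illustrative example (not executed): for a hypothetical 1024 MiB LV and
# destination node "node2", the combined pipeline built above looks
# roughly like this (ssh options elided):
#   set -e; set -o pipefail; \
#   dd if=/dev/xenvg/disk0 bs=1048576 count=1024 | \
#   ssh node2 'dd of=/tmp/disk0.img conv=nocreat,notrunc bs=65536 oflag=dsync'
# The reading dd uses 1 MiB blocks (our size unit); the writing dd uses
# 64 KiB blocks with dsync, trading speed for bounded buffering.

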
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, "errno"):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


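# Illustrative example (not executed):
#   >>> _ErrnoOrStr(IOError(errno.ENOENT, "No such file or directory"))
#   'ENOENT'
# while an exception without an errno attribute falls back to str(err).

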
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


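# Illustrative example (not executed): the API file is expected to hold
# one integer version per line. Assuming constants.OS_API_FILE names the
# usual "ganeti_api_version" file, a file containing:
#   15
#   20
# makes this function return (True, [15, 20]); a missing or non-numeric
# file returns (False, "<error message>") instead.

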
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or an error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, _ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


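# Illustrative example (not executed): a valid API 20 OS definition is a
# directory laid out roughly like the following (file names are those
# referenced by the constants above; the path is hypothetical):
#   /srv/ganeti/os/debootstrap/
#     ganeti_api_version       (e.g. containing "20")
#     create, export, import, rename, verify   (executable scripts)
#     parameters.list          (one "name help-text" pair per line)
#     variants.list            (optional, one variant per line)
# _TryOSFromDisk("debootstrap") would then return (True, <objects.OS>).

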
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  return result


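# Illustrative example (not executed): for a hypothetical API 20 OS
# "debootstrap+default" with os_params {"dhcp": "yes"}, the resulting
# environment is roughly:
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap",
#    "DEBUG_LEVEL": "0", "OS_VARIANT": "default", "OSP_DHCP": "yes"}
# Parameter names are uppercased and prefixed with OSP_ so that OS
# scripts can pick them up without clashing with the other variables.

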
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


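# Illustrative example (not executed): on top of the OSCoreEnv variables,
# a single-disk, single-NIC instance ends up with entries such as
# INSTANCE_NAME, HYPERVISOR, DISK_COUNT=1, DISK_0_PATH=/dev/...,
# DISK_0_ACCESS=rw, NIC_COUNT=1, NIC_0_MAC=aa:00:00:..., plus one
# INSTANCE_BE_*/INSTANCE_HV_* entry per backend/hypervisor parameter.
# Note that building DISK_%d_PATH goes through _OpenRealBD, so this
# function must run on a node where the disks are already assembled.

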
def BlockdevGrow(disk, amount, dryrun):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: tuple
  @return: the snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


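# Illustrative example (not executed): the resulting config file is a
# plain INI document; with hypothetical values it looks roughly like:
#   [export]
#   version = 0
#   timestamp = 1325376000
#   source = node1.example.com
#   os = debootstrap
#   compression = none
#   [instance]
#   name = inst1.example.com
#   memory = 512
#   vcpus = 1
#   ...
# The file is written into "<name>.new" first and only then moved over
# the final directory, so a half-written export never shadows a good one.

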
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: a serialized form of the config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir


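# Illustrative example (not executed): assuming the cluster-wide file
# storage directory is /srv/ganeti/file-storage,
#   _TransformFileStorageDir("/srv/ganeti/file-storage/inst1")
# returns the normalized path, while
#   _TransformFileStorageDir("/tmp/evil")
# raises RPCFail, since only paths below the configured bases are
# accepted; this is what keeps RPC callers from touching arbitrary
# directories on the node.

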
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


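# Illustrative example (not executed): assuming constants.QUEUE_DIR is
# /var/lib/ganeti/queue,
#   _EnsureJobQueueFile("/var/lib/ganeti/queue/job-123")  -> passes
#   _EnsureJobQueueFile("/etc/passwd")                    -> raises RPCFail
# since os.path.commonprefix of the two paths is not the queue directory.

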
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


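# Illustrative example (not executed): with the module-level constants
# above (_X509_KEY_FILE == "key", _X509_CERT_FILE == "cert"),
#   _GetX509Filenames("/var/run/ganeti/crypto", "x509-abc-")
# returns the (directory, key, cert) triple:
#   ("/var/run/ganeti/crypto/x509-abc-",
#    "/var/run/ganeti/crypto/x509-abc-/key",
#    "/var/run/ganeti/crypto/x509-abc-/cert")
# The cryptodir shown here is hypothetical; callers normally pass
# constants.CRYPTO_KEYS_DIR.

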
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, constants.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will be mostly ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc to
      # not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MiB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
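

# Illustrative note, not used by the code itself: the values returned by
# _GetImportExportIoCommand are combined by the import/export daemon into a
# single shell pipeline of the form "<prefix> <transport> <suffix>".  For a
# raw-disk export this is roughly
#
#   dd if=<dev_path> bs=1048576 count=<size> | <transport to remote peer>
#
# while a file-based import ends in "> <filename>".  "env" is only populated
# for OS-script transfers, and "exp_size" is a size in MiB,
# constants.IE_CUSTOM_SIZE or None when no estimate is available.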


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist", i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      constants.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise
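

# Illustrative sketch, not part of the node daemon's control flow: starting an
# import listener for an instance's first disk.  The option values below are
# example assumptions; in reality "opts" is an objects.ImportExportOptions
# built by the master from the opcode parameters.
def _ExampleStartDiskImport(instance):
  """Starts an import daemon listening for disk 0 of C{instance}.

  """
  opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                     compress=constants.IEC_GZIP,
                                     magic=None, ipv6=False,
                                     connect_timeout=60)
  # host and port must be None for an import, as the daemon only listens
  return StartImportExportDaemon(constants.IEM_IMPORT, opts, None, None,
                                 instance, "disk/0",
                                 constants.IEIO_RAW_DISK,
                                 (instance.disks[0], ))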


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if
           a status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result
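

# Illustrative sketch: polling a daemon started via StartImportExportDaemon
# until its status file reports an exit code.  The status dictionary layout
# (an "exit_status" key) is an assumption about the daemon's status file;
# only the None handling is guaranteed by GetImportExportStatus itself.
def _ExampleWaitForImportExport(name):
  """Polls the status of one import/export until it finishes.

  """
  while True:
    (status, ) = GetImportExportStatus([name])
    # None means the status file couldn't be read (yet)
    if status is not None and status.get("exit_status") is not None:
      return status
    time.sleep(5)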


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
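

# Illustrative sketch: the intended shutdown sequence for a daemon, as driven
# by the master.  The grace period is an arbitrary example value; the real
# caller waits on the status file instead of sleeping blindly.
def _ExampleStopImportExport(name):
  """Gracefully stops an import/export daemon, then cleans up.

  """
  AbortImportExport(name)   # SIGTERM, returns immediately
  time.sleep(2)             # give the daemon a chance to exit cleanly
  CleanupImportExport(name) # force-kills a leftover process and removes
                            # the status directory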


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds, increasing by a factor of 1.5
    # up to a maximum of 5 seconds between attempts, for at most two minutes
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
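

# Illustrative sketch: the order in which the master invokes the three DRBD
# RPCs above around a live migration (simplified; the real sequence in the
# master's logic also handles failures and runs partly on the other node).
def _ExampleDrbdMigrationSequence(nodes_ip, disks, instance_name):
  """Shows the disconnect/reconnect dance around a live migration.

  """
  # 1. drop the current network configuration on both nodes
  DrbdDisconnectNet(nodes_ip, disks)
  # 2. reconnect in dual-primary ("multimaster") mode for the migration
  DrbdAttachNet(nodes_ip, disks, instance_name, True)
  # 3. after the migration, wait until both sides are in sync again
  (alldone, min_resync) = DrbdWaitSync(nodes_ip, disks)
  return (alldone, min_resync)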


def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
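

# Illustrative sketch: how this class is driven on a node.  The hook path and
# environment below are example assumptions; real environments are built by
# HooksMaster on the master node.
def _ExampleRunPreHooks():
  """Runs the "pre" phase hooks for a hypothetical instance start.

  """
  runner = HooksRunner()
  # executes all scripts in <HOOKS_BASE_DIR>/instance-start-pre.d
  return runner.RunHooks("instance-start", constants.HOOKS_PHASE_PRE,
                         {"GANETI_HOOKS_VERSION": "2"})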


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; errors are reported by
       raising L{RPCFail} via L{_Fail}

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
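

# Illustrative sketch: invoking an allocator by hand.  "hail" is just an
# example script name; the request dictionary is heavily simplified compared
# to a real allocation request.
def _ExampleRunIAllocator():
  """Feeds a JSON request to an iallocator script and returns its output.

  """
  request = serializer.DumpJson({"cluster_name": "example-cluster"})
  return IAllocatorRunner.Run("hail", request)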


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
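
  # For example (derived directly from the code above; the base directory
  # comes from constants.BDEV_CACHE_DIR):
  #   "/dev/drbd0"       => <BDEV_CACHE_DIR>/bdev_drbd0
  #   "/dev/xenvg/disk0" => <BDEV_CACHE_DIR>/bdev_xenvg_disk0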
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)