lib/backend.py @ 91ae95fd
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import mcpu


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
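
# A line matched by this regex, as produced by the "lvs" call in
# GetVolumeList below (vg_name|lv_name|lv_size|lv_attr, with lv_attr
# being exactly six characters), looks for example like:
#
#   "  xenvg|test1|20.06|-wi-ao"
#
# (volume group and LV names are illustrative only).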


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
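
# Example invocations (sketches; "iname" and "err" stand for an instance
# name and a caught exception):
#
#   _Fail("Instance %s is not running", iname)
#   _Fail("Hypervisor error: %s", err, exc=True)  # also logs the traceback
#
# Passing log=False raises the RPCFail without logging it first.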


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
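
# The accepted payloads mirror what the RPC client sends; a sketch of the
# two supported encodings, built with the same zlib/base64 modules:
#
#   (constants.RPC_ENCODING_NONE, "plain payload")
#   (constants.RPC_ENCODING_ZLIB_BASE64,
#    base64.b64encode(zlib.compress("plain payload")))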


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)
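
# The returned tuple has the shape (values here are illustrative only;
# the fourth element is an address-family constant):
#
#   ("eth0", "192.0.2.1", "node1.example.com", <family>, "255.255.255.0")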


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
                            None, env_builder_fn, logging.warning,
                            cfg.GetClusterName(), cfg.GetMasterNode())

      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator


def _BuildMasterIpEnv():
  """Builds environment variables for master IP hooks.

  """
  master_netdev, master_ip, _, family, master_netmask = GetMasterInfo()
  version = str(netutils.IPAddress.GetVersionFromAddressFamily(family))
  env = {
    "MASTER_NETDEV": master_netdev,
    "MASTER_IP": master_ip,
    "MASTER_NETMASK": master_netmask,
    "CLUSTER_IP_VERSION": version,
  }

  return env


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_ip, master_netmask, master_netdev, family):
  """Activate the IP address of the master daemon.

  @param master_ip: the master IP
  @param master_netmask: the master IP netmask
  @param master_netdev: the master network device
  @param family: the IP family

  """
  err_msg = None
  if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if netutils.IPAddress.Own(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      err_msg = "Someone else has the master ip, not activating"
      logging.error(err_msg)
  else:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(family)

    result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                           "%s/%s" % (master_ip, master_netmask),
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      err_msg = "Can't activate master IP: %s" % result.output
      logging.error(err_msg)

    else:
      # we ignore the exit code of the following cmds
      if ipcls == netutils.IP4Address:
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif ipcls == netutils.IP6Address:
        try:
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
        except errors.OpExecError:
          # TODO: Better error reporting
          logging.warning("Can't execute ndisc6, please install if missing")

  if err_msg:
    _Fail(err_msg)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_ip, master_netmask, master_netdev, family):
  """Deactivate the master IP on this node.

  @param master_ip: the master IP
  @param master_netmask: the master IP netmask
  @param master_netdev: the master network device
  @param family: the IP family

  """
  # pylint: disable=W0613
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, master_netmask),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not change the master IP netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not change the master IP netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: the mode to operate in; either add or remove an entry
  @param host: the host to operate on
  @param ip: the IP associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CACERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total amount of ram in MiB
      - hv_version: the hypervisor version, if available

  """
  outputarray = {}

  if vgname is not None:
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
    vg_free = vg_size = None
    if vginfo:
      vg_free = int(round(vginfo[0][0], 0))
      vg_size = int(round(vginfo[0][1], 0))
    outputarray["vg_size"] = vg_size
    outputarray["vg_free"] = vg_free

  if hypervisor_type is not None:
    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      outputarray.update(hyp_info)

  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stating out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]
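
  # "lvs" renders the devices column as e.g. "/dev/sda1(0),/dev/sdb1(0)"
  # (physical volume plus starting extent); parse_dev drops the extent
  # suffix and handle_dev splits the list, so the example above yields
  # ["/dev/sda1", "/dev/sdb1"].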

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @raise RPCFail: if the instance is not running

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)
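
# A resulting path, assuming a default constants.LOG_OS_DIR of
# /var/log/ganeti/os and illustrative OS/instance names, looks like:
#
#   /var/log/ganeti/os/add-debootstrap-inst1.example.com-<timestamp>.log
#
# with <timestamp> as produced by utils.TimestampForFilename().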


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
                        (instance_name, constants.DISK_SEPARATOR, idx))


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
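
# Assuming a DISK_SEPARATOR of ":" and an illustrative DISK_LINKS_DIR,
# disk 0 of "inst1.example.com" would be linked as:
#
#   /var/run/ganeti/instance-disks/inst1.example.com:0 -> /dev/drbd0
#
# (both the directory and the device path are examples, not fixed values).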


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance, False)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
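
# The generated command is plain dd over the raw device; assuming a
# WIPE_BLOCK_SIZE of 1 MiB (so that seek/count, expressed in bs units,
# line up with the MiB-based offset/size arguments), it looks like:
#
#   dd if=/dev/zero seek=128 bs=1048576 oflag=direct of=/dev/xenvg/disk0 count=512
#
# where the device path, offset and size are illustrative only.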
1537

    
1538

    
1539
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe are bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block devices.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


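# Worked example for the "mcn" bookkeeping above (values illustrative): for
# a mirrored disk with two children where objects.Disk.ChildrenNeeded()
# returns 1, mcn becomes len(children) - 1 == 1, so a single child may fail
# to assemble (one None in the list) before the error is re-raised; a
# ChildrenNeeded() of -1 maps to mcn == 0, meaning every child is required
# and the first child failure propagates immediately.

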
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that we don't cache the
  children or the like since, as opposed to assembling, shutting down
  different devices doesn't require that the upper device be active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shut down
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


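# Rough shape of the pipeline assembled above (device path, node and
# destination are made-up examples; exact ssh quoting comes from
# _GetSshRunner and utils.ShellQuoteArgs):
#
#   set -e; set -o pipefail; \
#   dd if=/dev/xenvg/disk0.data bs=1048576 count=<disk.size> \
#   | ssh <dest_node> 'dd of=<dest_path> conv=nocreat,notrunc bs=65536 \
#                      oflag=dsync'
#
# The reading side uses 1 MiB blocks, so "count" is simply the disk size in
# mebibytes; the writing side uses the small synchronous blocks discussed
# in the comments above.

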
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, "errno"):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


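# Example of the lookup above (illustrative):
#
#   _ErrnoOrStr(IOError(errno.ENOENT, "No such file or directory"))
#   # -> "ENOENT", via errno.errorcode[errno.ENOENT]
#
# while an exception object without an errno attribute falls back to
# str(err).

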
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS files dictionary: we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, _ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  return result


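# Illustrative OSCoreEnv() result, assuming a hypothetical OS named
# "debootstrap+default" (variant "default"), os_params of {"dhcp": "yes"}
# and a negotiated API version of 20:
#
#   {
#     "OS_API_VERSION": "20",
#     "OS_NAME": "debootstrap",
#     "DEBUG_LEVEL": "0",
#     "OS_VARIANT": "default",
#     "OSP_DHCP": "yes",
#   }

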
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount, dryrun):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to be resized.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @rtype: None
  @raise RPCFail: in case the device cannot be found or the grow
      operation fails

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


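# Recursion sketch, following the branches above: for a DRBD8 disk backed
# by an LV, BlockdevSnapshot(drbd_disk) calls itself on
# drbd_disk.children[0], and the snapshot is finally taken on the logical
# volume via r_dev.Snapshot(disk.size); any other device type is rejected
# with an RPCFail.

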
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load, we can read NICs until one doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


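# Sketch of the export configuration written above, assuming
# constants.INISECT_EXP and constants.INISECT_INS expand to "export" and
# "instance" (all values made up):
#
#   [export]
#   version = 0
#   timestamp = 1325376000
#   source = node1.example.com
#   os = debootstrap
#   compression = none
#
#   [instance]
#   name = instance1.example.com
#   memory = 128
#   vcpus = 1
#   disk_template = drbd
#   ...

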
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: str
  @return: a serialized form of the config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the name we rename it to
  @rtype: None
  @raise RPCFail: if any of the renames fails

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir


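# Example (paths hypothetical): with a cluster file storage directory of
# /srv/ganeti/file-storage, a call such as
#
#   _TransformFileStorageDir("/srv/ganeti/file-storage/inst1")
#
# returns the normalized path, while a path like "/tmp/inst1", which is
# below neither base directory, makes the call fail with RPCFail via
# _Fail.

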
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: None
  @raise RPCFail: if the path exists but is not a directory, or if
      the directory cannot be created

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If it is not, log an error and fail.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: None
  @raise RPCFail: if the directory is not empty or cannot be removed

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: None
  @raise RPCFail: if the rename cannot be performed

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


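# Example of the prefix check above (assuming constants.QUEUE_DIR is
# "/var/lib/ganeti/queue"): a name like "/var/lib/ganeti/queue/job-42"
# passes, because os.path.commonprefix() of it and the queue directory is
# the queue directory itself, while "/etc/passwd" shares only "/" with it
# and is rejected.

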
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: None

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raise RPCFail: if any device cannot be found or cannot be made
      secondary; the error message will contain the details

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


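# Directory layout illustration (certificate name made up): for a
# certificate named "x509-20120101010101-abcd12", _GetX509Filenames()
# returns, in order, the certificate directory and the paths of the
# private key and certificate inside it:
#
#   (<cryptodir>/x509-20120101010101-abcd12,
#    <cryptodir>/x509-20120101010101-abcd12/key,
#    <cryptodir>/x509-20120101010101-abcd12/cert)
#
# "key" and "cert" being the _X509_KEY_FILE and _X509_CERT_FILE constants
# defined at the top of this module.

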
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2945
  """Creates a new X509 certificate for SSL/TLS.
2946

2947
  @type validity: int
2948
  @param validity: Validity in seconds
2949
  @rtype: tuple; (string, string)
2950
  @return: Certificate name and public part
2951

2952
  """
2953
  (key_pem, cert_pem) = \
2954
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2955
                                     min(validity, _MAX_SSL_CERT_VALIDITY))
2956

    
2957
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
2958
                              prefix="x509-%s-" % utils.TimestampForFilename())
2959
  try:
2960
    name = os.path.basename(cert_dir)
2961
    assert len(name) > 5
2962

    
2963
    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2964

    
2965
    utils.WriteFile(key_file, mode=0400, data=key_pem)
2966
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2967

    
2968
    # Never return private key as it shouldn't leave the node
2969
    return (name, cert_pem)
2970
  except Exception:
2971
    shutil.rmtree(cert_dir, ignore_errors=True)
2972
    raise
2973

    
2974

    
2975
def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2976
  """Removes a X509 certificate.
2977

2978
  @type name: string
2979
  @param name: Certificate name
2980

2981
  """
2982
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2983

    
2984
  utils.RemoveFile(key_file)
2985
  utils.RemoveFile(cert_file)
2986

    
2987
  try:
2988
    os.rmdir(cert_dir)
2989
  except EnvironmentError, err:
2990
    _Fail("Cannot remove certificate directory '%s': %s",
2991
          cert_dir, err)
2992

    
2993

    
2994
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2995
  """Returns the command for the requested input/output.
2996

2997
  @type instance: L{objects.Instance}
2998
  @param instance: The instance object
2999
  @param mode: Import/export mode
3000
  @param ieio: Input/output type
3001
  @param ieargs: Input/output arguments
3002

3003
  """
3004
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3005

    
3006
  env = None
3007
  prefix = None
3008
  suffix = None
3009
  exp_size = None
3010

    
3011
  if ieio == constants.IEIO_FILE:
3012
    (filename, ) = ieargs
3013

    
3014
    if not utils.IsNormAbsPath(filename):
3015
      _Fail("Path '%s' is not normalized or absolute", filename)
3016

    
3017
    real_filename = os.path.realpath(filename)
3018
    directory = os.path.dirname(real_filename)
3019

    
3020
    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
3021
      _Fail("File '%s' is not under exports directory '%s': %s",
3022
            filename, constants.EXPORT_DIR, real_filename)
3023

    
3024
    # Create directory
3025
    utils.Makedirs(directory, mode=0750)
3026

    
3027
    quoted_filename = utils.ShellQuote(filename)
3028

    
3029
    if mode == constants.IEM_IMPORT:
3030
      suffix = "> %s" % quoted_filename
3031
    elif mode == constants.IEM_EXPORT:
3032
      suffix = "< %s" % quoted_filename
3033

    
3034
      # Retrieve file size
3035
      try:
3036
        st = os.stat(filename)
3037
      except EnvironmentError, err:
3038
        logging.error("Can't stat(2) %s: %s", filename, err)
3039
      else:
3040
        exp_size = utils.BytesToMebibyte(st.st_size)
3041

    
3042
  elif ieio == constants.IEIO_RAW_DISK:
3043
    (disk, ) = ieargs
3044

    
3045
    real_disk = _OpenRealBD(disk)
3046

    
3047
    if mode == constants.IEM_IMPORT:
3048
      # we set here a smaller block size as, due to transport buffering, more
3049
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
3050
      # is not already there or we pass a wrong path; we use notrunc to no
3051
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3052
      # much memory; this means that at best, we flush every 64k, which will
3053
      # not be very fast
3054
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3055
                                    " bs=%s oflag=dsync"),
3056
                                    real_disk.dev_path,
3057
                                    str(64 * 1024))
3058

    
3059
    elif mode == constants.IEM_EXPORT:
3060
      # the block size on the read dd is 1MiB to match our units
3061
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3062
                                   real_disk.dev_path,
3063
                                   str(1024 * 1024), # 1 MB
3064
                                   str(disk.size))
3065
      exp_size = disk.size
3066

    
3067
  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Cannot specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

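  # XOR check: passing exactly one of key_name/ca_pem is invalid; a custom
  # X509 key must come with its own CA, and using the cluster certificate
  # (key_name is None) implies using the cluster CA as well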
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist", i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

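    # The daemon takes the status file and the mode as positional arguments;
    # everything else below is passed as options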
    cmd = [
      constants.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: a list with the state of each named import/export, or None
           where a status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

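    # a missing status file is not an error: the daemon may not have written
    # its status yet, in which case None is reported for this name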
    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
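    # waitpid=False: the daemon was detached via StartDaemon and is not a
    # child of this process, so there is no exit status to reap here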
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

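  # In multi-master mode (used during live migration) the instance may start
  # running on this node while the disks are attached, so the device
  # symlinks it relies on must already be in place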
  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

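  # min_resync tracks the lowest sync percentage seen across all devices;
  # it starts at 100 (fully synced) and can only decrease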
  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM and won't be swapped out
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
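  # give the "Reboot scheduled" RPC reply time to reach the caller before
  # the node actually goes down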
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

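    # e.g. hpath "instance-start" in the "post" phase maps to the
    # "instance-start-post.d" directory under the hooks base directory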
    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script on success; failures are
       reported by raising L{RPCFail} via L{_Fail}

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

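    # allocator protocol: the input data is written to a temporary file and
    # the file name is passed to the script as its single argument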
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name
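
    For example, C{/dev/md/mirror} maps to the cache file
    C{bdev_md_mirror} inside the cache directory.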

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)