Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 2d88fdd3

History | View | Annotate | Download (110.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50
import signal
51

    
52
from ganeti import errors
53
from ganeti import utils
54
from ganeti import ssh
55
from ganeti import hypervisor
56
from ganeti import constants
57
from ganeti import bdev
58
from ganeti import objects
59
from ganeti import ssconf
60
from ganeti import serializer
61
from ganeti import netutils
62
from ganeti import runtime
63
from ganeti import mcpu
64

    
65

    
66
# Kernel-provided token that changes on every reboot; read by GetNodeInfo
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# Whitelist of directories _CleanDirectory is allowed to operate on
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
# Seven days, in seconds (presumably the max validity of generated SSL
# certs -- confirm against the X509 helpers using it)
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
# Base file names for X509 key/cert material
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
# Base file names used by the import/export daemon status directories
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"
79

    
80
#: Valid LVS output line regex
81
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
82

    
83

    
84
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  Normally raised via the L{_Fail} helper, which also logs the message;
  the ganeti daemon handles it specially and turns it into a 'failed'
  RPC return value for the master.

  """
90

    
91

    
92
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception; used as a format string if
      positional arguments are given
  @keyword log: whether the error should be logged (defaults to True)
  @keyword exc: if True, log the current exception's traceback as well
  @raise RPCFail: always, carrying the formatted message

  """
  if args:
    msg = msg % args
  if kwargs.get("log", True):  # if we should log this error
    if kwargs.get("exc"):
      # include the traceback of the exception currently being handled
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
113

    
114

    
115
def _GetConfig():
  """Return a fresh L{ssconf.SimpleStore} instance.

  @rtype: L{ssconf.SimpleStore}
  @return: a newly created SimpleStore object

  """
  store = ssconf.SimpleStore()
  return store
123

    
124

    
125
def _GetSshRunner(cluster_name):
  """Build an L{ssh.SshRunner} for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
136

    
137

    
138
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  # the payload carries its own encoding tag; dispatch on it
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
156

    
157

    
158
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # safety net: refuse to clean anything outside the known set
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return

  # Normalize the exclusion list once, up front
  if exclude is None:
    excluded = []
  else:
    excluded = [os.path.normpath(entry) for entry in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in excluded:
      continue
    # only plain files are removed; symlinks and subdirectories are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
186

    
187

    
188
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  # cluster-wide files that are always acceptable
  static_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ]
  files = set(static_files)

  # each hypervisor may declare extra ancillary files of its own
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    files.update(hv_class.GetAncillaryFiles()[0])

  return frozenset(files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
215

    
216

    
217
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None
  @return: nothing; errors are reported via L{RPCFail} from
      L{_CleanDirectory}

  """
  # never remove the queue lock file itself while cleaning the live queue
  keep = [constants.JOB_QUEUE_LOCK_FILE]
  _CleanDirectory(constants.QUEUE_DIR, exclude=keep)
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
226

    
227

    
228
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_node, primary_ip_family,
    master_netmask), all read from the ssconf-backed configuration
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    # _Fail raises RPCFail, so the return below is only reached on success
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
      master_netmask)
251

    
252

    
253
def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      cfg = _GetConfig()
      hr = HooksRunner()
      # HooksMaster drives the pre/post phases using the local runner;
      # hook warnings are routed to logging.warning
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
                            None, env_builder_fn, logging.warning,
                            cfg.GetClusterName(), cfg.GetMasterNode())

      # a pre-hook failure aborts before fn is ever called
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
284

    
285

    
286
def _BuildMasterIpHookEnv():
  """Builds environment variables for master IP hooks.

  @rtype: dict
  @return: hook environment containing the master netdev and IP

  """
  cfg = _GetConfig()
  return {
    "MASTER_NETDEV": cfg.GetMasterNetdev(),
    "MASTER_IP": cfg.GetMasterIP(),
  }
297

    
298

    
299
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpHookEnv)
def ActivateMasterIp():
  """Activate the IP address of the master daemon.

  Brings up the master IP on this node's master netdev, unless it is
  already configured here; fails (via L{_Fail}) if another host
  answers on the master IP or the ip command fails.

  @raise RPCFail: if the IP is held elsewhere or cannot be configured

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family, master_netmask = GetMasterInfo()

  err_msg = None
  # first probe whether anything answers on the master IP at all
  if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if netutils.IPAddress.Own(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      err_msg = "Someone else has the master ip, not activating"
      logging.error(err_msg)
  else:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(family)

    result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                           "%s/%s" % (master_ip, master_netmask),
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      err_msg = "Can't activate master IP: %s" % result.output
      logging.error(err_msg)

    else:
      # we ignore the exit code of the following cmds
      # (best-effort neighbor advertisement of the new address)
      if ipcls == netutils.IP4Address:
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif ipcls == netutils.IP6Address:
        try:
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
        except errors.OpExecError:
          # TODO: Better error reporting
          logging.warning("Can't execute ndisc6, please install if missing")

  if err_msg:
    _Fail(err_msg)
341

    
342

    
343
def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    extra_args = "--no-voting --yes-do-it"
  else:
    extra_args = ""

  # daemon-util picks up the extra masterd arguments from the environment
  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"],
                        env={"EXTRA_MASTERD_ARGS": extra_args})
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)
369

    
370

    
371
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpHookEnv)
def DeactivateMasterIp():
  """Deactivate the master IP on this node.

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, _, master_netmask = GetMasterInfo()

  addr = "%s/%s" % (master_ip, master_netmask)
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del", addr,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure
389

    
390

    
391
def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
  if not result.failed:
    return

  # failure is only logged, never propagated to the caller
  logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                " and error %s",
                result.cmd, result.exit_code, result.output)
407

    
408

    
409
def ChangeMasterNetmask(netmask):
  """Change the netmask of the master IP.

  @param netmask: the new value of the netmask
  @raise RPCFail: if either step of the address change fails

  """
  master_netdev, master_ip, _, _, old_netmask = GetMasterInfo()
  if old_netmask == netmask:
    return

  # add the address under the new netmask first so the master IP is
  # never absent from the interface
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    # distinct message so add vs. del failures can be told apart
    _Fail("Could not bring down the master IP address with the old netmask")
430

    
431

    
432
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry
  @raise RPCFail: if the parameters are inconsistent with the mode, or
      the mode is unknown

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      # BUG FIX: the previous code merely instantiated RPCFail() without
      # raising it, so the check was a no-op and AddHostToEtcHosts ran
      # with an empty ip; _Fail logs and raises properly
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")
452

    
453

    
454
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean, whether to also remove the ganeti
      user's ssh keypair and authorized_keys entry

  """
  # wipe node-local state first (config data, crypto keys, job queue)
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      # best-effort: log and continue with the rest of the cleanup
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CACERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    # deliberately broad: secret removal is best-effort during teardown
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
498

    
499

    
500
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB
      - hv_version: the hypervisor version, if available

  """
  info = {}

  # volume group data, if requested
  if vgname is not None:
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
    vg_free = vg_size = None
    if vginfo:
      vg_free = int(round(vginfo[0][0], 0))
      vg_size = int(round(vginfo[0][1], 0))
    info["vg_size"] = vg_size
    info["vg_free"] = vg_free

  # hypervisor data, if requested
  if hypervisor_type is not None:
    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      info.update(hyp_info)

  # boot id lets callers detect reboots between queries
  info["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return info
538

    
539

    
540
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name, used by the ssh verification
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  # vm-capability gates all hypervisor/storage related checks below
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    # only validation *failures* are collected in the result list
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    # find our own primary/secondary IPs in the supplied list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                  source=source)

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    # each entry is None on success or an error string on failure
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stating out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    # report only the *missing* bridges
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result
739

    
740

    
741
def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    # silently skip anything outside /dev
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    # only real block devices are reported; others are skipped silently
    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      # blockdev reports bytes; convert to MiB
      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
777

    
778

    
779
def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  volumes = {}
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % "|",
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for raw_line in result.stdout.splitlines():
    stripped = raw_line.strip()
    match = _LVSLINE_REGEX.match(stripped)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", stripped)
      continue
    vg_name, lv_name, size, attr = match.groups()
    if attr[0] == "v":
      # virtual volumes don't really hold data, so don't report them
      # as existing
      continue
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    volumes["%s/%s" % (vg_name, lv_name)] = (size, inactive, online)

  return volumes
823

    
824

    
825
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  # thin RPC-visible wrapper; the actual work is in utils
  return utils.ListVolumeGroups()
834

    
835

    
836
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def _SplitDevices(devs_field):
    # each entry looks like "/dev/sdaN(start)"; keep only the device path
    return [entry.split("(")[0] for entry in devs_field.split(",")]

  volumes = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      fields = [field.strip() for field in line.split("|")]
      # one output record per physical device backing the LV
      for dev in _SplitDevices(fields[2]):
        volumes.append({"name": fields[0], "size": fields[1],
                        "dev": dev, "vg": fields[3]})
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return volumes
880

    
881

    
882
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the bridge names to check
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise
      (note: success is actually signalled by not raising; no explicit
      True is returned)
  @raise RPCFail: if any of the bridges is missing

  """
  # collect all missing bridges so the error names every one of them
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
896

    
897

    
898
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com
  @raise RPCFail: if any hypervisor fails to enumerate its instances

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      # a single failing hypervisor aborts the whole listing
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results
920

    
921

    
922
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys (empty dict if the
      instance is not found by the hypervisor):
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo is a (name, id, memory, vcpus, state, time) tuple
    output["memory"] = iinfo[2]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output
946

    
947

    
948
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant
  @raise RPCFail: if the instance is not running (via L{_Fail})

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # missing symlinks are only warned about, not treated as fatal
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
970

    
971

    
972
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
1013

    
1014

    
1015
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred
  @rtype: string
  @return: absolute path of the log file under C{constants.LOG_OS_DIR}

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    # components become part of the filename, so they must not contain
    # a path separator
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)
1041

    
1042

    
1043
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None
  @raise RPCFail: if the OS create script fails (via L{_Fail})

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # include the tail of the script's log in the failure message, so
    # the caller sees the actual error without access to this node
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1073

    
1074

    
1075
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation
  @raise RPCFail: if the OS rename script fails (via L{_Fail})

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    # FIX: this log message previously said "os create command"; this is
    # the rename script's error path
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1106

    
1107

    
1108
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for an instance's disk.

  @type instance_name: string
  @param instance_name: the name of the owning instance
  @type idx: int
  @param idx: the disk index
  @rtype: string
  @return: path under C{constants.DISK_LINKS_DIR}

  """
  return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
                        (instance_name, constants.DISK_SEPARATOR, idx))
1111

    
1112

    
1113
def _SymlinkBlockDev(instance_name, device_path, idx):
1114
  """Set up symlinks to a instance's block device.
1115

1116
  This is an auxiliary function run when an instance is start (on the primary
1117
  node) or when an instance is migrated (on the target node).
1118

1119

1120
  @param instance_name: the name of the target instance
1121
  @param device_path: path of the physical block device, on the node
1122
  @param idx: the disk index
1123
  @return: absolute path to the disk's symlink
1124

1125
  """
1126
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1127
  try:
1128
    os.symlink(device_path, link_name)
1129
  except OSError, err:
1130
    if err.errno == errno.EEXIST:
1131
      if (not os.path.islink(link_name) or
1132
          os.readlink(link_name) != device_path):
1133
        os.remove(link_name)
1134
        os.symlink(device_path, link_name)
1135
    else:
1136
      raise
1137

    
1138
  return link_name
1139

    
1140

    
1141
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @type instance_name: string
  @param instance_name: the name of the owning instance
  @param disks: the instance's disks; only the count is used to compute
      the link names
  @rtype: None

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort removal: log and continue with the other links
        logging.exception("Can't remove symlink '%s'", link_name)
1152

    
1153

    
1154
def _GatherAndLinkBlockDevs(instance):
1155
  """Set up an instance's block device(s).
1156

1157
  This is run on the primary node at instance startup. The block
1158
  devices must be already assembled.
1159

1160
  @type instance: L{objects.Instance}
1161
  @param instance: the instance whose disks we shoul assemble
1162
  @rtype: list
1163
  @return: list of (disk_object, device_path)
1164

1165
  """
1166
  block_devices = []
1167
  for idx, disk in enumerate(instance.disks):
1168
    device = _RecursiveFindBD(disk)
1169
    if device is None:
1170
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1171
                                    str(disk))
1172
    device.Open()
1173
    try:
1174
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1175
    except OSError, e:
1176
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1177
                                    e.strerror)
1178

    
1179
    block_devices.append((disk, link_name))
1180

    
1181
  return block_devices
1182

    
1183

    
1184
def StartInstance(instance, startup_paused):
1185
  """Start an instance.
1186

1187
  @type instance: L{objects.Instance}
1188
  @param instance: the instance object
1189
  @type startup_paused: bool
1190
  @param instance: pause instance at startup?
1191
  @rtype: None
1192

1193
  """
1194
  running_instances = GetInstanceList([instance.hypervisor])
1195

    
1196
  if instance.name in running_instances:
1197
    logging.info("Instance %s already running, not starting", instance.name)
1198
    return
1199

    
1200
  try:
1201
    block_devices = _GatherAndLinkBlockDevs(instance)
1202
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1203
    hyper.StartInstance(instance, block_devices, startup_paused)
1204
  except errors.BlockDeviceError, err:
1205
    _Fail("Block device error: %s", err, exc=True)
1206
  except errors.HypervisorError, err:
1207
    _RemoveBlockDevLinks(instance.name, instance.disks)
1208
    _Fail("Hypervisor error: %s", err, exc=True)
1209

    
1210

    
1211
def InstanceShutdown(instance, timeout):
1212
  """Shut an instance down.
1213

1214
  @note: this functions uses polling with a hardcoded timeout.
1215

1216
  @type instance: L{objects.Instance}
1217
  @param instance: the instance object
1218
  @type timeout: integer
1219
  @param timeout: maximum timeout for soft shutdown
1220
  @rtype: None
1221

1222
  """
1223
  hv_name = instance.hypervisor
1224
  hyper = hypervisor.GetHypervisor(hv_name)
1225
  iname = instance.name
1226

    
1227
  if instance.name not in hyper.ListInstances():
1228
    logging.info("Instance %s not running, doing nothing", iname)
1229
    return
1230

    
1231
  class _TryShutdown:
1232
    def __init__(self):
1233
      self.tried_once = False
1234

    
1235
    def __call__(self):
1236
      if iname not in hyper.ListInstances():
1237
        return
1238

    
1239
      try:
1240
        hyper.StopInstance(instance, retry=self.tried_once)
1241
      except errors.HypervisorError, err:
1242
        if iname not in hyper.ListInstances():
1243
          # if the instance is no longer existing, consider this a
1244
          # success and go to cleanup
1245
          return
1246

    
1247
        _Fail("Failed to stop instance %s: %s", iname, err)
1248

    
1249
      self.tried_once = True
1250

    
1251
      raise utils.RetryAgain()
1252

    
1253
  try:
1254
    utils.Retry(_TryShutdown(), 5, timeout)
1255
  except utils.RetryTimeout:
1256
    # the shutdown did not succeed
1257
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1258

    
1259
    try:
1260
      hyper.StopInstance(instance, force=True)
1261
    except errors.HypervisorError, err:
1262
      if iname in hyper.ListInstances():
1263
        # only raise an error if the instance still exists, otherwise
1264
        # the error could simply be "instance ... unknown"!
1265
        _Fail("Failed to force stop instance %s: %s", iname, err)
1266

    
1267
    time.sleep(1)
1268

    
1269
    if iname in hyper.ListInstances():
1270
      _Fail("Could not shutdown instance %s even by destroy", iname)
1271

    
1272
  try:
1273
    hyper.CleanupInstance(instance.name)
1274
  except errors.HypervisorError, err:
1275
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1276

    
1277
  _RemoveBlockDevLinks(iname, instance.disks)
1278

    
1279

    
1280
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1281
  """Reboot an instance.
1282

1283
  @type instance: L{objects.Instance}
1284
  @param instance: the instance object to reboot
1285
  @type reboot_type: str
1286
  @param reboot_type: the type of reboot, one the following
1287
    constants:
1288
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1289
        instance OS, do not recreate the VM
1290
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1291
        restart the VM (at the hypervisor level)
1292
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1293
        not accepted here, since that mode is handled differently, in
1294
        cmdlib, and translates into full stop and start of the
1295
        instance (instead of a call_instance_reboot RPC)
1296
  @type shutdown_timeout: integer
1297
  @param shutdown_timeout: maximum timeout for soft shutdown
1298
  @rtype: None
1299

1300
  """
1301
  running_instances = GetInstanceList([instance.hypervisor])
1302

    
1303
  if instance.name not in running_instances:
1304
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1305

    
1306
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1307
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1308
    try:
1309
      hyper.RebootInstance(instance)
1310
    except errors.HypervisorError, err:
1311
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1312
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1313
    try:
1314
      InstanceShutdown(instance, shutdown_timeout)
1315
      return StartInstance(instance, False)
1316
    except errors.HypervisorError, err:
1317
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1318
  else:
1319
    _Fail("Invalid reboot_type received: %s", reboot_type)
1320

    
1321

    
1322
def MigrationInfo(instance):
1323
  """Gather information about an instance to be migrated.
1324

1325
  @type instance: L{objects.Instance}
1326
  @param instance: the instance definition
1327

1328
  """
1329
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1330
  try:
1331
    info = hyper.MigrationInfo(instance)
1332
  except errors.HypervisorError, err:
1333
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1334
  return info
1335

    
1336

    
1337
def AcceptInstance(instance, info, target):
1338
  """Prepare the node to accept an instance.
1339

1340
  @type instance: L{objects.Instance}
1341
  @param instance: the instance definition
1342
  @type info: string/data (opaque)
1343
  @param info: migration information, from the source node
1344
  @type target: string
1345
  @param target: target host (usually ip), on this node
1346

1347
  """
1348
  # TODO: why is this required only for DTS_EXT_MIRROR?
1349
  if instance.disk_template in constants.DTS_EXT_MIRROR:
1350
    # Create the symlinks, as the disks are not active
1351
    # in any way
1352
    try:
1353
      _GatherAndLinkBlockDevs(instance)
1354
    except errors.BlockDeviceError, err:
1355
      _Fail("Block device error: %s", err, exc=True)
1356

    
1357
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1358
  try:
1359
    hyper.AcceptInstance(instance, info, target)
1360
  except errors.HypervisorError, err:
1361
    if instance.disk_template in constants.DTS_EXT_MIRROR:
1362
      _RemoveBlockDevLinks(instance.name, instance.disks)
1363
    _Fail("Failed to accept instance: %s", err, exc=True)
1364

    
1365

    
1366
def FinalizeMigrationDst(instance, info, success):
1367
  """Finalize any preparation to accept an instance.
1368

1369
  @type instance: L{objects.Instance}
1370
  @param instance: the instance definition
1371
  @type info: string/data (opaque)
1372
  @param info: migration information, from the source node
1373
  @type success: boolean
1374
  @param success: whether the migration was a success or a failure
1375

1376
  """
1377
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1378
  try:
1379
    hyper.FinalizeMigrationDst(instance, info, success)
1380
  except errors.HypervisorError, err:
1381
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1382

    
1383

    
1384
def MigrateInstance(instance, target, live):
1385
  """Migrates an instance to another node.
1386

1387
  @type instance: L{objects.Instance}
1388
  @param instance: the instance definition
1389
  @type target: string
1390
  @param target: the target node name
1391
  @type live: boolean
1392
  @param live: whether the migration should be done live or not (the
1393
      interpretation of this parameter is left to the hypervisor)
1394
  @raise RPCFail: if migration fails for some reason
1395

1396
  """
1397
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1398

    
1399
  try:
1400
    hyper.MigrateInstance(instance, target, live)
1401
  except errors.HypervisorError, err:
1402
    _Fail("Failed to migrate instance: %s", err, exc=True)
1403

    
1404

    
1405
def FinalizeMigrationSource(instance, success, live):
1406
  """Finalize the instance migration on the source node.
1407

1408
  @type instance: L{objects.Instance}
1409
  @param instance: the instance definition of the migrated instance
1410
  @type success: bool
1411
  @param success: whether the migration succeeded or not
1412
  @type live: bool
1413
  @param live: whether the user requested a live migration or not
1414
  @raise RPCFail: If the execution fails for some reason
1415

1416
  """
1417
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1418

    
1419
  try:
1420
    hyper.FinalizeMigrationSource(instance, success, live)
1421
  except Exception, err:  # pylint: disable=W0703
1422
    _Fail("Failed to finalize the migration on the source node: %s", err,
1423
          exc=True)
1424

    
1425

    
1426
def GetMigrationStatus(instance):
1427
  """Get the migration status
1428

1429
  @type instance: L{objects.Instance}
1430
  @param instance: the instance that is being migrated
1431
  @rtype: L{objects.MigrationStatus}
1432
  @return: the status of the current migration (one of
1433
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1434
           progress info that can be retrieved from the hypervisor
1435
  @raise RPCFail: If the migration status cannot be retrieved
1436

1437
  """
1438
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1439
  try:
1440
    return hyper.GetMigrationStatus(instance)
1441
  except Exception, err:  # pylint: disable=W0703
1442
    _Fail("Failed to get migration status: %s", err, exc=True)
1443

    
1444

    
1445
def BlockdevCreate(disk, size, owner, on_primary, info):
1446
  """Creates a block device for an instance.
1447

1448
  @type disk: L{objects.Disk}
1449
  @param disk: the object describing the disk we should create
1450
  @type size: int
1451
  @param size: the size of the physical underlying device, in MiB
1452
  @type owner: str
1453
  @param owner: the name of the instance for which disk is created,
1454
      used for device cache data
1455
  @type on_primary: boolean
1456
  @param on_primary:  indicates if it is the primary node or not
1457
  @type info: string
1458
  @param info: string that will be sent to the physical device
1459
      creation, used for example to set (LVM) tags on LVs
1460

1461
  @return: the new unique_id of the device (this can sometime be
1462
      computed only after creation), or None. On secondary nodes,
1463
      it's not required to return anything.
1464

1465
  """
1466
  # TODO: remove the obsolete "size" argument
1467
  # pylint: disable=W0613
1468
  clist = []
1469
  if disk.children:
1470
    for child in disk.children:
1471
      try:
1472
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1473
      except errors.BlockDeviceError, err:
1474
        _Fail("Can't assemble device %s: %s", child, err)
1475
      if on_primary or disk.AssembleOnSecondary():
1476
        # we need the children open in case the device itself has to
1477
        # be assembled
1478
        try:
1479
          # pylint: disable=E1103
1480
          crdev.Open()
1481
        except errors.BlockDeviceError, err:
1482
          _Fail("Can't make child '%s' read-write: %s", child, err)
1483
      clist.append(crdev)
1484

    
1485
  try:
1486
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1487
  except errors.BlockDeviceError, err:
1488
    _Fail("Can't create block device: %s", err)
1489

    
1490
  if on_primary or disk.AssembleOnSecondary():
1491
    try:
1492
      device.Assemble()
1493
    except errors.BlockDeviceError, err:
1494
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1495
    device.SetSyncSpeed(constants.SYNC_SPEED)
1496
    if on_primary or disk.OpenOnSecondary():
1497
      try:
1498
        device.Open(force=True)
1499
      except errors.BlockDeviceError, err:
1500
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1501
    DevCacheManager.UpdateCache(device.dev_path, owner,
1502
                                on_primary, disk.iv_name)
1503

    
1504
  device.SetInfo(info)
1505

    
1506
  return device.unique_id
1507

    
1508

    
1509
def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  Runs C{dd} to overwrite the given region with zeroes, using direct
  I/O to bypass the page cache.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write
  @raise RPCFail: if the wipe command fails (via L{_Fail})

  """
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
1525

    
1526

    
1527
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write
  @raise RPCFail: if the device is not found or the parameters are
      out of range (via L{_Fail})

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Do cross verify some of the parameters
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)
1553

    
1554

    
1555
def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume
  @rtype: list
  @return: per-disk (success, error message or None) tuples

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success
1588

    
1589

    
1590
def BlockdevRemove(disk):
1591
  """Remove a block device.
1592

1593
  @note: This is intended to be called recursively.
1594

1595
  @type disk: L{objects.Disk}
1596
  @param disk: the disk object we should remove
1597
  @rtype: boolean
1598
  @return: the success of the operation
1599

1600
  """
1601
  msgs = []
1602
  try:
1603
    rdev = _RecursiveFindBD(disk)
1604
  except errors.BlockDeviceError, err:
1605
    # probably can't attach
1606
    logging.info("Can't attach to device %s in remove", disk)
1607
    rdev = None
1608
  if rdev is not None:
1609
    r_path = rdev.dev_path
1610
    try:
1611
      rdev.Remove()
1612
    except errors.BlockDeviceError, err:
1613
      msgs.append(str(err))
1614
    if not msgs:
1615
      DevCacheManager.RemoveCache(r_path)
1616

    
1617
  if disk.children:
1618
    for child in disk.children:
1619
      try:
1620
        BlockdevRemove(child)
1621
      except RPCFail, err:
1622
        msgs.append(str(err))
1623

    
1624
  if msgs:
1625
    _Fail("; ".join(msgs))
1626

    
1627

    
1628
def _RecursiveAssembleBD(disk, owner, as_primary):
1629
  """Activate a block device for an instance.
1630

1631
  This is run on the primary and secondary nodes for an instance.
1632

1633
  @note: this function is called recursively.
1634

1635
  @type disk: L{objects.Disk}
1636
  @param disk: the disk we try to assemble
1637
  @type owner: str
1638
  @param owner: the name of the instance which owns the disk
1639
  @type as_primary: boolean
1640
  @param as_primary: if we should make the block device
1641
      read/write
1642

1643
  @return: the assembled device or None (in case no device
1644
      was assembled)
1645
  @raise errors.BlockDeviceError: in case there is an error
1646
      during the activation of the children or the device
1647
      itself
1648

1649
  """
1650
  children = []
1651
  if disk.children:
1652
    mcn = disk.ChildrenNeeded()
1653
    if mcn == -1:
1654
      mcn = 0 # max number of Nones allowed
1655
    else:
1656
      mcn = len(disk.children) - mcn # max number of Nones
1657
    for chld_disk in disk.children:
1658
      try:
1659
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1660
      except errors.BlockDeviceError, err:
1661
        if children.count(None) >= mcn:
1662
          raise
1663
        cdev = None
1664
        logging.error("Error in child activation (but continuing): %s",
1665
                      str(err))
1666
      children.append(cdev)
1667

    
1668
  if as_primary or disk.AssembleOnSecondary():
1669
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1670
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1671
    result = r_dev
1672
    if as_primary or disk.OpenOnSecondary():
1673
      r_dev.Open()
1674
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1675
                                as_primary, disk.iv_name)
1676

    
1677
  else:
1678
    result = True
1679
  return result
1680

    
1681

    
1682
def BlockdevAssemble(disk, owner, as_primary, idx):
1683
  """Activate a block device for an instance.
1684

1685
  This is a wrapper over _RecursiveAssembleBD.
1686

1687
  @rtype: str or boolean
1688
  @return: a C{/dev/...} path for primary nodes, and
1689
      C{True} for secondary nodes
1690

1691
  """
1692
  try:
1693
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1694
    if isinstance(result, bdev.BlockDev):
1695
      # pylint: disable=E1103
1696
      result = result.dev_path
1697
      if as_primary:
1698
        _SymlinkBlockDev(owner, result, idx)
1699
  except errors.BlockDeviceError, err:
1700
    _Fail("Error while assembling disk: %s", err, exc=True)
1701
  except OSError, err:
1702
    _Fail("Error while symlinking disk: %s", err, exc=True)
1703

    
1704
  return result
1705

    
1706

    
1707
def BlockdevShutdown(disk):
1708
  """Shut down a block device.
1709

1710
  First, if the device is assembled (Attach() is successful), then
1711
  the device is shutdown. Then the children of the device are
1712
  shutdown.
1713

1714
  This function is called recursively. Note that we don't cache the
1715
  children or such, as oppossed to assemble, shutdown of different
1716
  devices doesn't require that the upper device was active.
1717

1718
  @type disk: L{objects.Disk}
1719
  @param disk: the description of the disk we should
1720
      shutdown
1721
  @rtype: None
1722

1723
  """
1724
  msgs = []
1725
  r_dev = _RecursiveFindBD(disk)
1726
  if r_dev is not None:
1727
    r_path = r_dev.dev_path
1728
    try:
1729
      r_dev.Shutdown()
1730
      DevCacheManager.RemoveCache(r_path)
1731
    except errors.BlockDeviceError, err:
1732
      msgs.append(str(err))
1733

    
1734
  if disk.children:
1735
    for child in disk.children:
1736
      try:
1737
        BlockdevShutdown(child)
1738
      except RPCFail, err:
1739
        msgs.append(str(err))
1740

    
1741
  if msgs:
1742
    _Fail("; ".join(msgs))
1743

    
1744

    
1745
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None
  @raise RPCFail: if the parent or any child device cannot be found

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
1762

    
1763

    
1764
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None
  @raise RPCFail: if the parent or a child device cannot be resolved

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path: resolve the device dynamically
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
1791

    
1792

    
1793
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  result = []
  for disk in disks:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      _Fail("Can't find device %s", disk)
    result.append(dev.CombinedSyncStatus())
  return result
1813

    
1814

    
1815
def BlockdevGetmirrorstatusMulti(disks):
1816
  """Get the mirroring status of a list of devices.
1817

1818
  @type disks: list of L{objects.Disk}
1819
  @param disks: the list of disks which we should query
1820
  @rtype: disk
1821
  @return: List of tuples, (bool, status), one for each disk; bool denotes
1822
    success/failure, status is L{objects.BlockDevStatus} on success, string
1823
    otherwise
1824

1825
  """
1826
  result = []
1827
  for disk in disks:
1828
    try:
1829
      rbd = _RecursiveFindBD(disk)
1830
      if rbd is None:
1831
        result.append((False, "Can't find device %s" % disk))
1832
        continue
1833

    
1834
      status = rbd.CombinedSyncStatus()
1835
    except errors.BlockDeviceError, err:
1836
      logging.exception("Error while getting disk status")
1837
      result.append((False, str(err)))
1838
    else:
1839
      result.append((True, status))
1840

    
1841
  assert len(disks) == len(result)
1842

    
1843
  return result
1844

    
1845

    
1846
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # resolve the children first, bottom-up
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1864

    
1865

    
1866
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    _Fail("Block device '%s' is not set up", disk)

  rbd.Open()

  return rbd
1880

    
1881

    
1882
def BlockdevFind(disk):
1883
  """Check if a device is activated.
1884

1885
  If it is, return information about the real device.
1886

1887
  @type disk: L{objects.Disk}
1888
  @param disk: the disk to find
1889
  @rtype: None or objects.BlockDevStatus
1890
  @return: None if the disk cannot be found, otherwise a the current
1891
           information
1892

1893
  """
1894
  try:
1895
    rbd = _RecursiveFindBD(disk)
1896
  except errors.BlockDeviceError, err:
1897
    _Fail("Failed to find device: %s", err, exc=True)
1898

    
1899
  if rbd is None:
1900
    return None
1901

    
1902
  return rbd.GetSyncStatus()
1903

    
1904

    
1905
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  sizes = []
  for disk in disks:
    try:
      found = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      # lookup failures are mapped to None, same as "not found"
      found = None
    if found is None:
      sizes.append(None)
    else:
      sizes.append(found.GetActualSize())
  return sizes
1929

    
1930

    
1931
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  Streams the device contents via a local dd piped into an SSH-wrapped
  dd on the destination node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  # wrap the receiving dd into an ssh call to the destination node
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  # bash is needed for "set -o pipefail" above
  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
1973

    
1974

    
1975
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  # refuse anything that is not an absolute path to a whitelisted file
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  payload = _Decompress(data)

  if not isinstance(uid, basestring) or not isinstance(gid, basestring):
    _Fail("Invalid username/groupname type")

  # translate the symbolic owner/group into numeric ids
  ents = runtime.GetEnts()
  owner_id = ents.LookupUser(uid)
  group_id = ents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=payload, mode=mode, uid=owner_id, gid=group_id,
                      atime=atime, mtime=mtime)
2017

    
2018

    
2019
def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if not result.failed:
    return result.stdout

  _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
        result.fail_reason, result.output)
2038

    
2039

    
2040
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
2047

    
2048

    
2049
def _ErrnoOrStr(err):
2050
  """Format an EnvironmentError exception.
2051

2052
  If the L{err} argument has an errno attribute, it will be looked up
2053
  and converted into a textual C{E...} description. Otherwise the
2054
  string representation of the error will be returned.
2055

2056
  @type err: L{EnvironmentError}
2057
  @param err: the exception to format
2058

2059
  """
2060
  if hasattr(err, "errno"):
2061
    detail = errno.errorcode[err.errno]
2062
  else:
2063
    detail = str(err)
2064
  return detail
2065

    
2066

    
2067
def _OSOndiskAPIVersion(os_dir):
2068
  """Compute and return the API version of a given OS.
2069

2070
  This function will try to read the API version of the OS residing in
2071
  the 'os_dir' directory.
2072

2073
  @type os_dir: str
2074
  @param os_dir: the directory in which we should look for the OS
2075
  @rtype: tuple
2076
  @return: tuple (status, data) with status denoting the validity and
2077
      data holding either the vaid versions or an error message
2078

2079
  """
2080
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2081

    
2082
  try:
2083
    st = os.stat(api_file)
2084
  except EnvironmentError, err:
2085
    return False, ("Required file '%s' not found under path %s: %s" %
2086
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
2087

    
2088
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2089
    return False, ("File '%s' in %s is not a regular file" %
2090
                   (constants.OS_API_FILE, os_dir))
2091

    
2092
  try:
2093
    api_versions = utils.ReadFile(api_file).splitlines()
2094
  except EnvironmentError, err:
2095
    return False, ("Error while reading the API version file at %s: %s" %
2096
                   (api_file, _ErrnoOrStr(err)))
2097

    
2098
  try:
2099
    api_versions = [int(version.strip()) for version in api_versions]
2100
  except (TypeError, ValueError), err:
2101
    return False, ("API version(s) can't be converted to integer: %s" %
2102
                   str(err))
2103

    
2104
  return True, api_versions
2105

    
2106

    
2107
def DiagnoseOS(top_dirs=None):
2108
  """Compute the validity for all OSes.
2109

2110
  @type top_dirs: list
2111
  @param top_dirs: the list of directories in which to
2112
      search (if not given defaults to
2113
      L{constants.OS_SEARCH_PATH})
2114
  @rtype: list of L{objects.OS}
2115
  @return: a list of tuples (name, path, status, diagnose, variants,
2116
      parameters, api_version) for all (potential) OSes under all
2117
      search paths, where:
2118
          - name is the (potential) OS name
2119
          - path is the full path to the OS
2120
          - status True/False is the validity of the OS
2121
          - diagnose is the error message for an invalid OS, otherwise empty
2122
          - variants is a list of supported OS variants, if any
2123
          - parameters is a list of (name, help) parameters, if any
2124
          - api_version is a list of support OS API versions
2125

2126
  """
2127
  if top_dirs is None:
2128
    top_dirs = constants.OS_SEARCH_PATH
2129

    
2130
  result = []
2131
  for dir_name in top_dirs:
2132
    if os.path.isdir(dir_name):
2133
      try:
2134
        f_names = utils.ListVisibleFiles(dir_name)
2135
      except EnvironmentError, err:
2136
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2137
        break
2138
      for name in f_names:
2139
        os_path = utils.PathJoin(dir_name, name)
2140
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2141
        if status:
2142
          diagnose = ""
2143
          variants = os_inst.supported_variants
2144
          parameters = os_inst.supported_parameters
2145
          api_versions = os_inst.api_versions
2146
        else:
2147
          diagnose = os_inst
2148
          variants = parameters = api_versions = []
2149
        result.append((name, os_path, status, diagnose, variants,
2150
                       parameters, api_versions))
2151

    
2152
  return result
2153

    
2154

    
2155
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type name: string
  @param name: the OS name to look up (directory name, without variant)
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  # the OS must support at least one API version we also support
  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  # from API 15 on, an optional variants file may be present
  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  # from API 20 on, a parameters file is mandatory and the verify
  # script is part of OS_SCRIPTS; older APIs have no verify script
  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  # validate each file: it must exist (unless optional), be a regular
  # file, and scripts must be executable; the dict values are replaced
  # by the absolute paths as we go
  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        # optional file is simply absent: drop it from the result
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, _ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    # each line is split into at most two fields (name, help text)
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj
2252

    
2253

    
2254
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip a possible "+variant" suffix before the lookup
  status, payload = _TryOSFromDisk(objects.OS.GetName(name), base_dir)

  if status:
    return payload

  _Fail(payload)
2279

    
2280

    
2281
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  # highest API version supported by both us and the OS
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))

  env = {
    "OS_API_VERSION": "%d" % api_version,
    "OS_NAME": inst_os.name,
    "DEBUG_LEVEL": "%d" % debug,
    }

  # OS variants
  variant = ""
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  env["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    env["OSP_%s" % pname.upper()] = pvalue

  return env
2319

    
2320

    
2321
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  Extends L{OSCoreEnv} with per-instance data: identity attributes,
  disk, NIC, and hypervisor/backend parameter variables.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  # start from the OS-level variables, then layer instance data on top
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks; note that each disk must be set up, as _OpenRealBD fails otherwise
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs; BRIDGE is only set in bridged mode, IP only when configured
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
2381

    
2382

    
2383
def BlockdevGrow(disk, amount, dryrun):
2384
  """Grow a stack of block devices.
2385

2386
  This function is called recursively, with the childrens being the
2387
  first ones to resize.
2388

2389
  @type disk: L{objects.Disk}
2390
  @param disk: the disk to be grown
2391
  @type amount: integer
2392
  @param amount: the amount (in mebibytes) to grow with
2393
  @type dryrun: boolean
2394
  @param dryrun: whether to execute the operation in simulation mode
2395
      only, without actually increasing the size
2396
  @rtype: (status, result)
2397
  @return: a tuple with the status of the operation (True/False), and
2398
      the errors message if status is False
2399

2400
  """
2401
  r_dev = _RecursiveFindBD(disk)
2402
  if r_dev is None:
2403
    _Fail("Cannot find block device %s", disk)
2404

    
2405
  try:
2406
    r_dev.Grow(amount, dryrun)
2407
  except errors.BlockDeviceError, err:
2408
    _Fail("Failed to grow block device: %s", err, exc=True)
2409

    
2410

    
2411
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_LV:
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Cannot find block device %s", disk)
    # FIXME: choose a saner value for the snapshot size
    # let's stay on the safe side and ask for the full size, for now
    return found.Snapshot(disk.size)
  elif disk.dev_type == constants.LD_DRBD8:
    # recurse into the backing storage of the DRBD device
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
2439

    
2440

    
2441
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  # write into a ".new" staging directory, then replace the final
  # directory in one move at the very end
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # global export metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  # per-instance data
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  # only disks that were actually snapshotted (non-None entries) are
  # written out; the key index still follows the instance disk numbering
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # drop any previous export of this instance, then move the staging
  # directory into its final place
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
2520

    
2521

    
2522
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: the export configuration, serialized via C{Dumps}

  """
  config_file = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(config_file)

  # both the export and the instance sections must be present
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2543

    
2544

    
2545
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")
  return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2556

    
2557

    
2558
def RemoveExport(export):
2559
  """Remove an existing export from the node.
2560

2561
  @type export: str
2562
  @param export: the name of the export to remove
2563
  @rtype: None
2564

2565
  """
2566
  target = utils.PathJoin(constants.EXPORT_DIR, export)
2567

    
2568
  try:
2569
    shutil.rmtree(target)
2570
  except EnvironmentError, err:
2571
    _Fail("Error while removing the export: %s", err, exc=True)
2572

    
2573

    
2574
def BlockdevRename(devlist):
2575
  """Rename a list of block devices.
2576

2577
  @type devlist: list of tuples
2578
  @param devlist: list of tuples of the form  (disk,
2579
      new_logical_id, new_physical_id); disk is an
2580
      L{objects.Disk} object describing the current disk,
2581
      and new logical_id/physical_id is the name we
2582
      rename it to
2583
  @rtype: boolean
2584
  @return: True if all renames succeeded, False otherwise
2585

2586
  """
2587
  msgs = []
2588
  result = True
2589
  for disk, unique_id in devlist:
2590
    dev = _RecursiveFindBD(disk)
2591
    if dev is None:
2592
      msgs.append("Can't find device %s in rename" % str(disk))
2593
      result = False
2594
      continue
2595
    try:
2596
      old_rpath = dev.dev_path
2597
      dev.Rename(unique_id)
2598
      new_rpath = dev.dev_path
2599
      if old_rpath != new_rpath:
2600
        DevCacheManager.RemoveCache(old_rpath)
2601
        # FIXME: we should add the new cache information here, like:
2602
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2603
        # but we don't have the owner here - maybe parse from existing
2604
        # cache? for now, we only lose lvm data when we rename, which
2605
        # is less critical than DRBD or MD
2606
    except errors.BlockDeviceError, err:
2607
      msgs.append("Can't rename device '%s' to '%s': %s" %
2608
                  (dev, unique_id, err))
2609
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2610
      result = False
2611
  if not result:
2612
    _Fail("; ".join(msgs))
2613

    
2614

    
2615
def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  normalized = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  allowed = (utils.IsBelowDir(base_fstore, normalized) or
             utils.IsBelowDir(base_shared, normalized))
  if not allowed:
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          normalized, base_fstore, base_shared)
  return normalized
2640

    
2641

    
2642
def CreateFileStorageDir(file_storage_dir):
2643
  """Create file storage directory.
2644

2645
  @type file_storage_dir: str
2646
  @param file_storage_dir: directory to create
2647

2648
  @rtype: tuple
2649
  @return: tuple with first element a boolean indicating wheter dir
2650
      creation was successful or not
2651

2652
  """
2653
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2654
  if os.path.exists(file_storage_dir):
2655
    if not os.path.isdir(file_storage_dir):
2656
      _Fail("Specified storage dir '%s' is not a directory",
2657
            file_storage_dir)
2658
  else:
2659
    try:
2660
      os.makedirs(file_storage_dir, 0750)
2661
    except OSError, err:
2662
      _Fail("Cannot create file storage directory '%s': %s",
2663
            file_storage_dir, err, exc=True)
2664

    
2665

    
2666
def RemoveFileStorageDir(file_storage_dir):
2667
  """Remove file storage directory.
2668

2669
  Remove it only if it's empty. If not log an error and return.
2670

2671
  @type file_storage_dir: str
2672
  @param file_storage_dir: the directory we should cleanup
2673
  @rtype: tuple (success,)
2674
  @return: tuple of one element, C{success}, denoting
2675
      whether the operation was successful
2676

2677
  """
2678
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2679
  if os.path.exists(file_storage_dir):
2680
    if not os.path.isdir(file_storage_dir):
2681
      _Fail("Specified Storage directory '%s' is not a directory",
2682
            file_storage_dir)
2683
    # deletes dir only if empty, otherwise we want to fail the rpc call
2684
    try:
2685
      os.rmdir(file_storage_dir)
2686
    except OSError, err:
2687
      _Fail("Cannot remove file storage directory '%s': %s",
2688
            file_storage_dir, err)
2689

    
2690

    
2691
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2692
  """Rename the file storage directory.
2693

2694
  @type old_file_storage_dir: str
2695
  @param old_file_storage_dir: the current path
2696
  @type new_file_storage_dir: str
2697
  @param new_file_storage_dir: the name we should rename to
2698
  @rtype: tuple (success,)
2699
  @return: tuple of one element, C{success}, denoting
2700
      whether the operation was successful
2701

2702
  """
2703
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2704
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2705
  if not os.path.exists(new_file_storage_dir):
2706
    if os.path.isdir(old_file_storage_dir):
2707
      try:
2708
        os.rename(old_file_storage_dir, new_file_storage_dir)
2709
      except OSError, err:
2710
        _Fail("Cannot rename '%s' to '%s': %s",
2711
              old_file_storage_dir, new_file_storage_dir, err)
2712
    else:
2713
      _Fail("Specified storage dir '%s' is not a directory",
2714
            old_file_storage_dir)
2715
  else:
2716
    if os.path.exists(old_file_storage_dir):
2717
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2718
            old_file_storage_dir, new_file_storage_dir)
2719

    
2720

    
2721
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # Note: os.path.commonprefix compares character by character, so a
  # sibling like "<queue_dir>-evil/job" would wrongly pass a prefix
  # check; compare whole path components instead, as done for the file
  # storage directories
  result = utils.IsBelowDir(queue_dir, file_name)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
2736

    
2737

    
2738
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  ents = runtime.GetEnts()

  # atomically write and replace the file, owned by masterd
  utils.WriteFile(file_name, data=_Decompress(content), uid=ents.masterd_uid,
                  gid=ents.masterd_gid)
2758

    
2759

    
2760
def JobQueueRename(old, new):
2761
  """Renames a job queue file.
2762

2763
  This is just a wrapper over os.rename with proper checking.
2764

2765
  @type old: str
2766
  @param old: the old (actual) file name
2767
  @type new: str
2768
  @param new: the desired file name
2769
  @rtype: tuple
2770
  @return: the success of the operation and payload
2771

2772
  """
2773
  _EnsureJobQueueFile(old)
2774
  _EnsureJobQueueFile(new)
2775

    
2776
  getents = runtime.GetEnts()
2777

    
2778
  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2779
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2780

    
2781

    
2782
def BlockdevClose(instance_name, disks):
2783
  """Closes the given block devices.
2784

2785
  This means they will be switched to secondary mode (in case of
2786
  DRBD).
2787

2788
  @param instance_name: if the argument is not empty, the symlinks
2789
      of this instance will be removed
2790
  @type disks: list of L{objects.Disk}
2791
  @param disks: the list of disks to be closed
2792
  @rtype: tuple (success, message)
2793
  @return: a tuple of success and message, where success
2794
      indicates the succes of the operation, and message
2795
      which will contain the error details in case we
2796
      failed
2797

2798
  """
2799
  bdevs = []
2800
  for cf in disks:
2801
    rd = _RecursiveFindBD(cf)
2802
    if rd is None:
2803
      _Fail("Can't find device %s", cf)
2804
    bdevs.append(rd)
2805

    
2806
  msg = []
2807
  for rd in bdevs:
2808
    try:
2809
      rd.Close()
2810
    except errors.BlockDeviceError, err:
2811
      msg.append(str(err))
2812
  if msg:
2813
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2814
  else:
2815
    if instance_name:
2816
      _RemoveBlockDevLinks(instance_name, disks)
2817

    
2818

    
2819
def ValidateHVParams(hvname, hvparams):
2820
  """Validates the given hypervisor parameters.
2821

2822
  @type hvname: string
2823
  @param hvname: the hypervisor name
2824
  @type hvparams: dict
2825
  @param hvparams: the hypervisor parameters to be validated
2826
  @rtype: None
2827

2828
  """
2829
  try:
2830
    hv_type = hypervisor.GetHypervisor(hvname)
2831
    hv_type.ValidateParameters(hvparams)
2832
  except errors.HypervisorError, err:
2833
    _Fail(str(err), log=False)
2834

    
2835

    
2836
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  # supported_parameters holds (name, description) pairs
  supported = [entry[0] for entry in os_obj.supported_parameters]
  unknown = frozenset(parameters).difference(supported)
  if unknown:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(unknown)))
2850

    
2851

    
2852
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    # tbv holds the error message in this case
    if required:
      _Fail(tbv)
    return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    # pre-2.0 API OS definitions have no validation support
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True
2898

    
2899

    
2900
def DemoteFromMC():
2901
  """Demotes the current node from master candidate role.
2902

2903
  """
2904
  # try to ensure we're not the master by mistake
2905
  master, myself = ssconf.GetMasterAndMyself()
2906
  if master == myself:
2907
    _Fail("ssconf status shows I'm the master node, will not demote")
2908

    
2909
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2910
  if not result.failed:
2911
    _Fail("The master daemon is running, will not demote")
2912

    
2913
  try:
2914
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2915
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2916
  except EnvironmentError, err:
2917
    if err.errno != errno.ENOENT:
2918
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2919

    
2920
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2921

    
2922

    
2923
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @rtype: tuple; (string, string, string)
  @return: certificate directory, key file path, certificate file path

  """
  cert_dir = utils.PathJoin(cryptodir, name)
  return (cert_dir,
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2930

    
2931

    
2932
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2933
  """Creates a new X509 certificate for SSL/TLS.
2934

2935
  @type validity: int
2936
  @param validity: Validity in seconds
2937
  @rtype: tuple; (string, string)
2938
  @return: Certificate name and public part
2939

2940
  """
2941
  (key_pem, cert_pem) = \
2942
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2943
                                     min(validity, _MAX_SSL_CERT_VALIDITY))
2944

    
2945
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
2946
                              prefix="x509-%s-" % utils.TimestampForFilename())
2947
  try:
2948
    name = os.path.basename(cert_dir)
2949
    assert len(name) > 5
2950

    
2951
    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2952

    
2953
    utils.WriteFile(key_file, mode=0400, data=key_pem)
2954
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2955

    
2956
    # Never return private key as it shouldn't leave the node
2957
    return (name, cert_pem)
2958
  except Exception:
2959
    shutil.rmtree(cert_dir, ignore_errors=True)
2960
    raise
2961

    
2962

    
2963
def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2964
  """Removes a X509 certificate.
2965

2966
  @type name: string
2967
  @param name: Certificate name
2968

2969
  """
2970
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2971

    
2972
  utils.RemoveFile(key_file)
2973
  utils.RemoveFile(cert_file)
2974

    
2975
  try:
2976
    os.rmdir(cert_dir)
2977
  except EnvironmentError, err:
2978
    _Fail("Cannot remove certificate directory '%s': %s",
2979
          cert_dir, err)
2980

    
2981

    
2982
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2983
  """Returns the command for the requested input/output.
2984

2985
  @type instance: L{objects.Instance}
2986
  @param instance: The instance object
2987
  @param mode: Import/export mode
2988
  @param ieio: Input/output type
2989
  @param ieargs: Input/output arguments
2990

2991
  """
2992
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2993

    
2994
  env = None
2995
  prefix = None
2996
  suffix = None
2997
  exp_size = None
2998

    
2999
  if ieio == constants.IEIO_FILE:
3000
    (filename, ) = ieargs
3001

    
3002
    if not utils.IsNormAbsPath(filename):
3003
      _Fail("Path '%s' is not normalized or absolute", filename)
3004

    
3005
    real_filename = os.path.realpath(filename)
3006
    directory = os.path.dirname(real_filename)
3007

    
3008
    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
3009
      _Fail("File '%s' is not under exports directory '%s': %s",
3010
            filename, constants.EXPORT_DIR, real_filename)
3011

    
3012
    # Create directory
3013
    utils.Makedirs(directory, mode=0750)
3014

    
3015
    quoted_filename = utils.ShellQuote(filename)
3016

    
3017
    if mode == constants.IEM_IMPORT:
3018
      suffix = "> %s" % quoted_filename
3019
    elif mode == constants.IEM_EXPORT:
3020
      suffix = "< %s" % quoted_filename
3021

    
3022
      # Retrieve file size
3023
      try:
3024
        st = os.stat(filename)
3025
      except EnvironmentError, err:
3026
        logging.error("Can't stat(2) %s: %s", filename, err)
3027
      else:
3028
        exp_size = utils.BytesToMebibyte(st.st_size)
3029

    
3030
  elif ieio == constants.IEIO_RAW_DISK:
3031
    (disk, ) = ieargs
3032

    
3033
    real_disk = _OpenRealBD(disk)
3034

    
3035
    if mode == constants.IEM_IMPORT:
3036
      # we set here a smaller block size as, due to transport buffering, more
3037
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
3038
      # is not already there or we pass a wrong path; we use notrunc to no
3039
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3040
      # much memory; this means that at best, we flush every 64k, which will
3041
      # not be very fast
3042
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3043
                                    " bs=%s oflag=dsync"),
3044
                                    real_disk.dev_path,
3045
                                    str(64 * 1024))
3046

    
3047
    elif mode == constants.IEM_EXPORT:
3048
      # the block size on the read dd is 1MiB to match our units
3049
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3050
                                   real_disk.dev_path,
3051
                                   str(1024 * 1024), # 1 MB
3052
                                   str(disk.size))
3053
      exp_size = disk.size
3054

    
3055
  elif ieio == constants.IEIO_SCRIPT:
3056
    (disk, disk_index, ) = ieargs
3057

    
3058
    assert isinstance(disk_index, (int, long))
3059

    
3060
    real_disk = _OpenRealBD(disk)
3061

    
3062
    inst_os = OSFromDisk(instance.os)
3063
    env = OSEnvironment(instance, inst_os)
3064

    
3065
    if mode == constants.IEM_IMPORT:
3066
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3067
      env["IMPORT_INDEX"] = str(disk_index)
3068
      script = inst_os.import_script
3069

    
3070
    elif mode == constants.IEM_EXPORT:
3071
      env["EXPORT_DEVICE"] = real_disk.dev_path
3072
      env["EXPORT_INDEX"] = str(disk_index)
3073
      script = inst_os.export_script
3074

    
3075
    # TODO: Pass special environment only to script
3076
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3077

    
3078
    if mode == constants.IEM_IMPORT:
3079
      suffix = "| %s" % script_cmd
3080

    
3081
    elif mode == constants.IEM_EXPORT:
3082
      prefix = "%s |" % script_cmd
3083

    
3084
    # Let script predict size
3085
    exp_size = constants.IE_CUSTOM_SIZE
3086

    
3087
  else:
3088
    _Fail("Invalid %s I/O mode %r", mode, ieio)
3089

    
3090
  return (env, prefix, suffix, exp_size)
3091

    
3092

    
3093
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: prefix for the directory name
  @rtype: string
  @return: path of the newly created status directory

  """
  dir_prefix = "%s-%s-" % (prefix, utils.TimestampForFilename())
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR, prefix=dir_prefix)
3100

    
3101

    
3102
def StartImportExportDaemon(mode, opts, host, port, instance, component,
3103
                            ieio, ieioargs):
3104
  """Starts an import or export daemon.
3105

3106
  @param mode: Import/output mode
3107
  @type opts: L{objects.ImportExportOptions}
3108
  @param opts: Daemon options
3109
  @type host: string
3110
  @param host: Remote host for export (None for import)
3111
  @type port: int
3112
  @param port: Remote port for export (None for import)
3113
  @type instance: L{objects.Instance}
3114
  @param instance: Instance object
3115
  @type component: string
3116
  @param component: which part of the instance is transferred now,
3117
      e.g. 'disk/0'
3118
  @param ieio: Input/output type
3119
  @param ieioargs: Input/output arguments
3120

3121
  """
3122
  if mode == constants.IEM_IMPORT:
3123
    prefix = "import"
3124

    
3125
    if not (host is None and port is None):
3126
      _Fail("Can not specify host or port on import")
3127

    
3128
  elif mode == constants.IEM_EXPORT:
3129
    prefix = "export"
3130

    
3131
    if host is None or port is None:
3132
      _Fail("Host and port must be specified for an export")
3133

    
3134
  else:
3135
    _Fail("Invalid mode %r", mode)
3136

    
3137
  if (opts.key_name is None) ^ (opts.ca_pem is None):
3138
    _Fail("Cluster certificate can only be used for both key and CA")
3139

    
3140
  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3141
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3142

    
3143
  if opts.key_name is None:
3144
    # Use server.pem
3145
    key_path = constants.NODED_CERT_FILE
3146
    cert_path = constants.NODED_CERT_FILE
3147
    assert opts.ca_pem is None
3148
  else:
3149
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3150
                                                 opts.key_name)
3151
    assert opts.ca_pem is not None
3152

    
3153
  for i in [key_path, cert_path]:
3154
    if not os.path.exists(i):
3155
      _Fail("File '%s' does not exist" % i)
3156

    
3157
  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3158
  try:
3159
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3160
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3161
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3162

    
3163
    if opts.ca_pem is None:
3164
      # Use server.pem
3165
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
3166
    else:
3167
      ca = opts.ca_pem
3168

    
3169
    # Write CA file
3170
    utils.WriteFile(ca_file, data=ca, mode=0400)
3171

    
3172
    cmd = [
3173
      constants.IMPORT_EXPORT_DAEMON,
3174
      status_file, mode,
3175
      "--key=%s" % key_path,
3176
      "--cert=%s" % cert_path,
3177
      "--ca=%s" % ca_file,
3178
      ]
3179

    
3180
    if host:
3181
      cmd.append("--host=%s" % host)
3182

    
3183
    if port:
3184
      cmd.append("--port=%s" % port)
3185

    
3186
    if opts.ipv6:
3187
      cmd.append("--ipv6")
3188
    else:
3189
      cmd.append("--ipv4")
3190

    
3191
    if opts.compress:
3192
      cmd.append("--compress=%s" % opts.compress)
3193

    
3194
    if opts.magic:
3195
      cmd.append("--magic=%s" % opts.magic)
3196

    
3197
    if exp_size is not None:
3198
      cmd.append("--expected-size=%s" % exp_size)
3199

    
3200
    if cmd_prefix:
3201
      cmd.append("--cmd-prefix=%s" % cmd_prefix)
3202

    
3203
    if cmd_suffix:
3204
      cmd.append("--cmd-suffix=%s" % cmd_suffix)
3205

    
3206
    if mode == constants.IEM_EXPORT:
3207
      # Retry connection a few times when connecting to remote peer
3208
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3209
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3210
    elif opts.connect_timeout is not None:
3211
      assert mode == constants.IEM_IMPORT
3212
      # Overall timeout for establishing connection while listening
3213
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3214

    
3215
    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3216

    
3217
    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3218
    # support for receiving a file descriptor for output
3219
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3220
                      output=logfile)
3221

    
3222
    # The import/export name is simply the status directory name
3223
    return os.path.basename(status_dir)
3224

    
3225
  except Exception:
3226
    shutil.rmtree(status_dir, ignore_errors=True)
3227
    raise
3228

    
3229

    
3230
def GetImportExportStatus(names):
3231
  """Returns import/export daemon status.
3232

3233
  @type names: sequence
3234
  @param names: List of names
3235
  @rtype: List of dicts
3236
  @return: Returns a list of the state of each named import/export or None if a
3237
           status couldn't be read
3238

3239
  """
3240
  result = []
3241

    
3242
  for name in names:
3243
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3244
                                 _IES_STATUS_FILE)
3245

    
3246
    try:
3247
      data = utils.ReadFile(status_file)
3248
    except EnvironmentError, err:
3249
      if err.errno != errno.ENOENT:
3250
        raise
3251
      data = None
3252

    
3253
    if not data:
3254
      result.append(None)
3255
      continue
3256

    
3257
    result.append(serializer.LoadJson(data))
3258

    
3259
  return result
3260

    
3261

    
3262
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: import/export (status directory) name

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    # the daemon may already be gone; that's not an error
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3275

    
3276

    
3277
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: import/export (status directory) name

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    # don't wait for the process; the directory goes away regardless
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
3296

    
3297

    
3298
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  my_name = netutils.Hostname.GetSysName()
  # first pass: fill in the correct physical ID on every disk
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  # second pass: resolve the actual block devices
  bdevs = []
  for disk in disks:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(dev)
  return bdevs
3315

    
3316

    
3317
def DrbdDisconnectNet(nodes_ip, disks):
3318
  """Disconnects the network on a list of drbd devices.
3319

3320
  """
3321
  bdevs = _FindDisks(nodes_ip, disks)
3322

    
3323
  # disconnect disks
3324
  for rd in bdevs:
3325
    try:
3326
      rd.DisconnectNet()
3327
    except errors.BlockDeviceError, err:
3328
      _Fail("Can't change network configuration to standalone mode: %s",
3329
            err, exc=True)
3330

    
3331

    
3332
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3333
  """Attaches the network on a list of drbd devices.
3334

3335
  """
3336
  bdevs = _FindDisks(nodes_ip, disks)
3337

    
3338
  if multimaster:
3339
    for idx, rd in enumerate(bdevs):
3340
      try:
3341
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3342
      except EnvironmentError, err:
3343
        _Fail("Can't create symlink: %s", err)
3344
  # reconnect disks, switch to new master configuration and if
3345
  # needed primary mode
3346
  for rd in bdevs:
3347
    try:
3348
      rd.AttachNet(multimaster)
3349
    except errors.BlockDeviceError, err:
3350
      _Fail("Can't change network configuration: %s", err)
3351

    
3352
  # wait until the disks are connected; we need to retry the re-attach
3353
  # if the device becomes standalone, as this might happen if the one
3354
  # node disconnects and reconnects in a different mode before the
3355
  # other node reconnects; in this case, one or both of the nodes will
3356
  # decide it has wrong configuration and switch to standalone
3357

    
3358
  def _Attach():
3359
    all_connected = True
3360

    
3361
    for rd in bdevs:
3362
      stats = rd.GetProcStatus()
3363

    
3364
      all_connected = (all_connected and
3365
                       (stats.is_connected or stats.is_in_resync))
3366

    
3367
      if stats.is_standalone:
3368
        # peer had different config info and this node became
3369
        # standalone, even though this should not happen with the
3370
        # new staged way of changing disk configs
3371
        try:
3372
          rd.AttachNet(multimaster)
3373
        except errors.BlockDeviceError, err:
3374
          _Fail("Can't change network configuration: %s", err)
3375

    
3376
    if not all_connected:
3377
      raise utils.RetryAgain()
3378

    
3379
  try:
3380
    # Start with a delay of 100 miliseconds and go up to 5 seconds
3381
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3382
  except utils.RetryTimeout:
3383
    _Fail("Timeout in disk reconnecting")
3384

    
3385
  if multimaster:
3386
    # change to primary mode
3387
    for rd in bdevs:
3388
      try:
3389
        rd.Open()
3390
      except errors.BlockDeviceError, err:
3391
        _Fail("Can't change to primary mode: %s", err)
3392

    
3393

    
3394
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple; (bool, int)
  @return: whether all devices are done syncing, and the minimum
      sync percentage seen across them

  """
  def _CheckSync(rd):
    """Retry helper: raise RetryAgain while the device is disconnected."""
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_CheckSync, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
3422

    
3423

    
3424
def GetDrbdUsermodeHelper():
3425
  """Returns DRBD usermode helper currently configured.
3426

3427
  """
3428
  try:
3429
    return bdev.BaseDRBD.GetUsermodeHelper()
3430
  except errors.BlockDeviceError, err:
3431
    _Fail(str(err))
3432

    
3433

    
3434
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: report back immediately, the child does the actual work
    return "Reboot scheduled in 5 seconds"
  # child: lock ourselves into RAM so the powercycle can't be stopped
  # by memory pressure
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
3456

    
3457

    
3458
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      # map RunParts outcomes to the hook result constants
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
3550

    
3551

    
3552
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    The script is located via L{constants.IALLOCATOR_SEARCH_PATH}; its
    input data is passed through a temporary file whose name is given as
    the script's single argument.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; failures (script not
        found, non-zero exit code) are reported by raising via L{_Fail}
        rather than through the return value

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # hand the input data to the script via a temporary file; make sure
    # the file is removed whether or not the script succeeds
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
3593
class DevCacheManager(object):
3594
  """Simple class for managing a cache of block device information.
3595

3596
  """
3597
  _DEV_PREFIX = "/dev/"
3598
  _ROOT_DIR = constants.BDEV_CACHE_DIR
3599

    
3600
  @classmethod
3601
  def _ConvertPath(cls, dev_path):
3602
    """Converts a /dev/name path to the cache file name.
3603

3604
    This replaces slashes with underscores and strips the /dev
3605
    prefix. It then returns the full path to the cache file.
3606

3607
    @type dev_path: str
3608
    @param dev_path: the C{/dev/} path name
3609
    @rtype: str
3610
    @return: the converted path name
3611

3612
    """
3613
    if dev_path.startswith(cls._DEV_PREFIX):
3614
      dev_path = dev_path[len(cls._DEV_PREFIX):]
3615
    dev_path = dev_path.replace("/", "_")
3616
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3617
    return fpath
3618

    
3619
  @classmethod
3620
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3621
    """Updates the cache information for a given device.
3622

3623
    @type dev_path: str
3624
    @param dev_path: the pathname of the device
3625
    @type owner: str
3626
    @param owner: the owner (instance name) of the device
3627
    @type on_primary: bool
3628
    @param on_primary: whether this is the primary
3629
        node nor not
3630
    @type iv_name: str
3631
    @param iv_name: the instance-visible name of the
3632
        device, as in objects.Disk.iv_name
3633

3634
    @rtype: None
3635

3636
    """
3637
    if dev_path is None:
3638
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
3639
      return
3640
    fpath = cls._ConvertPath(dev_path)
3641
    if on_primary:
3642
      state = "primary"
3643
    else:
3644
      state = "secondary"
3645
    if iv_name is None:
3646
      iv_name = "not_visible"
3647
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3648
    try:
3649
      utils.WriteFile(fpath, data=fdata)
3650
    except EnvironmentError, err:
3651
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3652

    
3653
  @classmethod
3654
  def RemoveCache(cls, dev_path):
3655
    """Remove data for a dev_path.
3656

3657
    This is just a wrapper over L{utils.io.RemoveFile} with a converted
3658
    path name and logging.
3659

3660
    @type dev_path: str
3661
    @param dev_path: the pathname of the device
3662

3663
    @rtype: None
3664

3665
    """
3666
    if dev_path is None:
3667
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
3668
      return
3669
    fpath = cls._ConvertPath(dev_path)
3670
    try:
3671
      utils.RemoveFile(fpath)
3672
    except EnvironmentError, err:
3673
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)