root / lib / backend.py @ 28aedbc1

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import mcpu


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)

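# Example for _Fail (illustrative only, not part of the original module):
# callers pass printf-style arguments plus optional keyword flags, e.g.
#
#   _Fail("Cluster configuration incomplete: %s", err, exc=True)
#   _Fail("OS create script failed (%s)", reason, log=False)
#
# exc=True logs the full traceback via logging.exception; log=False
# raises without logging at all.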


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")

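# Example for _Decompress (illustrative only): the RPC client sends an
# (encoding, content) pair, so a zlib+base64 payload round-trips as:
#
#   payload = (constants.RPC_ENCODING_ZLIB_BASE64,
#              base64.b64encode(zlib.compress("some data")))
#   assert _Decompress(payload) == "some data"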


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)

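# Example for _CleanDirectory (illustrative only): JobQueuePurge below
# uses the exclude parameter to keep the queue lock file while removing
# everything else:
#
#   _CleanDirectory(constants.QUEUE_DIR,
#                   exclude=[constants.JOB_QUEUE_LOCK_FILE])
#
# Paths outside _ALLOWED_CLEAN_DIRS are rejected via _Fail.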


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
                            None, env_builder_fn, logging.warning,
                            cfg.GetClusterName(), cfg.GetMasterNode())

      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator

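# Example for RunLocalHooks (illustrative only): ActivateMasterIp below
# is wrapped so that the "master-ip-turnup" hooks run around it:
#
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpHookEnv)
#   def ActivateMasterIp():
#       ...
#
# The pre-phase hooks run before the function body and may abort it;
# the post-phase hooks run after it completes.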


def _BuildMasterIpHookEnv():
  """Builds environment variables for master IP hooks.

  """
  cfg = _GetConfig()
  env = {
    "MASTER_NETDEV": cfg.GetMasterNetdev(),
    "MASTER_IP": cfg.GetMasterIP(),
  }

  return env


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpHookEnv)
def ActivateMasterIp():
  """Activate the IP address of the master daemon.

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  err_msg = None
  if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if netutils.IPAddress.Own(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      err_msg = "Someone else has the master ip, not activating"
      logging.error(err_msg)
  else:
    ipcls = netutils.IP4Address
    if family == netutils.IP6Address.family:
      ipcls = netutils.IP6Address

    result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                           "%s/%d" % (master_ip, ipcls.iplen),
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      err_msg = "Can't activate master IP: %s" % result.output
      logging.error(err_msg)

    # we ignore the exit code of the following cmds
    if ipcls == netutils.IP4Address:
      utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                    master_ip, master_ip])
    elif ipcls == netutils.IP6Address:
      try:
        utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
      except errors.OpExecError:
        # TODO: Better error reporting
        logging.warning("Can't execute ndisc6, please install if missing")

  if err_msg:
    _Fail(err_msg)

def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpHookEnv)
def DeactivateMasterIp():
  """Deactivate the master IP on this node.

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)

def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate in; either add or remove an entry
  @param host: The host to operate on
  @param ip: The IP address associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")

def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether to remove the node's SSH key files

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")

def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total amount of ram in MiB
      - hv_version: the hypervisor version, if available

  """
  outputarray = {}

  if vgname is not None:
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
    vg_free = vg_size = None
    if vginfo:
      vg_free = int(round(vginfo[0][0], 0))
      vg_size = int(round(vginfo[0][1], 0))
    outputarray["vg_size"] = vg_size
    outputarray["vg_free"] = vg_free

  if hypervisor_type is not None:
    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      outputarray.update(hyp_info)

  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray

def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result

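# Example for VerifyNode (illustrative only; key names follow the
# constants used above): the master sends a "what" dict and gets back a
# dict with the same keys, e.g.
#
#   what = {
#     constants.NV_FILELIST: ["/etc/hosts"],
#     constants.NV_TIME: None,
#     }
#   result = VerifyNode(what, "cluster.example.com")
#   # result[constants.NV_FILELIST] -> file/checksum pairs
#   # result[constants.NV_TIME]     -> local time via utils.SplitTime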


def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs

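# Example for GetBlockDevSizes (illustrative only): "blockdev
# --getsize64" prints the device size in bytes, which the code above
# converts to MiB, e.g. for a 1 GiB device:
#
#   $ blockdev --getsize64 /dev/sdb1
#   1073741824
#
#   1073741824 / (1024 * 1024) == 1024  # MiB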


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs

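# Example for GetVolumeList (illustrative only): with the options used
# above, lvs emits pipe-separated lines such as
#
#   xenvg|test1|20.06|-wi-ao
#
# which _LVSLINE_REGEX splits into ("xenvg", "test1", "20.06",
# "-wi-ao"); attr[4] ("a") and attr[5] ("o") yield the inactive/online
# flags, and an attr starting with "v" marks a virtual volume, which is
# skipped.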


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with volume group names as keys and their
      size as values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs

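# Example for NodeVolumes (illustrative only): the lvs "devices" column
# reports each physical volume with an extent offset, e.g.
#
#   test1|20.06|/dev/sda5(0),/dev/sdb1(0)|xenvg
#
# parse_dev() strips the "(0)" suffix and handle_dev() splits on
# commas, so this line expands into two entries: one with dev=/dev/sda5
# and one with dev=/dev/sdb1.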


def BridgesExist(bridges_list):
  """Check that all bridges in a list exist on the current node.

  @raise RPCFail: if any of the given bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @raise RPCFail: if the instance is not running; missing disk
      symlinks are only logged as warnings

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)

def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)

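# Example for _InstanceLogName (illustrative only; the exact timestamp
# format comes from utils.TimestampForFilename and is assumed here):
#
#   _InstanceLogName("add", "debootstrap", "inst1.example.com", None)
#
# would produce a file under constants.LOG_OS_DIR named like
# add-debootstrap-inst1.example.com-<timestamp>.log, with a "-disk0"
# style component suffix inserted when component is given.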


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
                        (instance_name, constants.DISK_SEPARATOR, idx))


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name

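# Example for _GetBlockDevSymlinkPath/_SymlinkBlockDev (illustrative
# only; the actual values of DISK_LINKS_DIR and DISK_SEPARATOR are
# cluster constants): disk 0 of instance "inst1.example.com" gets a
# stable symlink such as
#
#   <DISK_LINKS_DIR>/inst1.example.com<DISK_SEPARATOR>0 -> /dev/drbd0
#
# so hypervisors can reference disks by a fixed name even when the
# underlying device path changes.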


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)

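# Flow of InstanceShutdown (summary of the code above): _TryShutdown is
# retried by utils.Retry every ~5 seconds until "timeout" expires; the
# first attempt is a plain StopInstance, later attempts pass
# retry=True.  Only after RetryTimeout does the code escalate to
# StopInstance(force=True), i.e. a hard destroy, before cleaning up the
# block device symlinks.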


def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance, False)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigration(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - success is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)

def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)

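# Example for _WipeDevice (illustrative only; WIPE_BLOCK_SIZE is a
# cluster constant, assumed here to be 1 MiB): wiping 512 MiB starting
# 1024 MiB into a device runs roughly
#
#   dd if=/dev/zero seek=1024 bs=1048576 oflag=direct \
#      of=/dev/xenvg/disk0 count=512
#
# dd interprets seek and count in units of bs, which is why offset and
# size can be passed directly in MiB.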


def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Do cross verify some of the parameters
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result

1620
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


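# Illustrative sketch (not part of this module): on a primary node the
# wrapper returns the device path, on a secondary node it returns True;
# "disk" and "inst_name" are hypothetical:
#
#   path_or_flag = BlockdevAssemble(disk, inst_name, True, 0)
#   if isinstance(path_or_flag, str):
#     logging.info("Disk 0 assembled at %s", path_or_flag)

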
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble: shutting down one device
  doesn't require that the devices above it were active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly be ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use
  # oflag=dsync to not buffer too much memory; this means that at
  # best, we flush every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


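# Illustrative sketch (not part of this module): for a 1024 MiB disk the
# combined pipeline built above has roughly this shape (device path, host
# and destination are hypothetical):
#
#   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576 count=1024 \
#     | ssh <dest_node> 'dd of=/srv/exports/disk0 conv=nocreat,notrunc \
#       bs=65536 oflag=dsync'

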
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, "errno"):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


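# Illustrative sketch (not part of this module): the helper above maps an
# errno-carrying exception to its symbolic name, and falls back to the
# plain string representation otherwise:
#
#   _ErrnoOrStr(IOError(errno.ENOENT, "No such file"))  -> "ENOENT"
#   _ErrnoOrStr(ValueError("bad value"))                -> "bad value"

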
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


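# Illustrative sketch (not part of this module): an OS whose API version
# file contains the two lines "15" and "20" would yield (path hypothetical)
#
#   _OSOndiskAPIVersion("/srv/ganeti/os/debootstrap")  -> (True, [15, 20])
#
# while a missing file yields (False, "Required file ... not found ...").

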
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of tuples
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary: we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, _ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS API 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  return result


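# Illustrative sketch (not part of this module): for a hypothetical OS
# "debootstrap+default" supporting API version 20, with a "mirror" OS
# parameter, the resulting core environment would look like:
#
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap",
#    "DEBUG_LEVEL": "0", "OS_VARIANT": "default",
#    "OSP_MIRROR": "http://deb.example.com"}

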
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS API 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount, dryrun):
  """Grow a stack of block devices.

  This function is called recursively, with the children being
  resized first.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


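# Illustrative sketch (not part of this module): the export configuration
# written above is a plain INI file; a trimmed example (section names taken
# from constants.INISECT_EXP/INISECT_INS, all values hypothetical):
#
#   [export]
#   version = 0
#   timestamp = 1357000000
#   source = node1.example.com
#   os = debootstrap
#   compression = none
#
#   [instance]
#   name = inst1.example.com
#   memory = 128
#   vcpus = 1
#   disk_template = drbd

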
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the new logical/physical id we rename
      it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether a given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir


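# Illustrative sketch (not part of this module): assuming a hypothetical
# base file storage directory of /srv/ganeti/file-storage, the check above
# behaves like:
#
#   _TransformFileStorageDir("/srv/ganeti/file-storage/inst1")
#       -> "/srv/ganeti/file-storage/inst1"
#   _TransformFileStorageDir("/tmp/evil")  -> raises RPCFail

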
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If it is not, log an error and fail
  the RPC call.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


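# Illustrative sketch (not part of this module): assuming a queue directory
# of /var/lib/ganeti/queue, the prefix check above accepts
# "/var/lib/ganeti/queue/job-1" and rejects "/tmp/job-1" by raising RPCFail.

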
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


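# Illustrative sketch (not part of this module): for a certificate name
# such as "x509-<timestamp>-abcdef" (name hypothetical), the helper above
# returns the certificate directory plus the key and cert paths inside it:
#
#   (<cryptodir>/<name>, <cryptodir>/<name>/key, <cryptodir>/<name>/cert)

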
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, constants.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly be ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc
      # to not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


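# Illustrative sketch (not part of this module): for a raw-disk export of a
# 2048 MiB disk, the helper above would return roughly (device path
# hypothetical):
#
#   (None,                                          # env
#    "dd if=/dev/drbd0 bs=1048576 count=2048 |",    # prefix
#    None,                                          # suffix
#    2048)                                          # expected size in MiB

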
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
3041
                            ieio, ieioargs):
3042
  """Starts an import or export daemon.
3043

3044
  @param mode: Import/output mode
3045
  @type opts: L{objects.ImportExportOptions}
3046
  @param opts: Daemon options
3047
  @type host: string
3048
  @param host: Remote host for export (None for import)
3049
  @type port: int
3050
  @param port: Remote port for export (None for import)
3051
  @type instance: L{objects.Instance}
3052
  @param instance: Instance object
3053
  @type component: string
3054
  @param component: which part of the instance is transferred now,
3055
      e.g. 'disk/0'
3056
  @param ieio: Input/output type
3057
  @param ieioargs: Input/output arguments
3058

3059
  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      constants.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise
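
# Illustrative invocation (hypothetical host/port, assuming the cluster
# certificate is used): for an export, the cmd list assembled above expands
# to roughly
#
#   <IMPORT_EXPORT_DAEMON> <status_dir>/status export \
#     --key=<server.pem> --cert=<server.pem> --ca=<status_dir>/ca \
#     --host=192.0.2.1 --port=1234 --ipv4 \
#     --connect-retries=<RIE_CONNECT_RETRIES> \
#     --connect-timeout=<RIE_CONNECT_ATTEMPT_TIMEOUT>
#
# plus the optional --compress/--magic/--cmd-prefix/--cmd-suffix arguments.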


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: a list with the state of each named import/export, or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result
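
# Usage sketch (hypothetical names): the result list is parallel to the
# input sequence, with None for daemons whose status file is missing or
# still empty, e.g.
#
#   GetImportExportStatus(["import-disk0-...", "export-disk1-..."])
#   => [<parsed status dict>, None]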


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
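
# Lifecycle note (editorial): AbortImportExport only delivers SIGTERM and
# leaves the status directory in place for inspection, while
# CleanupImportExport escalates to utils.KillProcess if needed and removes
# the directory; callers would typically abort first, poll
# GetImportExportStatus, and clean up last.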


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
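
# Return-value sketch (hypothetical numbers): (True, 100) once every device
# is connected and fully synced; (False, 42.5) while at least one device is
# still resyncing, 42.5 being the lowest sync percentage seen across disks.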


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
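
# Control-flow sketch: the parent (pid > 0) returns at once so the RPC can
# complete; the forked child locks itself into RAM, sleeps five seconds to
# let the reply reach the master, then triggers the hypervisor-specific
# powercycle.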


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS},
        L{constants.HKR_FAIL} or L{constants.HKR_SKIP}
      - output of the script

    @raise RPCFail: for an unknown hooks phase

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
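
# Result sketch (hypothetical hook names): for hpath "instance-start" in the
# post phase, the scripts under <hooks base dir>/instance-start-post.d are
# run and the result list looks like
#
#   [("instance-start-post.d/00-log", HKR_SUCCESS, "started"),
#    ("instance-start-post.d/10-broken", HKR_FAIL, "Hook script ...")]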


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; failures are reported
       via L{_Fail}

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
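
# Protocol sketch: the allocator input (a JSON request prepared by the
# master) is written to a temporary file and the script is invoked with that
# file name as its only argument; its stdout is returned verbatim.
# Hypothetical call:
#
#   IAllocatorRunner.Run("hail", serialized_request)
#   => JSON response text printed by the "hail" script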


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
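
  # Conversion sketch (the prefix depends on constants.BDEV_CACHE_DIR):
  #   _ConvertPath("/dev/md/0")  => "<BDEV_CACHE_DIR>/bdev_md_0"
  #   _ConvertPath("/dev/drbd0") => "<BDEV_CACHE_DIR>/bdev_drbd0"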

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
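
  # Cache-file sketch (hypothetical values): for dev_path "/dev/drbd0" owned
  # by instance1.example.com on the primary node with iv_name "disk/0", the
  # file "<BDEV_CACHE_DIR>/bdev_drbd0" would contain the single line:
  #
  #   instance1.example.com primary disk/0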

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)