Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 5a904197

History | View | Annotate | Download (142.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable=E1103,C0302
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37
# C0302: This module has become too big and should be split up
38

    
39

    
40
import os
41
import os.path
42
import shutil
43
import time
44
import stat
45
import errno
46
import re
47
import random
48
import logging
49
import tempfile
50
import zlib
51
import base64
52
import signal
53

    
54
from ganeti import errors
55
from ganeti import utils
56
from ganeti import ssh
57
from ganeti import hypervisor
58
from ganeti import constants
59
from ganeti.storage import bdev
60
from ganeti.storage import drbd
61
from ganeti.storage import filestorage
62
from ganeti import objects
63
from ganeti import ssconf
64
from ganeti import serializer
65
from ganeti import netutils
66
from ganeti import runtime
67
from ganeti import compat
68
from ganeti import pathutils
69
from ganeti import vcluster
70
from ganeti import ht
71
from ganeti.storage.base import BlockDev
72
from ganeti.storage.drbd import DRBD8
73
from ganeti import hooksmaster
74

    
75

    
76
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
77
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
78
  pathutils.DATA_DIR,
79
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
80
  pathutils.QUEUE_DIR,
81
  pathutils.CRYPTO_KEYS_DIR,
82
  ])
83
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
84
_X509_KEY_FILE = "key"
85
_X509_CERT_FILE = "cert"
86
_IES_STATUS_FILE = "status"
87
_IES_PID_FILE = "pid"
88
_IES_CA_FILE = "ca"
89

    
90
#: Valid LVS output line regex
91
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
92

    
93
# Actions for the master setup script
94
_MASTER_START = "start"
95
_MASTER_STOP = "stop"
96

    
97
#: Maximum file permissions for restricted command directory and executables
98
_RCMD_MAX_MODE = (stat.S_IRWXU |
99
                  stat.S_IRGRP | stat.S_IXGRP |
100
                  stat.S_IROTH | stat.S_IXOTH)
101

    
102
#: Delay before returning an error for restricted commands
103
_RCMD_INVALID_DELAY = 10
104

    
105
#: How long to wait to acquire lock for restricted commands (shorter than
106
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
107
#: command requests arrive
108
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
109

    
110

    
111
class RPCFail(Exception):
112
  """Class denoting RPC failure.
113

114
  Its argument is the error message.
115

116
  """
117

    
118

    
119
def _GetInstReasonFilename(instance_name):
120
  """Path of the file containing the reason of the instance status change.
121

122
  @type instance_name: string
123
  @param instance_name: The name of the instance
124
  @rtype: string
125
  @return: The path of the file
126

127
  """
128
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)
129

    
130

    
131
def _StoreInstReasonTrail(instance_name, trail):
132
  """Serialize a reason trail related to an instance change of state to file.
133

134
  The exact location of the file depends on the name of the instance and on
135
  the configuration of the Ganeti cluster defined at deploy time.
136

137
  @type instance_name: string
138
  @param instance_name: The name of the instance
139
  @rtype: None
140

141
  """
142
  json = serializer.DumpJson(trail)
143
  filename = _GetInstReasonFilename(instance_name)
144
  utils.WriteFile(filename, data=json)
145

    
146

    
147
def _Fail(msg, *args, **kwargs):
148
  """Log an error and the raise an RPCFail exception.
149

150
  This exception is then handled specially in the ganeti daemon and
151
  turned into a 'failed' return type. As such, this function is a
152
  useful shortcut for logging the error and returning it to the master
153
  daemon.
154

155
  @type msg: string
156
  @param msg: the text of the exception
157
  @raise RPCFail
158

159
  """
160
  if args:
161
    msg = msg % args
162
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
163
    if "exc" in kwargs and kwargs["exc"]:
164
      logging.exception(msg)
165
    else:
166
      logging.error(msg)
167
  raise RPCFail(msg)
168

    
169

    
170
def _GetConfig():
171
  """Simple wrapper to return a SimpleStore.
172

173
  @rtype: L{ssconf.SimpleStore}
174
  @return: a SimpleStore instance
175

176
  """
177
  return ssconf.SimpleStore()
178

    
179

    
180
def _GetSshRunner(cluster_name):
181
  """Simple wrapper to return an SshRunner.
182

183
  @type cluster_name: str
184
  @param cluster_name: the cluster name, which is needed
185
      by the SshRunner constructor
186
  @rtype: L{ssh.SshRunner}
187
  @return: an SshRunner instance
188

189
  """
190
  return ssh.SshRunner(cluster_name)
191

    
192

    
193
def _Decompress(data):
194
  """Unpacks data compressed by the RPC client.
195

196
  @type data: list or tuple
197
  @param data: Data sent by RPC client
198
  @rtype: str
199
  @return: Decompressed data
200

201
  """
202
  assert isinstance(data, (list, tuple))
203
  assert len(data) == 2
204
  (encoding, content) = data
205
  if encoding == constants.RPC_ENCODING_NONE:
206
    return content
207
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
208
    return zlib.decompress(base64.b64decode(content))
209
  else:
210
    raise AssertionError("Unknown data encoding")
211

    
212

    
213
def _CleanDirectory(path, exclude=None):
214
  """Removes all regular files in a directory.
215

216
  @type path: str
217
  @param path: the directory to clean
218
  @type exclude: list
219
  @param exclude: list of files to be excluded, defaults
220
      to the empty list
221

222
  """
223
  if path not in _ALLOWED_CLEAN_DIRS:
224
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
225
          path)
226

    
227
  if not os.path.isdir(path):
228
    return
229
  if exclude is None:
230
    exclude = []
231
  else:
232
    # Normalize excluded paths
233
    exclude = [os.path.normpath(i) for i in exclude]
234

    
235
  for rel_name in utils.ListVisibleFiles(path):
236
    full_name = utils.PathJoin(path, rel_name)
237
    if full_name in exclude:
238
      continue
239
    if os.path.isfile(full_name) and not os.path.islink(full_name):
240
      utils.RemoveFile(full_name)
241

    
242

    
243
def _BuildUploadFileList():
244
  """Build the list of allowed upload files.
245

246
  This is abstracted so that it's built only once at module import time.
247

248
  """
249
  allowed_files = set([
250
    pathutils.CLUSTER_CONF_FILE,
251
    pathutils.ETC_HOSTS,
252
    pathutils.SSH_KNOWN_HOSTS_FILE,
253
    pathutils.VNC_PASSWORD_FILE,
254
    pathutils.RAPI_CERT_FILE,
255
    pathutils.SPICE_CERT_FILE,
256
    pathutils.SPICE_CACERT_FILE,
257
    pathutils.RAPI_USERS_FILE,
258
    pathutils.CONFD_HMAC_KEY,
259
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
260
    ])
261

    
262
  for hv_name in constants.HYPER_TYPES:
263
    hv_class = hypervisor.GetHypervisorClass(hv_name)
264
    allowed_files.update(hv_class.GetAncillaryFiles()[0])
265

    
266
  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
267
    "Allowed file storage paths should never be uploaded via RPC"
268

    
269
  return frozenset(allowed_files)
270

    
271

    
272
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
273

    
274

    
275
def JobQueuePurge():
276
  """Removes job queue files and archived jobs.
277

278
  @rtype: tuple
279
  @return: True, None
280

281
  """
282
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
283
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
284

    
285

    
286
def GetMasterInfo():
287
  """Returns master information.
288

289
  This is an utility function to compute master information, either
290
  for consumption here or from the node daemon.
291

292
  @rtype: tuple
293
  @return: master_netdev, master_ip, master_name, primary_ip_family,
294
    master_netmask
295
  @raise RPCFail: in case of errors
296

297
  """
298
  try:
299
    cfg = _GetConfig()
300
    master_netdev = cfg.GetMasterNetdev()
301
    master_ip = cfg.GetMasterIP()
302
    master_netmask = cfg.GetMasterNetmask()
303
    master_node = cfg.GetMasterNode()
304
    primary_ip_family = cfg.GetPrimaryIPFamily()
305
  except errors.ConfigurationError, err:
306
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
307
  return (master_netdev, master_ip, master_node, primary_ip_family,
308
          master_netmask)
309

    
310

    
311
def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
312
  """Decorator that runs hooks before and after the decorated function.
313

314
  @type hook_opcode: string
315
  @param hook_opcode: opcode of the hook
316
  @type hooks_path: string
317
  @param hooks_path: path of the hooks
318
  @type env_builder_fn: function
319
  @param env_builder_fn: function that returns a dictionary containing the
320
    environment variables for the hooks. Will get all the parameters of the
321
    decorated function.
322
  @raise RPCFail: in case of pre-hook failure
323

324
  """
325
  def decorator(fn):
326
    def wrapper(*args, **kwargs):
327
      _, myself = ssconf.GetMasterAndMyself()
328
      nodes = ([myself], [myself])  # these hooks run locally
329

    
330
      env_fn = compat.partial(env_builder_fn, *args, **kwargs)
331

    
332
      cfg = _GetConfig()
333
      hr = HooksRunner()
334
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
335
                                   hr.RunLocalHooks, None, env_fn,
336
                                   logging.warning, cfg.GetClusterName(),
337
                                   cfg.GetMasterNode())
338
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
339
      result = fn(*args, **kwargs)
340
      hm.RunPhase(constants.HOOKS_PHASE_POST)
341

    
342
      return result
343
    return wrapper
344
  return decorator
345

    
346

    
347
def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
348
  """Builds environment variables for master IP hooks.
349

350
  @type master_params: L{objects.MasterNetworkParameters}
351
  @param master_params: network parameters of the master
352
  @type use_external_mip_script: boolean
353
  @param use_external_mip_script: whether to use an external master IP
354
    address setup script (unused, but necessary per the implementation of the
355
    _RunLocalHooks decorator)
356

357
  """
358
  # pylint: disable=W0613
359
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
360
  env = {
361
    "MASTER_NETDEV": master_params.netdev,
362
    "MASTER_IP": master_params.ip,
363
    "MASTER_NETMASK": str(master_params.netmask),
364
    "CLUSTER_IP_VERSION": str(ver),
365
  }
366

    
367
  return env
368

    
369

    
370
def _RunMasterSetupScript(master_params, action, use_external_mip_script):
371
  """Execute the master IP address setup script.
372

373
  @type master_params: L{objects.MasterNetworkParameters}
374
  @param master_params: network parameters of the master
375
  @type action: string
376
  @param action: action to pass to the script. Must be one of
377
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
378
  @type use_external_mip_script: boolean
379
  @param use_external_mip_script: whether to use an external master IP
380
    address setup script
381
  @raise backend.RPCFail: if there are errors during the execution of the
382
    script
383

384
  """
385
  env = _BuildMasterIpEnv(master_params)
386

    
387
  if use_external_mip_script:
388
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
389
  else:
390
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
391

    
392
  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
393

    
394
  if result.failed:
395
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
396
          (action, result.exit_code, result.output), log=True)
397

    
398

    
399
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
400
               _BuildMasterIpEnv)
401
def ActivateMasterIp(master_params, use_external_mip_script):
402
  """Activate the IP address of the master daemon.
403

404
  @type master_params: L{objects.MasterNetworkParameters}
405
  @param master_params: network parameters of the master
406
  @type use_external_mip_script: boolean
407
  @param use_external_mip_script: whether to use an external master IP
408
    address setup script
409
  @raise RPCFail: in case of errors during the IP startup
410

411
  """
412
  _RunMasterSetupScript(master_params, _MASTER_START,
413
                        use_external_mip_script)
414

    
415

    
416
def StartMasterDaemons(no_voting):
417
  """Activate local node as master node.
418

419
  The function will start the master daemons (ganeti-masterd and ganeti-rapi).
420

421
  @type no_voting: boolean
422
  @param no_voting: whether to start ganeti-masterd without a node vote
423
      but still non-interactively
424
  @rtype: None
425

426
  """
427

    
428
  if no_voting:
429
    masterd_args = "--no-voting --yes-do-it"
430
  else:
431
    masterd_args = ""
432

    
433
  env = {
434
    "EXTRA_MASTERD_ARGS": masterd_args,
435
    }
436

    
437
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
438
  if result.failed:
439
    msg = "Can't start Ganeti master: %s" % result.output
440
    logging.error(msg)
441
    _Fail(msg)
442

    
443

    
444
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
445
               _BuildMasterIpEnv)
446
def DeactivateMasterIp(master_params, use_external_mip_script):
447
  """Deactivate the master IP on this node.
448

449
  @type master_params: L{objects.MasterNetworkParameters}
450
  @param master_params: network parameters of the master
451
  @type use_external_mip_script: boolean
452
  @param use_external_mip_script: whether to use an external master IP
453
    address setup script
454
  @raise RPCFail: in case of errors during the IP turndown
455

456
  """
457
  _RunMasterSetupScript(master_params, _MASTER_STOP,
458
                        use_external_mip_script)
459

    
460

    
461
def StopMasterDaemons():
462
  """Stop the master daemons on this node.
463

464
  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
465

466
  @rtype: None
467

468
  """
469
  # TODO: log and report back to the caller the error failures; we
470
  # need to decide in which case we fail the RPC for this
471

    
472
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
473
  if result.failed:
474
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
475
                  " and error %s",
476
                  result.cmd, result.exit_code, result.output)
477

    
478

    
479
def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
480
  """Change the netmask of the master IP.
481

482
  @param old_netmask: the old value of the netmask
483
  @param netmask: the new value of the netmask
484
  @param master_ip: the master IP
485
  @param master_netdev: the master network device
486

487
  """
488
  if old_netmask == netmask:
489
    return
490

    
491
  if not netutils.IPAddress.Own(master_ip):
492
    _Fail("The master IP address is not up, not attempting to change its"
493
          " netmask")
494

    
495
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
496
                         "%s/%s" % (master_ip, netmask),
497
                         "dev", master_netdev, "label",
498
                         "%s:0" % master_netdev])
499
  if result.failed:
500
    _Fail("Could not set the new netmask on the master IP address")
501

    
502
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
503
                         "%s/%s" % (master_ip, old_netmask),
504
                         "dev", master_netdev, "label",
505
                         "%s:0" % master_netdev])
506
  if result.failed:
507
    _Fail("Could not bring down the master IP address with the old netmask")
508

    
509

    
510
def EtcHostsModify(mode, host, ip):
511
  """Modify a host entry in /etc/hosts.
512

513
  @param mode: The mode to operate. Either add or remove entry
514
  @param host: The host to operate on
515
  @param ip: The ip associated with the entry
516

517
  """
518
  if mode == constants.ETC_HOSTS_ADD:
519
    if not ip:
520
      RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
521
              " present")
522
    utils.AddHostToEtcHosts(host, ip)
523
  elif mode == constants.ETC_HOSTS_REMOVE:
524
    if ip:
525
      RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
526
              " parameter is present")
527
    utils.RemoveHostFromEtcHosts(host)
528
  else:
529
    RPCFail("Mode not supported")
530

    
531

    
532
def LeaveCluster(modify_ssh_setup):
533
  """Cleans up and remove the current node.
534

535
  This function cleans up and prepares the current node to be removed
536
  from the cluster.
537

538
  If processing is successful, then it raises an
539
  L{errors.QuitGanetiException} which is used as a special case to
540
  shutdown the node daemon.
541

542
  @param modify_ssh_setup: boolean
543

544
  """
545
  _CleanDirectory(pathutils.DATA_DIR)
546
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
547
  JobQueuePurge()
548

    
549
  if modify_ssh_setup:
550
    try:
551
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
552

    
553
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
554

    
555
      utils.RemoveFile(priv_key)
556
      utils.RemoveFile(pub_key)
557
    except errors.OpExecError:
558
      logging.exception("Error while processing ssh files")
559

    
560
  try:
561
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
562
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
563
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
564
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
565
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
566
  except: # pylint: disable=W0702
567
    logging.exception("Error while removing cluster secrets")
568

    
569
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
570
  if result.failed:
571
    logging.error("Command %s failed with exitcode %s and error %s",
572
                  result.cmd, result.exit_code, result.output)
573

    
574
  # Raise a custom exception (handled in ganeti-noded)
575
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
576

    
577

    
578
def _CheckStorageParams(params, num_params):
579
  """Performs sanity checks for storage parameters.
580

581
  @type params: list
582
  @param params: list of storage parameters
583
  @type num_params: int
584
  @param num_params: expected number of parameters
585

586
  """
587
  if params is None:
588
    raise errors.ProgrammerError("No storage parameters for storage"
589
                                 " reporting is provided.")
590
  if not isinstance(params, list):
591
    raise errors.ProgrammerError("The storage parameters are not of type"
592
                                 " list: '%s'" % params)
593
  if not len(params) == num_params:
594
    raise errors.ProgrammerError("Did not receive the expected number of"
595
                                 "storage parameters: expected %s,"
596
                                 " received '%s'" % (num_params, len(params)))
597

    
598

    
599
def _CheckLvmStorageParams(params):
600
  """Performs sanity check for the 'exclusive storage' flag.
601

602
  @see: C{_CheckStorageParams}
603

604
  """
605
  _CheckStorageParams(params, 1)
606
  excl_stor = params[0]
607
  if not isinstance(params[0], bool):
608
    raise errors.ProgrammerError("Exclusive storage parameter is not"
609
                                 " boolean: '%s'." % excl_stor)
610
  return excl_stor
611

    
612

    
613
def _GetLvmVgSpaceInfo(name, params):
614
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.
615

616
  @type name: string
617
  @param name: name of the volume group
618
  @type params: list
619
  @param params: list of storage parameters, which in this case should be
620
    containing only one for exclusive storage
621

622
  """
623
  excl_stor = _CheckLvmStorageParams(params)
624
  return _GetVgInfo(name, excl_stor)
625

    
626

    
627
def _GetVgInfo(
628
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
629
  """Retrieves information about a LVM volume group.
630

631
  """
632
  # TODO: GetVGInfo supports returning information for multiple VGs at once
633
  vginfo = info_fn([name], excl_stor)
634
  if vginfo:
635
    vg_free = int(round(vginfo[0][0], 0))
636
    vg_size = int(round(vginfo[0][1], 0))
637
  else:
638
    vg_free = None
639
    vg_size = None
640

    
641
  return {
642
    "type": constants.ST_LVM_VG,
643
    "name": name,
644
    "storage_free": vg_free,
645
    "storage_size": vg_size,
646
    }
647

    
648

    
649
def _GetLvmPvSpaceInfo(name, params):
650
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.
651

652
  @see: C{_GetLvmVgSpaceInfo}
653

654
  """
655
  excl_stor = _CheckLvmStorageParams(params)
656
  return _GetVgSpindlesInfo(name, excl_stor)
657

    
658

    
659
def _GetVgSpindlesInfo(
660
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
661
  """Retrieves information about spindles in an LVM volume group.
662

663
  @type name: string
664
  @param name: VG name
665
  @type excl_stor: bool
666
  @param excl_stor: exclusive storage
667
  @rtype: dict
668
  @return: dictionary whose keys are "name", "vg_free", "vg_size" for VG name,
669
      free spindles, total spindles respectively
670

671
  """
672
  if excl_stor:
673
    (vg_free, vg_size) = info_fn(name)
674
  else:
675
    vg_free = 0
676
    vg_size = 0
677
  return {
678
    "type": constants.ST_LVM_PV,
679
    "name": name,
680
    "storage_free": vg_free,
681
    "storage_size": vg_size,
682
    }
683

    
684

    
685
def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
686
  """Retrieves node information from a hypervisor.
687

688
  The information returned depends on the hypervisor. Common items:
689

690
    - vg_size is the size of the configured volume group in MiB
691
    - vg_free is the free size of the volume group in MiB
692
    - memory_dom0 is the memory allocated for domain0 in MiB
693
    - memory_free is the currently available (free) ram in MiB
694
    - memory_total is the total number of ram in MiB
695
    - hv_version: the hypervisor version, if available
696

697
  @type hvparams: dict of string
698
  @param hvparams: the hypervisor's hvparams
699

700
  """
701
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)
702

    
703

    
704
def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
705
  """Retrieves node information for all hypervisors.
706

707
  See C{_GetHvInfo} for information on the output.
708

709
  @type hv_specs: list of pairs (string, dict of strings)
710
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
711

712
  """
713
  if hv_specs is None:
714
    return None
715

    
716
  result = []
717
  for hvname, hvparams in hv_specs:
718
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
719
  return result
720

    
721

    
722
def _GetNamedNodeInfo(names, fn):
723
  """Calls C{fn} for all names in C{names} and returns a dictionary.
724

725
  @rtype: None or dict
726

727
  """
728
  if names is None:
729
    return None
730
  else:
731
    return map(fn, names)
732

    
733

    
734
def GetNodeInfo(storage_units, hv_specs):
735
  """Gives back a hash with different information about the node.
736

737
  @type storage_units: list of tuples (string, string, list)
738
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
739
    ask for disk space information. In case of lvm-vg, the identifier is
740
    the VG name. The parameters can contain additional, storage-type-specific
741
    parameters, for example exclusive storage for lvm storage.
742
  @type hv_specs: list of pairs (string, dict of strings)
743
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
744
  @rtype: tuple; (string, None/dict, None/dict)
745
  @return: Tuple containing boot ID, volume group information and hypervisor
746
    information
747

748
  """
749
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
750
  storage_info = _GetNamedNodeInfo(
751
    storage_units,
752
    (lambda (storage_type, storage_key, storage_params):
753
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
754
  hv_info = _GetHvInfoAll(hv_specs)
755
  return (bootid, storage_info, hv_info)
756

    
757

    
758
def _GetFileStorageSpaceInfo(path, params):
759
  """Wrapper around filestorage.GetSpaceInfo.
760

761
  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
762
  and ignore the *args parameter to not leak it into the filestorage
763
  module's code.
764

765
  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
766
    parameters.
767

768
  """
769
  _CheckStorageParams(params, 0)
770
  return filestorage.GetFileStorageSpaceInfo(path)
771

    
772

    
773
# FIXME: implement storage reporting for all missing storage types.
774
_STORAGE_TYPE_INFO_FN = {
775
  constants.ST_BLOCK: None,
776
  constants.ST_DISKLESS: None,
777
  constants.ST_EXT: None,
778
  constants.ST_FILE: _GetFileStorageSpaceInfo,
779
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
780
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
781
  constants.ST_SHARED_FILE: None,
782
  constants.ST_RADOS: None,
783
}
784

    
785

    
786
def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
787
  """Looks up and applies the correct function to calculate free and total
788
  storage for the given storage type.
789

790
  @type storage_type: string
791
  @param storage_type: the storage type for which the storage shall be reported.
792
  @type storage_key: string
793
  @param storage_key: identifier of a storage unit, e.g. the volume group name
794
    of an LVM storage unit
795
  @type args: any
796
  @param args: various parameters that can be used for storage reporting. These
797
    parameters and their semantics vary from storage type to storage type and
798
    are just propagated in this function.
799
  @return: the results of the application of the storage space function (see
800
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
801
    storage type
802
  @raises NotImplementedError: for storage types who don't support space
803
    reporting yet
804
  """
805
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
806
  if fn is not None:
807
    return fn(storage_key, *args)
808
  else:
809
    raise NotImplementedError
810

    
811

    
812
def _CheckExclusivePvs(pvi_list):
813
  """Check that PVs are not shared among LVs
814

815
  @type pvi_list: list of L{objects.LvmPvInfo} objects
816
  @param pvi_list: information about the PVs
817

818
  @rtype: list of tuples (string, list of strings)
819
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])
820

821
  """
822
  res = []
823
  for pvi in pvi_list:
824
    if len(pvi.lv_list) > 1:
825
      res.append((pvi.name, pvi.lv_list))
826
  return res
827

    
828

    
829
def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
830
                       get_hv_fn=hypervisor.GetHypervisor):
831
  """Verifies the hypervisor. Appends the results to the 'results' list.
832

833
  @type what: C{dict}
834
  @param what: a dictionary of things to check
835
  @type vm_capable: boolean
836
  @param vm_capable: whether or not this node is vm capable
837
  @type result: dict
838
  @param result: dictionary of verification results; results of the
839
    verifications in this function will be added here
840
  @type all_hvparams: dict of dict of string
841
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
842
  @type get_hv_fn: function
843
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability
844

845
  """
846
  if not vm_capable:
847
    return
848

    
849
  if constants.NV_HYPERVISOR in what:
850
    result[constants.NV_HYPERVISOR] = {}
851
    for hv_name in what[constants.NV_HYPERVISOR]:
852
      hvparams = all_hvparams[hv_name]
853
      try:
854
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
855
      except errors.HypervisorError, err:
856
        val = "Error while checking hypervisor: %s" % str(err)
857
      result[constants.NV_HYPERVISOR][hv_name] = val
858

    
859

    
860
def _VerifyHvparams(what, vm_capable, result,
861
                    get_hv_fn=hypervisor.GetHypervisor):
862
  """Verifies the hvparams. Appends the results to the 'results' list.
863

864
  @type what: C{dict}
865
  @param what: a dictionary of things to check
866
  @type vm_capable: boolean
867
  @param vm_capable: whether or not this node is vm capable
868
  @type result: dict
869
  @param result: dictionary of verification results; results of the
870
    verifications in this function will be added here
871
  @type get_hv_fn: function
872
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability
873

874
  """
875
  if not vm_capable:
876
    return
877

    
878
  if constants.NV_HVPARAMS in what:
879
    result[constants.NV_HVPARAMS] = []
880
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
881
      try:
882
        logging.info("Validating hv %s, %s", hv_name, hvparms)
883
        get_hv_fn(hv_name).ValidateParameters(hvparms)
884
      except errors.HypervisorError, err:
885
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))
886

    
887

    
888
def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
889
  """Verifies the instance list.
890

891
  @type what: C{dict}
892
  @param what: a dictionary of things to check
893
  @type vm_capable: boolean
894
  @param vm_capable: whether or not this node is vm capable
895
  @type result: dict
896
  @param result: dictionary of verification results; results of the
897
    verifications in this function will be added here
898
  @type all_hvparams: dict of dict of string
899
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
900

901
  """
902
  if constants.NV_INSTANCELIST in what and vm_capable:
903
    # GetInstanceList can fail
904
    try:
905
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
906
                            all_hvparams=all_hvparams)
907
    except RPCFail, err:
908
      val = str(err)
909
    result[constants.NV_INSTANCELIST] = val
910

    
911

    
912
def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
913
  """Verifies the node info.
914

915
  @type what: C{dict}
916
  @param what: a dictionary of things to check
917
  @type vm_capable: boolean
918
  @param vm_capable: whether or not this node is vm capable
919
  @type result: dict
920
  @param result: dictionary of verification results; results of the
921
    verifications in this function will be added here
922
  @type all_hvparams: dict of dict of string
923
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
924

925
  """
926
  if constants.NV_HVINFO in what and vm_capable:
927
    hvname = what[constants.NV_HVINFO]
928
    hyper = hypervisor.GetHypervisor(hvname)
929
    hvparams = all_hvparams[hvname]
930
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)
931

    
932

    
933
def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
934
  """Verify the status of the local node.
935

936
  Based on the input L{what} parameter, various checks are done on the
937
  local node.
938

939
  If the I{filelist} key is present, this list of
940
  files is checksummed and the file/checksum pairs are returned.
941

942
  If the I{nodelist} key is present, we check that we have
943
  connectivity via ssh with the target nodes (and check the hostname
944
  report).
945

946
  If the I{node-net-test} key is present, we check that we have
947
  connectivity to the given nodes via both primary IP and, if
948
  applicable, secondary IPs.
949

950
  @type what: C{dict}
951
  @param what: a dictionary of things to check:
952
      - filelist: list of files for which to compute checksums
953
      - nodelist: list of nodes we should check ssh communication with
954
      - node-net-test: list of nodes we should check node daemon port
955
        connectivity with
956
      - hypervisor: list with hypervisors to run the verify for
957
  @type cluster_name: string
958
  @param cluster_name: the cluster's name
959
  @type all_hvparams: dict of dict of strings
960
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
961
  @type node_groups: a dict of strings
962
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
963
      have only those nodes that are in `what["nodelist"]`)
964
  @type groups_cfg: a dict of dict of strings
965
  @param groups_cfg: a dictionary mapping group uuids to their configuration
966
  @rtype: dict
967
  @return: a dictionary with the same keys as the input dict, and
968
      values representing the result of the checks
969

970
  """
971
  result = {}
972
  my_name = netutils.Hostname.GetSysName()
973
  port = netutils.GetDaemonPort(constants.NODED)
974
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
975

    
976
  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
977
  _VerifyHvparams(what, vm_capable, result)
978

    
979
  if constants.NV_FILELIST in what:
980
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
981
                                              what[constants.NV_FILELIST]))
982
    result[constants.NV_FILELIST] = \
983
      dict((vcluster.MakeVirtualPath(key), value)
984
           for (key, value) in fingerprints.items())
985

    
986
  if constants.NV_NODELIST in what:
987
    (nodes, bynode) = what[constants.NV_NODELIST]
988

    
989
    # Add nodes from other groups (different for each node)
990
    try:
991
      nodes.extend(bynode[my_name])
992
    except KeyError:
993
      pass
994

    
995
    # Use a random order
996
    random.shuffle(nodes)
997

    
998
    # Try to contact all nodes
999
    val = {}
1000
    for node in nodes:
1001
      params = groups_cfg.get(node_groups.get(node))
1002
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
1003
      logging.debug("Ssh port %s (None = default) for node %s",
1004
                    str(ssh_port), node)
1005
      success, message = _GetSshRunner(cluster_name). \
1006
                            VerifyNodeHostname(node, ssh_port)
1007
      if not success:
1008
        val[node] = message
1009

    
1010
    result[constants.NV_NODELIST] = val
1011

    
1012
  if constants.NV_NODENETTEST in what:
1013
    result[constants.NV_NODENETTEST] = tmp = {}
1014
    my_pip = my_sip = None
1015
    for name, pip, sip in what[constants.NV_NODENETTEST]:
1016
      if name == my_name:
1017
        my_pip = pip
1018
        my_sip = sip
1019
        break
1020
    if not my_pip:
1021
      tmp[my_name] = ("Can't find my own primary/secondary IP"
1022
                      " in the node list")
1023
    else:
1024
      for name, pip, sip in what[constants.NV_NODENETTEST]:
1025
        fail = []
1026
        if not netutils.TcpPing(pip, port, source=my_pip):
1027
          fail.append("primary")
1028
        if sip != pip:
1029
          if not netutils.TcpPing(sip, port, source=my_sip):
1030
            fail.append("secondary")
1031
        if fail:
1032
          tmp[name] = ("failure using the %s interface(s)" %
1033
                       " and ".join(fail))
1034

    
1035
  if constants.NV_MASTERIP in what:
1036
    # FIXME: add checks on incoming data structures (here and in the
1037
    # rest of the function)
1038
    master_name, master_ip = what[constants.NV_MASTERIP]
1039
    if master_name == my_name:
1040
      source = constants.IP4_ADDRESS_LOCALHOST
1041
    else:
1042
      source = None
1043
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
1044
                                                     source=source)
1045

    
1046
  if constants.NV_USERSCRIPTS in what:
1047
    result[constants.NV_USERSCRIPTS] = \
1048
      [script for script in what[constants.NV_USERSCRIPTS]
1049
       if not utils.IsExecutable(script)]
1050

    
1051
  if constants.NV_OOB_PATHS in what:
1052
    result[constants.NV_OOB_PATHS] = tmp = []
1053
    for path in what[constants.NV_OOB_PATHS]:
1054
      try:
1055
        st = os.stat(path)
1056
      except OSError, err:
1057
        tmp.append("error stating out of band helper: %s" % err)
1058
      else:
1059
        if stat.S_ISREG(st.st_mode):
1060
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
1061
            tmp.append(None)
1062
          else:
1063
            tmp.append("out of band helper %s is not executable" % path)
1064
        else:
1065
          tmp.append("out of band helper %s is not a file" % path)
1066

    
1067
  if constants.NV_LVLIST in what and vm_capable:
1068
    try:
1069
      val = GetVolumeList(utils.ListVolumeGroups().keys())
1070
    except RPCFail, err:
1071
      val = str(err)
1072
    result[constants.NV_LVLIST] = val
1073

    
1074
  _VerifyInstanceList(what, vm_capable, result, all_hvparams)
1075

    
1076
  if constants.NV_VGLIST in what and vm_capable:
1077
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
1078

    
1079
  if constants.NV_PVLIST in what and vm_capable:
1080
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
1081
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
1082
                                       filter_allocatable=False,
1083
                                       include_lvs=check_exclusive_pvs)
1084
    if check_exclusive_pvs:
1085
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
1086
      for pvi in val:
1087
        # Avoid sending useless data on the wire
1088
        pvi.lv_list = []
1089
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)
1090

    
1091
  if constants.NV_VERSION in what:
1092
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
1093
                                    constants.RELEASE_VERSION)
1094

    
1095
  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)
1096

    
1097
  if constants.NV_DRBDVERSION in what and vm_capable:
1098
    try:
1099
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
1100
    except errors.BlockDeviceError, err:
1101
      logging.warning("Can't get DRBD version", exc_info=True)
1102
      drbd_version = str(err)
1103
    result[constants.NV_DRBDVERSION] = drbd_version
1104

    
1105
  if constants.NV_DRBDLIST in what and vm_capable:
1106
    try:
1107
      used_minors = drbd.DRBD8.GetUsedDevs()
1108
    except errors.BlockDeviceError, err:
1109
      logging.warning("Can't get used minors list", exc_info=True)
1110
      used_minors = str(err)
1111
    result[constants.NV_DRBDLIST] = used_minors
1112

    
1113
  if constants.NV_DRBDHELPER in what and vm_capable:
1114
    status = True
1115
    try:
1116
      payload = drbd.DRBD8.GetUsermodeHelper()
1117
    except errors.BlockDeviceError, err:
1118
      logging.error("Can't get DRBD usermode helper: %s", str(err))
1119
      status = False
1120
      payload = str(err)
1121
    result[constants.NV_DRBDHELPER] = (status, payload)
1122

    
1123
  if constants.NV_NODESETUP in what:
1124
    result[constants.NV_NODESETUP] = tmpr = []
1125
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
1126
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
1127
                  " under /sys, missing required directories /sys/block"
1128
                  " and /sys/class/net")
1129
    if (not os.path.isdir("/proc/sys") or
1130
        not os.path.isfile("/proc/sysrq-trigger")):
1131
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
1132
                  " under /proc, missing required directory /proc/sys and"
1133
                  " the file /proc/sysrq-trigger")
1134

    
1135
  if constants.NV_TIME in what:
1136
    result[constants.NV_TIME] = utils.SplitTime(time.time())
1137

    
1138
  if constants.NV_OSLIST in what and vm_capable:
1139
    result[constants.NV_OSLIST] = DiagnoseOS()
1140

    
1141
  if constants.NV_BRIDGES in what and vm_capable:
1142
    result[constants.NV_BRIDGES] = [bridge
1143
                                    for bridge in what[constants.NV_BRIDGES]
1144
                                    if not utils.BridgeExists(bridge)]
1145

    
1146
  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
1147
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
1148
        filestorage.ComputeWrongFileStoragePaths()
1149

    
1150
  if what.get(constants.NV_FILE_STORAGE_PATH):
1151
    pathresult = filestorage.CheckFileStoragePath(
1152
        what[constants.NV_FILE_STORAGE_PATH])
1153
    if pathresult:
1154
      result[constants.NV_FILE_STORAGE_PATH] = pathresult
1155

    
1156
  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
1157
    pathresult = filestorage.CheckFileStoragePath(
1158
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
1159
    if pathresult:
1160
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult
1161

    
1162
  return result
1163

    
1164

    
1165
def GetBlockDevSizes(devices):
1166
  """Return the size of the given block devices
1167

1168
  @type devices: list
1169
  @param devices: list of block device nodes to query
1170
  @rtype: dict
1171
  @return:
1172
    dictionary of all block devices under /dev (key). The value is their
1173
    size in MiB.
1174

1175
    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
1176

1177
  """
1178
  DEV_PREFIX = "/dev/"
1179
  blockdevs = {}
1180

    
1181
  for devpath in devices:
1182
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
1183
      continue
1184

    
1185
    try:
1186
      st = os.stat(devpath)
1187
    except EnvironmentError, err:
1188
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
1189
      continue
1190

    
1191
    if stat.S_ISBLK(st.st_mode):
1192
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
1193
      if result.failed:
1194
        # We don't want to fail, just do not list this device as available
1195
        logging.warning("Cannot get size for block device %s", devpath)
1196
        continue
1197

    
1198
      size = int(result.stdout) / (1024 * 1024)
1199
      blockdevs[devpath] = size
1200
  return blockdevs
1201

    
1202

    
1203
def GetVolumeList(vg_names):
1204
  """Compute list of logical volumes and their size.
1205

1206
  @type vg_names: list
1207
  @param vg_names: the volume groups whose LVs we should list, or
1208
      empty for all volume groups
1209
  @rtype: dict
1210
  @return:
1211
      dictionary of all partions (key) with value being a tuple of
1212
      their size (in MiB), inactive and online status::
1213

1214
        {'xenvg/test1': ('20.06', True, True)}
1215

1216
      in case of errors, a string is returned with the error
1217
      details.
1218

1219
  """
1220
  lvs = {}
1221
  sep = "|"
1222
  if not vg_names:
1223
    vg_names = []
1224
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
1225
                         "--separator=%s" % sep,
1226
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
1227
  if result.failed:
1228
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
1229

    
1230
  for line in result.stdout.splitlines():
1231
    line = line.strip()
1232
    match = _LVSLINE_REGEX.match(line)
1233
    if not match:
1234
      logging.error("Invalid line returned from lvs output: '%s'", line)
1235
      continue
1236
    vg_name, name, size, attr = match.groups()
1237
    inactive = attr[4] == "-"
1238
    online = attr[5] == "o"
1239
    virtual = attr[0] == "v"
1240
    if virtual:
1241
      # we don't want to report such volumes as existing, since they
1242
      # don't really hold data
1243
      continue
1244
    lvs[vg_name + "/" + name] = (size, inactive, online)
1245

    
1246
  return lvs
1247

    
1248

    
1249
def ListVolumeGroups():
1250
  """List the volume groups and their size.
1251

1252
  @rtype: dict
1253
  @return: dictionary with keys volume name and values the
1254
      size of the volume
1255

1256
  """
1257
  return utils.ListVolumeGroups()
1258

    
1259

    
1260
def NodeVolumes():
1261
  """List all volumes on this node.
1262

1263
  @rtype: list
1264
  @return:
1265
    A list of dictionaries, each having four keys:
1266
      - name: the logical volume name,
1267
      - size: the size of the logical volume
1268
      - dev: the physical device on which the LV lives
1269
      - vg: the volume group to which it belongs
1270

1271
    In case of errors, we return an empty list and log the
1272
    error.
1273

1274
    Note that since a logical volume can live on multiple physical
1275
    volumes, the resulting list might include a logical volume
1276
    multiple times.
1277

1278
  """
1279
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
1280
                         "--separator=|",
1281
                         "--options=lv_name,lv_size,devices,vg_name"])
1282
  if result.failed:
1283
    _Fail("Failed to list logical volumes, lvs output: %s",
1284
          result.output)
1285

    
1286
  def parse_dev(dev):
1287
    return dev.split("(")[0]
1288

    
1289
  def handle_dev(dev):
1290
    return [parse_dev(x) for x in dev.split(",")]
1291

    
1292
  def map_line(line):
1293
    line = [v.strip() for v in line]
1294
    return [{"name": line[0], "size": line[1],
1295
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
1296

    
1297
  all_devs = []
1298
  for line in result.stdout.splitlines():
1299
    if line.count("|") >= 3:
1300
      all_devs.extend(map_line(line.split("|")))
1301
    else:
1302
      logging.warning("Strange line in the output from lvs: '%s'", line)
1303
  return all_devs
1304

    
1305

    
1306
def BridgesExist(bridges_list):
1307
  """Check if a list of bridges exist on the current node.
1308

1309
  @rtype: boolean
1310
  @return: C{True} if all of them exist, C{False} otherwise
1311

1312
  """
1313
  missing = []
1314
  for bridge in bridges_list:
1315
    if not utils.BridgeExists(bridge):
1316
      missing.append(bridge)
1317

    
1318
  if missing:
1319
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
1320

    
1321

    
1322
def GetInstanceListForHypervisor(hname, hvparams=None,
1323
                                 get_hv_fn=hypervisor.GetHypervisor):
1324
  """Provides a list of instances of the given hypervisor.
1325

1326
  @type hname: string
1327
  @param hname: name of the hypervisor
1328
  @type hvparams: dict of strings
1329
  @param hvparams: hypervisor parameters for the given hypervisor
1330
  @type get_hv_fn: function
1331
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1332
    name; optional parameter to increase testability
1333

1334
  @rtype: list
1335
  @return: a list of all running instances on the current node
1336
    - instance1.example.com
1337
    - instance2.example.com
1338

1339
  """
1340
  results = []
1341
  try:
1342
    hv = get_hv_fn(hname)
1343
    names = hv.ListInstances(hvparams=hvparams)
1344
    results.extend(names)
1345
  except errors.HypervisorError, err:
1346
    _Fail("Error enumerating instances (hypervisor %s): %s",
1347
          hname, err, exc=True)
1348
  return results
1349

    
1350

    
1351
def GetInstanceList(hypervisor_list, all_hvparams=None,
1352
                    get_hv_fn=hypervisor.GetHypervisor):
1353
  """Provides a list of instances.
1354

1355
  @type hypervisor_list: list
1356
  @param hypervisor_list: the list of hypervisors to query information
1357
  @type all_hvparams: dict of dict of strings
1358
  @param all_hvparams: a dictionary mapping hypervisor types to respective
1359
    cluster-wide hypervisor parameters
1360
  @type get_hv_fn: function
1361
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1362
    name; optional parameter to increase testability
1363

1364
  @rtype: list
1365
  @return: a list of all running instances on the current node
1366
    - instance1.example.com
1367
    - instance2.example.com
1368

1369
  """
1370
  results = []
1371
  for hname in hypervisor_list:
1372
    hvparams = all_hvparams[hname]
1373
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
1374
                                                get_hv_fn=get_hv_fn))
1375
  return results
1376

    
1377

    
1378
def GetInstanceInfo(instance, hname, hvparams=None):
1379
  """Gives back the information about an instance as a dictionary.
1380

1381
  @type instance: string
1382
  @param instance: the instance name
1383
  @type hname: string
1384
  @param hname: the hypervisor type of the instance
1385
  @type hvparams: dict of strings
1386
  @param hvparams: the instance's hvparams
1387

1388
  @rtype: dict
1389
  @return: dictionary with the following keys:
1390
      - memory: memory size of instance (int)
1391
      - state: state of instance (HvInstanceState)
1392
      - time: cpu time of instance (float)
1393
      - vcpus: the number of vcpus (int)
1394

1395
  """
1396
  output = {}
1397

    
1398
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
1399
                                                          hvparams=hvparams)
1400
  if iinfo is not None:
1401
    output["memory"] = iinfo[2]
1402
    output["vcpus"] = iinfo[3]
1403
    output["state"] = iinfo[4]
1404
    output["time"] = iinfo[5]
1405

    
1406
  return output
1407

    
1408

    
1409
def GetInstanceMigratable(instance):
1410
  """Computes whether an instance can be migrated.
1411

1412
  @type instance: L{objects.Instance}
1413
  @param instance: object representing the instance to be checked.
1414

1415
  @rtype: tuple
1416
  @return: tuple of (result, description) where:
1417
      - result: whether the instance can be migrated or not
1418
      - description: a description of the issue, if relevant
1419

1420
  """
1421
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1422
  iname = instance.name
1423
  if iname not in hyper.ListInstances(instance.hvparams):
1424
    _Fail("Instance %s is not running", iname)
1425

    
1426
  for idx in range(len(instance.disks)):
1427
    link_name = _GetBlockDevSymlinkPath(iname, idx)
1428
    if not os.path.islink(link_name):
1429
      logging.warning("Instance %s is missing symlink %s for disk %d",
1430
                      iname, link_name, idx)
1431

    
1432

    
1433
def GetAllInstancesInfo(hypervisor_list, all_hvparams):
1434
  """Gather data about all instances.
1435

1436
  This is the equivalent of L{GetInstanceInfo}, except that it
1437
  computes data for all instances at once, thus being faster if one
1438
  needs data about more than one instance.
1439

1440
  @type hypervisor_list: list
1441
  @param hypervisor_list: list of hypervisors to query for instance data
1442
  @type all_hvparams: dict of dict of strings
1443
  @param all_hvparams: mapping of hypervisor names to hvparams
1444

1445
  @rtype: dict
1446
  @return: dictionary of instance: data, with data having the following keys:
1447
      - memory: memory size of instance (int)
1448
      - state: xen state of instance (string)
1449
      - time: cpu time of instance (float)
1450
      - vcpus: the number of vcpus
1451

1452
  """
1453
  output = {}
1454
  for hname in hypervisor_list:
1455
    hvparams = all_hvparams[hname]
1456
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
1457
    if iinfo:
1458
      for name, _, memory, vcpus, state, times in iinfo:
1459
        value = {
1460
          "memory": memory,
1461
          "vcpus": vcpus,
1462
          "state": state,
1463
          "time": times,
1464
          }
1465
        if name in output:
1466
          # we only check static parameters, like memory and vcpus,
1467
          # and not state and time which can change between the
1468
          # invocations of the different hypervisors
1469
          for key in "memory", "vcpus":
1470
            if value[key] != output[name][key]:
1471
              _Fail("Instance %s is running twice"
1472
                    " with different parameters", name)
1473
        output[name] = value
1474

    
1475
  return output
1476

    
1477

    
1478
def GetInstanceConsoleInfo(instance_param_dict,
1479
                           get_hv_fn=hypervisor.GetHypervisor):
1480
  """Gather data about the console access of a set of instances of this node.
1481

1482
  This function assumes that the caller already knows which instances are on
1483
  this node, by calling a function such as L{GetAllInstancesInfo} or
1484
  L{GetInstanceList}.
1485

1486
  For every instance, a large amount of configuration data needs to be
1487
  provided to the hypervisor interface in order to receive the console
1488
  information. Whether this could or should be cut down can be discussed.
1489
  The information is provided in a dictionary indexed by instance name,
1490
  allowing any number of instance queries to be done.
1491

1492
  @type instance_param_dict: dict of string to tuple of dictionaries, where the
1493
    dictionaries represent: L{objects.Instance}, L{objects.Node},
1494
    L{objects.NodeGroup}, HvParams, BeParams
1495
  @param instance_param_dict: mapping of instance name to parameters necessary
1496
    for console information retrieval
1497

1498
  @rtype: dict
1499
  @return: dictionary of instance: data, with data having the following keys:
1500
      - instance: instance name
1501
      - kind: console kind
1502
      - message: used with kind == CONS_MESSAGE, indicates console to be
1503
                 unavailable, supplies error message
1504
      - host: host to connect to
1505
      - port: port to use
1506
      - user: user for login
1507
      - command: the command, broken into parts as an array
1508
      - display: unknown, potentially unused?
1509

1510
  """
1511

    
1512
  output = {}
1513
  for inst_name in instance_param_dict:
1514
    instance = instance_param_dict[inst_name]["instance"]
1515
    pnode = instance_param_dict[inst_name]["node"]
1516
    group = instance_param_dict[inst_name]["group"]
1517
    hvparams = instance_param_dict[inst_name]["hvParams"]
1518
    beparams = instance_param_dict[inst_name]["beParams"]
1519

    
1520
    instance = objects.Instance.FromDict(instance)
1521
    pnode = objects.Node.FromDict(pnode)
1522
    group = objects.NodeGroup.FromDict(group)
1523

    
1524
    h = get_hv_fn(instance.hypervisor)
1525
    output[inst_name] = h.GetInstanceConsole(instance, pnode, group,
1526
                                             hvparams, beparams).ToDict()
1527

    
1528
  return output
1529

    
1530

    
1531
def _InstanceLogName(kind, os_name, instance, component):
1532
  """Compute the OS log filename for a given instance and operation.
1533

1534
  The instance name and os name are passed in as strings since not all
1535
  operations have these as part of an instance object.
1536

1537
  @type kind: string
1538
  @param kind: the operation type (e.g. add, import, etc.)
1539
  @type os_name: string
1540
  @param os_name: the os name
1541
  @type instance: string
1542
  @param instance: the name of the instance being imported/added/etc.
1543
  @type component: string or None
1544
  @param component: the name of the component of the instance being
1545
      transferred
1546

1547
  """
1548
  # TODO: Use tempfile.mkstemp to create unique filename
1549
  if component:
1550
    assert "/" not in component
1551
    c_msg = "-%s" % component
1552
  else:
1553
    c_msg = ""
1554
  base = ("%s-%s-%s%s-%s.log" %
1555
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1556
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1557

    
1558

    
1559
def InstanceOsAdd(instance, reinstall, debug):
1560
  """Add an OS to an instance.
1561

1562
  @type instance: L{objects.Instance}
1563
  @param instance: Instance whose OS is to be installed
1564
  @type reinstall: boolean
1565
  @param reinstall: whether this is an instance reinstall
1566
  @type debug: integer
1567
  @param debug: debug level, passed to the OS scripts
1568
  @rtype: None
1569

1570
  """
1571
  inst_os = OSFromDisk(instance.os)
1572

    
1573
  create_env = OSEnvironment(instance, inst_os, debug)
1574
  if reinstall:
1575
    create_env["INSTANCE_REINSTALL"] = "1"
1576

    
1577
  logfile = _InstanceLogName("add", instance.os, instance.name, None)
1578

    
1579
  result = utils.RunCmd([inst_os.create_script], env=create_env,
1580
                        cwd=inst_os.path, output=logfile, reset_env=True)
1581
  if result.failed:
1582
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
1583
                  " output: %s", result.cmd, result.fail_reason, logfile,
1584
                  result.output)
1585
    lines = [utils.SafeEncode(val)
1586
             for val in utils.TailFile(logfile, lines=20)]
1587
    _Fail("OS create script failed (%s), last lines in the"
1588
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1589

    
1590

    
1591
def RunRenameInstance(instance, old_name, debug):
1592
  """Run the OS rename script for an instance.
1593

1594
  @type instance: L{objects.Instance}
1595
  @param instance: Instance whose OS is to be installed
1596
  @type old_name: string
1597
  @param old_name: previous instance name
1598
  @type debug: integer
1599
  @param debug: debug level, passed to the OS scripts
1600
  @rtype: boolean
1601
  @return: the success of the operation
1602

1603
  """
1604
  inst_os = OSFromDisk(instance.os)
1605

    
1606
  rename_env = OSEnvironment(instance, inst_os, debug)
1607
  rename_env["OLD_INSTANCE_NAME"] = old_name
1608

    
1609
  logfile = _InstanceLogName("rename", instance.os,
1610
                             "%s-%s" % (old_name, instance.name), None)
1611

    
1612
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1613
                        cwd=inst_os.path, output=logfile, reset_env=True)
1614

    
1615
  if result.failed:
1616
    logging.error("os create command '%s' returned error: %s output: %s",
1617
                  result.cmd, result.fail_reason, result.output)
1618
    lines = [utils.SafeEncode(val)
1619
             for val in utils.TailFile(logfile, lines=20)]
1620
    _Fail("OS rename script failed (%s), last lines in the"
1621
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1622

    
1623

    
1624
def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
1625
  """Returns symlink path for block device.
1626

1627
  """
1628
  if _dir is None:
1629
    _dir = pathutils.DISK_LINKS_DIR
1630

    
1631
  return utils.PathJoin(_dir,
1632
                        ("%s%s%s" %
1633
                         (instance_name, constants.DISK_SEPARATOR, idx)))
1634

    
1635

    
1636
def _SymlinkBlockDev(instance_name, device_path, idx):
1637
  """Set up symlinks to a instance's block device.
1638

1639
  This is an auxiliary function run when an instance is start (on the primary
1640
  node) or when an instance is migrated (on the target node).
1641

1642

1643
  @param instance_name: the name of the target instance
1644
  @param device_path: path of the physical block device, on the node
1645
  @param idx: the disk index
1646
  @return: absolute path to the disk's symlink
1647

1648
  """
1649
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1650
  try:
1651
    os.symlink(device_path, link_name)
1652
  except OSError, err:
1653
    if err.errno == errno.EEXIST:
1654
      if (not os.path.islink(link_name) or
1655
          os.readlink(link_name) != device_path):
1656
        os.remove(link_name)
1657
        os.symlink(device_path, link_name)
1658
    else:
1659
      raise
1660

    
1661
  return link_name
1662

    
1663

    
1664
def _RemoveBlockDevLinks(instance_name, disks):
1665
  """Remove the block device symlinks belonging to the given instance.
1666

1667
  """
1668
  for idx, _ in enumerate(disks):
1669
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1670
    if os.path.islink(link_name):
1671
      try:
1672
        os.remove(link_name)
1673
      except OSError:
1674
        logging.exception("Can't remove symlink '%s'", link_name)
1675

    
1676

    
1677
def _CalculateDeviceURI(instance, disk, device):
1678
  """Get the URI for the device.
1679

1680
  @type instance: L{objects.Instance}
1681
  @param instance: the instance which disk belongs to
1682
  @type disk: L{objects.Disk}
1683
  @param disk: the target disk object
1684
  @type device: L{bdev.BlockDev}
1685
  @param device: the corresponding BlockDevice
1686
  @rtype: string
1687
  @return: the device uri if any else None
1688

1689
  """
1690
  access_mode = disk.params.get(constants.LDP_ACCESS,
1691
                                constants.DISK_KERNELSPACE)
1692
  if access_mode == constants.DISK_USERSPACE:
1693
    # This can raise errors.BlockDeviceError
1694
    return device.GetUserspaceAccessUri(instance.hypervisor)
1695
  else:
1696
    return None
1697

    
1698

    
1699
def _GatherAndLinkBlockDevs(instance):
1700
  """Set up an instance's block device(s).
1701

1702
  This is run on the primary node at instance startup. The block
1703
  devices must be already assembled.
1704

1705
  @type instance: L{objects.Instance}
1706
  @param instance: the instance whose disks we should assemble
1707
  @rtype: list
1708
  @return: list of (disk_object, link_name, drive_uri)
1709

1710
  """
1711
  block_devices = []
1712
  for idx, disk in enumerate(instance.disks):
1713
    device = _RecursiveFindBD(disk)
1714
    if device is None:
1715
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1716
                                    str(disk))
1717
    device.Open()
1718
    try:
1719
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1720
    except OSError, e:
1721
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1722
                                    e.strerror)
1723
    uri = _CalculateDeviceURI(instance, disk, device)
1724

    
1725
    block_devices.append((disk, link_name, uri))
1726

    
1727
  return block_devices
1728

    
1729

    
1730
def StartInstance(instance, startup_paused, reason, store_reason=True):
1731
  """Start an instance.
1732

1733
  @type instance: L{objects.Instance}
1734
  @param instance: the instance object
1735
  @type startup_paused: bool
1736
  @param instance: pause instance at startup?
1737
  @type reason: list of reasons
1738
  @param reason: the reason trail for this startup
1739
  @type store_reason: boolean
1740
  @param store_reason: whether to store the shutdown reason trail on file
1741
  @rtype: None
1742

1743
  """
1744
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
1745
                                                   instance.hvparams)
1746

    
1747
  if instance.name in running_instances:
1748
    logging.info("Instance %s already running, not starting", instance.name)
1749
    return
1750

    
1751
  try:
1752
    block_devices = _GatherAndLinkBlockDevs(instance)
1753
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1754
    hyper.StartInstance(instance, block_devices, startup_paused)
1755
    if store_reason:
1756
      _StoreInstReasonTrail(instance.name, reason)
1757
  except errors.BlockDeviceError, err:
1758
    _Fail("Block device error: %s", err, exc=True)
1759
  except errors.HypervisorError, err:
1760
    _RemoveBlockDevLinks(instance.name, instance.disks)
1761
    _Fail("Hypervisor error: %s", err, exc=True)
1762

    
1763

    
1764
def InstanceShutdown(instance, timeout, reason, store_reason=True):
1765
  """Shut an instance down.
1766

1767
  @note: this functions uses polling with a hardcoded timeout.
1768

1769
  @type instance: L{objects.Instance}
1770
  @param instance: the instance object
1771
  @type timeout: integer
1772
  @param timeout: maximum timeout for soft shutdown
1773
  @type reason: list of reasons
1774
  @param reason: the reason trail for this shutdown
1775
  @type store_reason: boolean
1776
  @param store_reason: whether to store the shutdown reason trail on file
1777
  @rtype: None
1778

1779
  """
1780
  hv_name = instance.hypervisor
1781
  hyper = hypervisor.GetHypervisor(hv_name)
1782
  iname = instance.name
1783

    
1784
  if instance.name not in hyper.ListInstances(instance.hvparams):
1785
    logging.info("Instance %s not running, doing nothing", iname)
1786
    return
1787

    
1788
  class _TryShutdown:
1789
    def __init__(self):
1790
      self.tried_once = False
1791

    
1792
    def __call__(self):
1793
      if iname not in hyper.ListInstances(instance.hvparams):
1794
        return
1795

    
1796
      try:
1797
        hyper.StopInstance(instance, retry=self.tried_once)
1798
        if store_reason:
1799
          _StoreInstReasonTrail(instance.name, reason)
1800
      except errors.HypervisorError, err:
1801
        if iname not in hyper.ListInstances(instance.hvparams):
1802
          # if the instance is no longer existing, consider this a
1803
          # success and go to cleanup
1804
          return
1805

    
1806
        _Fail("Failed to stop instance %s: %s", iname, err)
1807

    
1808
      self.tried_once = True
1809

    
1810
      raise utils.RetryAgain()
1811

    
1812
  try:
1813
    utils.Retry(_TryShutdown(), 5, timeout)
1814
  except utils.RetryTimeout:
1815
    # the shutdown did not succeed
1816
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1817

    
1818
    try:
1819
      hyper.StopInstance(instance, force=True)
1820
    except errors.HypervisorError, err:
1821
      if iname in hyper.ListInstances(instance.hvparams):
1822
        # only raise an error if the instance still exists, otherwise
1823
        # the error could simply be "instance ... unknown"!
1824
        _Fail("Failed to force stop instance %s: %s", iname, err)
1825

    
1826
    time.sleep(1)
1827

    
1828
    if iname in hyper.ListInstances(instance.hvparams):
1829
      _Fail("Could not shutdown instance %s even by destroy", iname)
1830

    
1831
  try:
1832
    hyper.CleanupInstance(instance.name)
1833
  except errors.HypervisorError, err:
1834
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1835

    
1836
  _RemoveBlockDevLinks(iname, instance.disks)
1837

    
1838

    
1839
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
1840
  """Reboot an instance.
1841

1842
  @type instance: L{objects.Instance}
1843
  @param instance: the instance object to reboot
1844
  @type reboot_type: str
1845
  @param reboot_type: the type of reboot, one the following
1846
    constants:
1847
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1848
        instance OS, do not recreate the VM
1849
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1850
        restart the VM (at the hypervisor level)
1851
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1852
        not accepted here, since that mode is handled differently, in
1853
        cmdlib, and translates into full stop and start of the
1854
        instance (instead of a call_instance_reboot RPC)
1855
  @type shutdown_timeout: integer
1856
  @param shutdown_timeout: maximum timeout for soft shutdown
1857
  @type reason: list of reasons
1858
  @param reason: the reason trail for this reboot
1859
  @rtype: None
1860

1861
  """
1862
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
1863
                                                   instance.hvparams)
1864

    
1865
  if instance.name not in running_instances:
1866
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1867

    
1868
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1869
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1870
    try:
1871
      hyper.RebootInstance(instance)
1872
    except errors.HypervisorError, err:
1873
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1874
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1875
    try:
1876
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
1877
      result = StartInstance(instance, False, reason, store_reason=False)
1878
      _StoreInstReasonTrail(instance.name, reason)
1879
      return result
1880
    except errors.HypervisorError, err:
1881
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1882
  else:
1883
    _Fail("Invalid reboot_type received: %s", reboot_type)
1884

    
1885

    
1886
def InstanceBalloonMemory(instance, memory):
1887
  """Resize an instance's memory.
1888

1889
  @type instance: L{objects.Instance}
1890
  @param instance: the instance object
1891
  @type memory: int
1892
  @param memory: new memory amount in MB
1893
  @rtype: None
1894

1895
  """
1896
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1897
  running = hyper.ListInstances(instance.hvparams)
1898
  if instance.name not in running:
1899
    logging.info("Instance %s is not running, cannot balloon", instance.name)
1900
    return
1901
  try:
1902
    hyper.BalloonInstanceMemory(instance, memory)
1903
  except errors.HypervisorError, err:
1904
    _Fail("Failed to balloon instance memory: %s", err, exc=True)
1905

    
1906

    
1907
def MigrationInfo(instance):
1908
  """Gather information about an instance to be migrated.
1909

1910
  @type instance: L{objects.Instance}
1911
  @param instance: the instance definition
1912

1913
  """
1914
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1915
  try:
1916
    info = hyper.MigrationInfo(instance)
1917
  except errors.HypervisorError, err:
1918
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1919
  return info
1920

    
1921

    
1922
def AcceptInstance(instance, info, target):
1923
  """Prepare the node to accept an instance.
1924

1925
  @type instance: L{objects.Instance}
1926
  @param instance: the instance definition
1927
  @type info: string/data (opaque)
1928
  @param info: migration information, from the source node
1929
  @type target: string
1930
  @param target: target host (usually ip), on this node
1931

1932
  """
1933
  # TODO: why is this required only for DTS_EXT_MIRROR?
1934
  if instance.disk_template in constants.DTS_EXT_MIRROR:
1935
    # Create the symlinks, as the disks are not active
1936
    # in any way
1937
    try:
1938
      _GatherAndLinkBlockDevs(instance)
1939
    except errors.BlockDeviceError, err:
1940
      _Fail("Block device error: %s", err, exc=True)
1941

    
1942
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1943
  try:
1944
    hyper.AcceptInstance(instance, info, target)
1945
  except errors.HypervisorError, err:
1946
    if instance.disk_template in constants.DTS_EXT_MIRROR:
1947
      _RemoveBlockDevLinks(instance.name, instance.disks)
1948
    _Fail("Failed to accept instance: %s", err, exc=True)
1949

    
1950

    
1951
def FinalizeMigrationDst(instance, info, success):
1952
  """Finalize any preparation to accept an instance.
1953

1954
  @type instance: L{objects.Instance}
1955
  @param instance: the instance definition
1956
  @type info: string/data (opaque)
1957
  @param info: migration information, from the source node
1958
  @type success: boolean
1959
  @param success: whether the migration was a success or a failure
1960

1961
  """
1962
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1963
  try:
1964
    hyper.FinalizeMigrationDst(instance, info, success)
1965
  except errors.HypervisorError, err:
1966
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1967

    
1968

    
1969
def MigrateInstance(cluster_name, instance, target, live):
1970
  """Migrates an instance to another node.
1971

1972
  @type cluster_name: string
1973
  @param cluster_name: name of the cluster
1974
  @type instance: L{objects.Instance}
1975
  @param instance: the instance definition
1976
  @type target: string
1977
  @param target: the target node name
1978
  @type live: boolean
1979
  @param live: whether the migration should be done live or not (the
1980
      interpretation of this parameter is left to the hypervisor)
1981
  @raise RPCFail: if migration fails for some reason
1982

1983
  """
1984
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1985

    
1986
  try:
1987
    hyper.MigrateInstance(cluster_name, instance, target, live)
1988
  except errors.HypervisorError, err:
1989
    _Fail("Failed to migrate instance: %s", err, exc=True)
1990

    
1991

    
1992
def FinalizeMigrationSource(instance, success, live):
1993
  """Finalize the instance migration on the source node.
1994

1995
  @type instance: L{objects.Instance}
1996
  @param instance: the instance definition of the migrated instance
1997
  @type success: bool
1998
  @param success: whether the migration succeeded or not
1999
  @type live: bool
2000
  @param live: whether the user requested a live migration or not
2001
  @raise RPCFail: If the execution fails for some reason
2002

2003
  """
2004
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2005

    
2006
  try:
2007
    hyper.FinalizeMigrationSource(instance, success, live)
2008
  except Exception, err:  # pylint: disable=W0703
2009
    _Fail("Failed to finalize the migration on the source node: %s", err,
2010
          exc=True)
2011

    
2012

    
2013
def GetMigrationStatus(instance):
2014
  """Get the migration status
2015

2016
  @type instance: L{objects.Instance}
2017
  @param instance: the instance that is being migrated
2018
  @rtype: L{objects.MigrationStatus}
2019
  @return: the status of the current migration (one of
2020
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
2021
           progress info that can be retrieved from the hypervisor
2022
  @raise RPCFail: If the migration status cannot be retrieved
2023

2024
  """
2025
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2026
  try:
2027
    return hyper.GetMigrationStatus(instance)
2028
  except Exception, err:  # pylint: disable=W0703
2029
    _Fail("Failed to get migration status: %s", err, exc=True)
2030

    
2031

    
2032
def HotplugDevice(instance, action, dev_type, device, extra, seq):
2033
  """Hotplug a device
2034

2035
  Hotplug is currently supported only for KVM Hypervisor.
2036
  @type instance: L{objects.Instance}
2037
  @param instance: the instance to which we hotplug a device
2038
  @type action: string
2039
  @param action: the hotplug action to perform
2040
  @type dev_type: string
2041
  @param dev_type: the device type to hotplug
2042
  @type device: either L{objects.NIC} or L{objects.Disk}
2043
  @param device: the device object to hotplug
2044
  @type extra: string
2045
  @param extra: extra info used by hotplug code (e.g. disk link)
2046
  @type seq: int
2047
  @param seq: the index of the device from master perspective
2048
  @raise RPCFail: in case instance does not have KVM hypervisor
2049

2050
  """
2051
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2052
  try:
2053
    hyper.VerifyHotplugSupport(instance, action, dev_type)
2054
  except errors.HotplugError, err:
2055
    _Fail("Hotplug is not supported: %s", err)
2056

    
2057
  if action == constants.HOTPLUG_ACTION_ADD:
2058
    fn = hyper.HotAddDevice
2059
  elif action == constants.HOTPLUG_ACTION_REMOVE:
2060
    fn = hyper.HotDelDevice
2061
  elif action == constants.HOTPLUG_ACTION_MODIFY:
2062
    fn = hyper.HotModDevice
2063
  else:
2064
    assert action in constants.HOTPLUG_ALL_ACTIONS
2065

    
2066
  return fn(instance, dev_type, device, extra, seq)
2067

    
2068

    
2069
def HotplugSupported(instance):
2070
  """Checks if hotplug is generally supported.
2071

2072
  """
2073
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2074
  try:
2075
    hyper.HotplugSupported(instance)
2076
  except errors.HotplugError, err:
2077
    _Fail("Hotplug is not supported: %s", err)
2078

    
2079

    
2080
def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
2081
  """Creates a block device for an instance.
2082

2083
  @type disk: L{objects.Disk}
2084
  @param disk: the object describing the disk we should create
2085
  @type size: int
2086
  @param size: the size of the physical underlying device, in MiB
2087
  @type owner: str
2088
  @param owner: the name of the instance for which disk is created,
2089
      used for device cache data
2090
  @type on_primary: boolean
2091
  @param on_primary:  indicates if it is the primary node or not
2092
  @type info: string
2093
  @param info: string that will be sent to the physical device
2094
      creation, used for example to set (LVM) tags on LVs
2095
  @type excl_stor: boolean
2096
  @param excl_stor: Whether exclusive_storage is active
2097

2098
  @return: the new unique_id of the device (this can sometime be
2099
      computed only after creation), or None. On secondary nodes,
2100
      it's not required to return anything.
2101

2102
  """
2103
  # TODO: remove the obsolete "size" argument
2104
  # pylint: disable=W0613
2105
  clist = []
2106
  if disk.children:
2107
    for child in disk.children:
2108
      try:
2109
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
2110
      except errors.BlockDeviceError, err:
2111
        _Fail("Can't assemble device %s: %s", child, err)
2112
      if on_primary or disk.AssembleOnSecondary():
2113
        # we need the children open in case the device itself has to
2114
        # be assembled
2115
        try:
2116
          # pylint: disable=E1103
2117
          crdev.Open()
2118
        except errors.BlockDeviceError, err:
2119
          _Fail("Can't make child '%s' read-write: %s", child, err)
2120
      clist.append(crdev)
2121

    
2122
  try:
2123
    device = bdev.Create(disk, clist, excl_stor)
2124
  except errors.BlockDeviceError, err:
2125
    _Fail("Can't create block device: %s", err)
2126

    
2127
  if on_primary or disk.AssembleOnSecondary():
2128
    try:
2129
      device.Assemble()
2130
    except errors.BlockDeviceError, err:
2131
      _Fail("Can't assemble device after creation, unusual event: %s", err)
2132
    if on_primary or disk.OpenOnSecondary():
2133
      try:
2134
        device.Open(force=True)
2135
      except errors.BlockDeviceError, err:
2136
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
2137
    DevCacheManager.UpdateCache(device.dev_path, owner,
2138
                                on_primary, disk.iv_name)
2139

    
2140
  device.SetInfo(info)
2141

    
2142
  return device.unique_id
2143

    
2144

    
2145
def _WipeDevice(path, offset, size):
2146
  """This function actually wipes the device.
2147

2148
  @param path: The path to the device to wipe
2149
  @param offset: The offset in MiB in the file
2150
  @param size: The size in MiB to write
2151

2152
  """
2153
  # Internal sizes are always in Mebibytes; if the following "dd" command
2154
  # should use a different block size the offset and size given to this
2155
  # function must be adjusted accordingly before being passed to "dd".
2156
  block_size = 1024 * 1024
2157

    
2158
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
2159
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
2160
         "count=%d" % size]
2161
  result = utils.RunCmd(cmd)
2162

    
2163
  if result.failed:
2164
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
2165
          result.fail_reason, result.output)
2166

    
2167

    
2168
def BlockdevWipe(disk, offset, size):
2169
  """Wipes a block device.
2170

2171
  @type disk: L{objects.Disk}
2172
  @param disk: the disk object we want to wipe
2173
  @type offset: int
2174
  @param offset: The offset in MiB in the file
2175
  @type size: int
2176
  @param size: The size in MiB to write
2177

2178
  """
2179
  try:
2180
    rdev = _RecursiveFindBD(disk)
2181
  except errors.BlockDeviceError:
2182
    rdev = None
2183

    
2184
  if not rdev:
2185
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
2186

    
2187
  # Do cross verify some of the parameters
2188
  if offset < 0:
2189
    _Fail("Negative offset")
2190
  if size < 0:
2191
    _Fail("Negative size")
2192
  if offset > rdev.size:
2193
    _Fail("Offset is bigger than device size")
2194
  if (offset + size) > rdev.size:
2195
    _Fail("The provided offset and size to wipe is bigger than device size")
2196

    
2197
  _WipeDevice(rdev.dev_path, offset, size)
2198

    
2199

    
2200
def BlockdevPauseResumeSync(disks, pause):
2201
  """Pause or resume the sync of the block device.
2202

2203
  @type disks: list of L{objects.Disk}
2204
  @param disks: the disks object we want to pause/resume
2205
  @type pause: bool
2206
  @param pause: Wheater to pause or resume
2207

2208
  """
2209
  success = []
2210
  for disk in disks:
2211
    try:
2212
      rdev = _RecursiveFindBD(disk)
2213
    except errors.BlockDeviceError:
2214
      rdev = None
2215

    
2216
    if not rdev:
2217
      success.append((False, ("Cannot change sync for device %s:"
2218
                              " device not found" % disk.iv_name)))
2219
      continue
2220

    
2221
    result = rdev.PauseResumeSync(pause)
2222

    
2223
    if result:
2224
      success.append((result, None))
2225
    else:
2226
      if pause:
2227
        msg = "Pause"
2228
      else:
2229
        msg = "Resume"
2230
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
2231

    
2232
  return success
2233

    
2234

    
2235
def BlockdevRemove(disk):
2236
  """Remove a block device.
2237

2238
  @note: This is intended to be called recursively.
2239

2240
  @type disk: L{objects.Disk}
2241
  @param disk: the disk object we should remove
2242
  @rtype: boolean
2243
  @return: the success of the operation
2244

2245
  """
2246
  msgs = []
2247
  try:
2248
    rdev = _RecursiveFindBD(disk)
2249
  except errors.BlockDeviceError, err:
2250
    # probably can't attach
2251
    logging.info("Can't attach to device %s in remove", disk)
2252
    rdev = None
2253
  if rdev is not None:
2254
    r_path = rdev.dev_path
2255

    
2256
    def _TryRemove():
2257
      try:
2258
        rdev.Remove()
2259
        return []
2260
      except errors.BlockDeviceError, err:
2261
        return [str(err)]
2262

    
2263
    msgs.extend(utils.SimpleRetry([], _TryRemove,
2264
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
2265
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))
2266

    
2267
    if not msgs:
2268
      DevCacheManager.RemoveCache(r_path)
2269

    
2270
  if disk.children:
2271
    for child in disk.children:
2272
      try:
2273
        BlockdevRemove(child)
2274
      except RPCFail, err:
2275
        msgs.append(str(err))
2276

    
2277
  if msgs:
2278
    _Fail("; ".join(msgs))
2279

    
2280

    
2281
def _RecursiveAssembleBD(disk, owner, as_primary):
2282
  """Activate a block device for an instance.
2283

2284
  This is run on the primary and secondary nodes for an instance.
2285

2286
  @note: this function is called recursively.
2287

2288
  @type disk: L{objects.Disk}
2289
  @param disk: the disk we try to assemble
2290
  @type owner: str
2291
  @param owner: the name of the instance which owns the disk
2292
  @type as_primary: boolean
2293
  @param as_primary: if we should make the block device
2294
      read/write
2295

2296
  @return: the assembled device or None (in case no device
2297
      was assembled)
2298
  @raise errors.BlockDeviceError: in case there is an error
2299
      during the activation of the children or the device
2300
      itself
2301

2302
  """
2303
  children = []
2304
  if disk.children:
2305
    mcn = disk.ChildrenNeeded()
2306
    if mcn == -1:
2307
      mcn = 0 # max number of Nones allowed
2308
    else:
2309
      mcn = len(disk.children) - mcn # max number of Nones
2310
    for chld_disk in disk.children:
2311
      try:
2312
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
2313
      except errors.BlockDeviceError, err:
2314
        if children.count(None) >= mcn:
2315
          raise
2316
        cdev = None
2317
        logging.error("Error in child activation (but continuing): %s",
2318
                      str(err))
2319
      children.append(cdev)
2320

    
2321
  if as_primary or disk.AssembleOnSecondary():
2322
    r_dev = bdev.Assemble(disk, children)
2323
    result = r_dev
2324
    if as_primary or disk.OpenOnSecondary():
2325
      r_dev.Open()
2326
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
2327
                                as_primary, disk.iv_name)
2328

    
2329
  else:
2330
    result = True
2331
  return result
2332

    
2333

    
2334
def BlockdevAssemble(disk, owner, as_primary, idx):
2335
  """Activate a block device for an instance.
2336

2337
  This is a wrapper over _RecursiveAssembleBD.
2338

2339
  @rtype: str or boolean
2340
  @return: a tuple with the C{/dev/...} path and the created symlink
2341
      for primary nodes, and (C{True}, C{True}) for secondary nodes
2342

2343
  """
2344
  try:
2345
    result = _RecursiveAssembleBD(disk, owner, as_primary)
2346
    if isinstance(result, BlockDev):
2347
      # pylint: disable=E1103
2348
      dev_path = result.dev_path
2349
      link_name = None
2350
      if as_primary:
2351
        link_name = _SymlinkBlockDev(owner, dev_path, idx)
2352
    elif result:
2353
      return result, result
2354
    else:
2355
      _Fail("Unexpected result from _RecursiveAssembleBD")
2356
  except errors.BlockDeviceError, err:
2357
    _Fail("Error while assembling disk: %s", err, exc=True)
2358
  except OSError, err:
2359
    _Fail("Error while symlinking disk: %s", err, exc=True)
2360

    
2361
  return dev_path, link_name
2362

    
2363

    
2364
def BlockdevShutdown(disk):
2365
  """Shut down a block device.
2366

2367
  First, if the device is assembled (Attach() is successful), then
2368
  the device is shutdown. Then the children of the device are
2369
  shutdown.
2370

2371
  This function is called recursively. Note that we don't cache the
2372
  children or such, as oppossed to assemble, shutdown of different
2373
  devices doesn't require that the upper device was active.
2374

2375
  @type disk: L{objects.Disk}
2376
  @param disk: the description of the disk we should
2377
      shutdown
2378
  @rtype: None
2379

2380
  """
2381
  msgs = []
2382
  r_dev = _RecursiveFindBD(disk)
2383
  if r_dev is not None:
2384
    r_path = r_dev.dev_path
2385
    try:
2386
      r_dev.Shutdown()
2387
      DevCacheManager.RemoveCache(r_path)
2388
    except errors.BlockDeviceError, err:
2389
      msgs.append(str(err))
2390

    
2391
  if disk.children:
2392
    for child in disk.children:
2393
      try:
2394
        BlockdevShutdown(child)
2395
      except RPCFail, err:
2396
        msgs.append(str(err))
2397

    
2398
  if msgs:
2399
    _Fail("; ".join(msgs))
2400

    
2401

    
2402
def BlockdevAddchildren(parent_cdev, new_cdevs):
2403
  """Extend a mirrored block device.
2404

2405
  @type parent_cdev: L{objects.Disk}
2406
  @param parent_cdev: the disk to which we should add children
2407
  @type new_cdevs: list of L{objects.Disk}
2408
  @param new_cdevs: the list of children which we should add
2409
  @rtype: None
2410

2411
  """
2412
  parent_bdev = _RecursiveFindBD(parent_cdev)
2413
  if parent_bdev is None:
2414
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
2415
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
2416
  if new_bdevs.count(None) > 0:
2417
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
2418
  parent_bdev.AddChildren(new_bdevs)
2419

    
2420

    
2421
def BlockdevRemovechildren(parent_cdev, new_cdevs):
2422
  """Shrink a mirrored block device.
2423

2424
  @type parent_cdev: L{objects.Disk}
2425
  @param parent_cdev: the disk from which we should remove children
2426
  @type new_cdevs: list of L{objects.Disk}
2427
  @param new_cdevs: the list of children which we should remove
2428
  @rtype: None
2429

2430
  """
2431
  parent_bdev = _RecursiveFindBD(parent_cdev)
2432
  if parent_bdev is None:
2433
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
2434
  devs = []
2435
  for disk in new_cdevs:
2436
    rpath = disk.StaticDevPath()
2437
    if rpath is None:
2438
      bd = _RecursiveFindBD(disk)
2439
      if bd is None:
2440
        _Fail("Can't find device %s while removing children", disk)
2441
      else:
2442
        devs.append(bd.dev_path)
2443
    else:
2444
      if not utils.IsNormAbsPath(rpath):
2445
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
2446
      devs.append(rpath)
2447
  parent_bdev.RemoveChildren(devs)
2448

    
2449

    
2450
def BlockdevGetmirrorstatus(disks):
2451
  """Get the mirroring status of a list of devices.
2452

2453
  @type disks: list of L{objects.Disk}
2454
  @param disks: the list of disks which we should query
2455
  @rtype: disk
2456
  @return: List of L{objects.BlockDevStatus}, one for each disk
2457
  @raise errors.BlockDeviceError: if any of the disks cannot be
2458
      found
2459

2460
  """
2461
  stats = []
2462
  for dsk in disks:
2463
    rbd = _RecursiveFindBD(dsk)
2464
    if rbd is None:
2465
      _Fail("Can't find device %s", dsk)
2466

    
2467
    stats.append(rbd.CombinedSyncStatus())
2468

    
2469
  return stats
2470

    
2471

    
2472
def BlockdevGetmirrorstatusMulti(disks):
2473
  """Get the mirroring status of a list of devices.
2474

2475
  @type disks: list of L{objects.Disk}
2476
  @param disks: the list of disks which we should query
2477
  @rtype: disk
2478
  @return: List of tuples, (bool, status), one for each disk; bool denotes
2479
    success/failure, status is L{objects.BlockDevStatus} on success, string
2480
    otherwise
2481

2482
  """
2483
  result = []
2484
  for disk in disks:
2485
    try:
2486
      rbd = _RecursiveFindBD(disk)
2487
      if rbd is None:
2488
        result.append((False, "Can't find device %s" % disk))
2489
        continue
2490

    
2491
      status = rbd.CombinedSyncStatus()
2492
    except errors.BlockDeviceError, err:
2493
      logging.exception("Error while getting disk status")
2494
      result.append((False, str(err)))
2495
    else:
2496
      result.append((True, status))
2497

    
2498
  assert len(disks) == len(result)
2499

    
2500
  return result
2501

    
2502

    
2503
def _RecursiveFindBD(disk):
2504
  """Check if a device is activated.
2505

2506
  If so, return information about the real device.
2507

2508
  @type disk: L{objects.Disk}
2509
  @param disk: the disk object we need to find
2510

2511
  @return: None if the device can't be found,
2512
      otherwise the device instance
2513

2514
  """
2515
  children = []
2516
  if disk.children:
2517
    for chdisk in disk.children:
2518
      children.append(_RecursiveFindBD(chdisk))
2519

    
2520
  return bdev.FindDevice(disk, children)
2521

    
2522

    
2523
def _OpenRealBD(disk):
2524
  """Opens the underlying block device of a disk.
2525

2526
  @type disk: L{objects.Disk}
2527
  @param disk: the disk object we want to open
2528

2529
  """
2530
  real_disk = _RecursiveFindBD(disk)
2531
  if real_disk is None:
2532
    _Fail("Block device '%s' is not set up", disk)
2533

    
2534
  real_disk.Open()
2535

    
2536
  return real_disk
2537

    
2538

    
2539
def BlockdevFind(disk):
2540
  """Check if a device is activated.
2541

2542
  If it is, return information about the real device.
2543

2544
  @type disk: L{objects.Disk}
2545
  @param disk: the disk to find
2546
  @rtype: None or objects.BlockDevStatus
2547
  @return: None if the disk cannot be found, otherwise a the current
2548
           information
2549

2550
  """
2551
  try:
2552
    rbd = _RecursiveFindBD(disk)
2553
  except errors.BlockDeviceError, err:
2554
    _Fail("Failed to find device: %s", err, exc=True)
2555

    
2556
  if rbd is None:
2557
    return None
2558

    
2559
  return rbd.GetSyncStatus()
2560

    
2561

    
2562
def BlockdevGetdimensions(disks):
2563
  """Computes the size of the given disks.
2564

2565
  If a disk is not found, returns None instead.
2566

2567
  @type disks: list of L{objects.Disk}
2568
  @param disks: the list of disk to compute the size for
2569
  @rtype: list
2570
  @return: list with elements None if the disk cannot be found,
2571
      otherwise the pair (size, spindles), where spindles is None if the
2572
      device doesn't support that
2573

2574
  """
2575
  result = []
2576
  for cf in disks:
2577
    try:
2578
      rbd = _RecursiveFindBD(cf)
2579
    except errors.BlockDeviceError:
2580
      result.append(None)
2581
      continue
2582
    if rbd is None:
2583
      result.append(None)
2584
    else:
2585
      result.append(rbd.GetActualDimensions())
2586
  return result
2587

    
2588

    
2589
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2590
  """Write a file to the filesystem.
2591

2592
  This allows the master to overwrite(!) a file. It will only perform
2593
  the operation if the file belongs to a list of configuration files.
2594

2595
  @type file_name: str
2596
  @param file_name: the target file name
2597
  @type data: str
2598
  @param data: the new contents of the file
2599
  @type mode: int
2600
  @param mode: the mode to give the file (can be None)
2601
  @type uid: string
2602
  @param uid: the owner of the file
2603
  @type gid: string
2604
  @param gid: the group of the file
2605
  @type atime: float
2606
  @param atime: the atime to set on the file (can be None)
2607
  @type mtime: float
2608
  @param mtime: the mtime to set on the file (can be None)
2609
  @rtype: None
2610

2611
  """
2612
  file_name = vcluster.LocalizeVirtualPath(file_name)
2613

    
2614
  if not os.path.isabs(file_name):
2615
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2616

    
2617
  if file_name not in _ALLOWED_UPLOAD_FILES:
2618
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2619
          file_name)
2620

    
2621
  raw_data = _Decompress(data)
2622

    
2623
  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2624
    _Fail("Invalid username/groupname type")
2625

    
2626
  getents = runtime.GetEnts()
2627
  uid = getents.LookupUser(uid)
2628
  gid = getents.LookupGroup(gid)
2629

    
2630
  utils.SafeWriteFile(file_name, None,
2631
                      data=raw_data, mode=mode, uid=uid, gid=gid,
2632
                      atime=atime, mtime=mtime)
2633

    
2634

    
2635
def RunOob(oob_program, command, node, timeout):
2636
  """Executes oob_program with given command on given node.
2637

2638
  @param oob_program: The path to the executable oob_program
2639
  @param command: The command to invoke on oob_program
2640
  @param node: The node given as an argument to the program
2641
  @param timeout: Timeout after which we kill the oob program
2642

2643
  @return: stdout
2644
  @raise RPCFail: If execution fails for some reason
2645

2646
  """
2647
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2648

    
2649
  if result.failed:
2650
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2651
          result.fail_reason, result.output)
2652

    
2653
  return result.stdout
2654

    
2655

    
2656
def _OSOndiskAPIVersion(os_dir):
2657
  """Compute and return the API version of a given OS.
2658

2659
  This function will try to read the API version of the OS residing in
2660
  the 'os_dir' directory.
2661

2662
  @type os_dir: str
2663
  @param os_dir: the directory in which we should look for the OS
2664
  @rtype: tuple
2665
  @return: tuple (status, data) with status denoting the validity and
2666
      data holding either the vaid versions or an error message
2667

2668
  """
2669
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2670

    
2671
  try:
2672
    st = os.stat(api_file)
2673
  except EnvironmentError, err:
2674
    return False, ("Required file '%s' not found under path %s: %s" %
2675
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2676

    
2677
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2678
    return False, ("File '%s' in %s is not a regular file" %
2679
                   (constants.OS_API_FILE, os_dir))
2680

    
2681
  try:
2682
    api_versions = utils.ReadFile(api_file).splitlines()
2683
  except EnvironmentError, err:
2684
    return False, ("Error while reading the API version file at %s: %s" %
2685
                   (api_file, utils.ErrnoOrStr(err)))
2686

    
2687
  try:
2688
    api_versions = [int(version.strip()) for version in api_versions]
2689
  except (TypeError, ValueError), err:
2690
    return False, ("API version(s) can't be converted to integer: %s" %
2691
                   str(err))
2692

    
2693
  return True, api_versions
2694

    
2695

    
2696
def DiagnoseOS(top_dirs=None):
2697
  """Compute the validity for all OSes.
2698

2699
  @type top_dirs: list
2700
  @param top_dirs: the list of directories in which to
2701
      search (if not given defaults to
2702
      L{pathutils.OS_SEARCH_PATH})
2703
  @rtype: list of L{objects.OS}
2704
  @return: a list of tuples (name, path, status, diagnose, variants,
2705
      parameters, api_version) for all (potential) OSes under all
2706
      search paths, where:
2707
          - name is the (potential) OS name
2708
          - path is the full path to the OS
2709
          - status True/False is the validity of the OS
2710
          - diagnose is the error message for an invalid OS, otherwise empty
2711
          - variants is a list of supported OS variants, if any
2712
          - parameters is a list of (name, help) parameters, if any
2713
          - api_version is a list of support OS API versions
2714

2715
  """
2716
  if top_dirs is None:
2717
    top_dirs = pathutils.OS_SEARCH_PATH
2718

    
2719
  result = []
2720
  for dir_name in top_dirs:
2721
    if os.path.isdir(dir_name):
2722
      try:
2723
        f_names = utils.ListVisibleFiles(dir_name)
2724
      except EnvironmentError, err:
2725
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2726
        break
2727
      for name in f_names:
2728
        os_path = utils.PathJoin(dir_name, name)
2729
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2730
        if status:
2731
          diagnose = ""
2732
          variants = os_inst.supported_variants
2733
          parameters = os_inst.supported_parameters
2734
          api_versions = os_inst.api_versions
2735
        else:
2736
          diagnose = os_inst
2737
          variants = parameters = api_versions = []
2738
        result.append((name, os_path, status, diagnose, variants,
2739
                       parameters, api_versions))
2740

    
2741
  return result
2742

    
2743

    
2744
def _TryOSFromDisk(name, base_dir=None):
2745
  """Create an OS instance from disk.
2746

2747
  This function will return an OS instance if the given name is a
2748
  valid OS name.
2749

2750
  @type base_dir: string
2751
  @keyword base_dir: Base directory containing OS installations.
2752
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
2753
  @rtype: tuple
2754
  @return: success and either the OS instance if we find a valid one,
2755
      or error message
2756

2757
  """
2758
  if base_dir is None:
2759
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2760
  else:
2761
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2762

    
2763
  if os_dir is None:
2764
    return False, "Directory for OS %s not found in search path" % name
2765

    
2766
  status, api_versions = _OSOndiskAPIVersion(os_dir)
2767
  if not status:
2768
    # push the error up
2769
    return status, api_versions
2770

    
2771
  if not constants.OS_API_VERSIONS.intersection(api_versions):
2772
    return False, ("API version mismatch for path '%s': found %s, want %s." %
2773
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
2774

    
2775
  # OS Files dictionary, we will populate it with the absolute path
2776
  # names; if the value is True, then it is a required file, otherwise
2777
  # an optional one
2778
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2779

    
2780
  if max(api_versions) >= constants.OS_API_V15:
2781
    os_files[constants.OS_VARIANTS_FILE] = False
2782

    
2783
  if max(api_versions) >= constants.OS_API_V20:
2784
    os_files[constants.OS_PARAMETERS_FILE] = True
2785
  else:
2786
    del os_files[constants.OS_SCRIPT_VERIFY]
2787

    
2788
  for (filename, required) in os_files.items():
2789
    os_files[filename] = utils.PathJoin(os_dir, filename)
2790

    
2791
    try:
2792
      st = os.stat(os_files[filename])
2793
    except EnvironmentError, err:
2794
      if err.errno == errno.ENOENT and not required:
2795
        del os_files[filename]
2796
        continue
2797
      return False, ("File '%s' under path '%s' is missing (%s)" %
2798
                     (filename, os_dir, utils.ErrnoOrStr(err)))
2799

    
2800
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2801
      return False, ("File '%s' under path '%s' is not a regular file" %
2802
                     (filename, os_dir))
2803

    
2804
    if filename in constants.OS_SCRIPTS:
2805
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2806
        return False, ("File '%s' under path '%s' is not executable" %
2807
                       (filename, os_dir))
2808

    
2809
  variants = []
2810
  if constants.OS_VARIANTS_FILE in os_files:
2811
    variants_file = os_files[constants.OS_VARIANTS_FILE]
2812
    try:
2813
      variants = \
2814
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2815
    except EnvironmentError, err:
2816
      # we accept missing files, but not other errors
2817
      if err.errno != errno.ENOENT:
2818
        return False, ("Error while reading the OS variants file at %s: %s" %
2819
                       (variants_file, utils.ErrnoOrStr(err)))
2820

    
2821
  parameters = []
2822
  if constants.OS_PARAMETERS_FILE in os_files:
2823
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2824
    try:
2825
      parameters = utils.ReadFile(parameters_file).splitlines()
2826
    except EnvironmentError, err:
2827
      return False, ("Error while reading the OS parameters file at %s: %s" %
2828
                     (parameters_file, utils.ErrnoOrStr(err)))
2829
    parameters = [v.split(None, 1) for v in parameters]
2830

    
2831
  os_obj = objects.OS(name=name, path=os_dir,
2832
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
2833
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
2834
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
2835
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
2836
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2837
                                                 None),
2838
                      supported_variants=variants,
2839
                      supported_parameters=parameters,
2840
                      api_versions=api_versions)
2841
  return True, os_obj
2842

    
2843

    
2844
def OSFromDisk(name, base_dir=None):
2845
  """Create an OS instance from disk.
2846

2847
  This function will return an OS instance if the given name is a
2848
  valid OS name. Otherwise, it will raise an appropriate
2849
  L{RPCFail} exception, detailing why this is not a valid OS.
2850

2851
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2852
  an exception but returns true/false status data.
2853

2854
  @type base_dir: string
2855
  @keyword base_dir: Base directory containing OS installations.
2856
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
2857
  @rtype: L{objects.OS}
2858
  @return: the OS instance if we find a valid one
2859
  @raise RPCFail: if we don't find a valid OS
2860

2861
  """
2862
  name_only = objects.OS.GetName(name)
2863
  status, payload = _TryOSFromDisk(name_only, base_dir)
2864

    
2865
  if not status:
2866
    _Fail(payload)
2867

    
2868
  return payload
2869

    
2870

    
2871
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2872
  """Calculate the basic environment for an os script.
2873

2874
  @type os_name: str
2875
  @param os_name: full operating system name (including variant)
2876
  @type inst_os: L{objects.OS}
2877
  @param inst_os: operating system for which the environment is being built
2878
  @type os_params: dict
2879
  @param os_params: the OS parameters
2880
  @type debug: integer
2881
  @param debug: debug level (0 or 1, for OS Api 10)
2882
  @rtype: dict
2883
  @return: dict of environment variables
2884
  @raise errors.BlockDeviceError: if the block device
2885
      cannot be found
2886

2887
  """
2888
  result = {}
2889
  api_version = \
2890
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2891
  result["OS_API_VERSION"] = "%d" % api_version
2892
  result["OS_NAME"] = inst_os.name
2893
  result["DEBUG_LEVEL"] = "%d" % debug
2894

    
2895
  # OS variants
2896
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2897
    variant = objects.OS.GetVariant(os_name)
2898
    if not variant:
2899
      variant = inst_os.supported_variants[0]
2900
  else:
2901
    variant = ""
2902
  result["OS_VARIANT"] = variant
2903

    
2904
  # OS params
2905
  for pname, pvalue in os_params.items():
2906
    result["OSP_%s" % pname.upper()] = pvalue
2907

    
2908
  # Set a default path otherwise programs called by OS scripts (or
2909
  # even hooks called from OS scripts) might break, and we don't want
2910
  # to have each script require setting a PATH variable
2911
  result["PATH"] = constants.HOOKS_PATH
2912

    
2913
  return result
2914

    
2915

    
2916
def OSEnvironment(instance, inst_os, debug=0):
2917
  """Calculate the environment for an os script.
2918

2919
  @type instance: L{objects.Instance}
2920
  @param instance: target instance for the os script run
2921
  @type inst_os: L{objects.OS}
2922
  @param inst_os: operating system for which the environment is being built
2923
  @type debug: integer
2924
  @param debug: debug level (0 or 1, for OS Api 10)
2925
  @rtype: dict
2926
  @return: dict of environment variables
2927
  @raise errors.BlockDeviceError: if the block device
2928
      cannot be found
2929

2930
  """
2931
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2932

    
2933
  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2934
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2935

    
2936
  result["HYPERVISOR"] = instance.hypervisor
2937
  result["DISK_COUNT"] = "%d" % len(instance.disks)
2938
  result["NIC_COUNT"] = "%d" % len(instance.nics)
2939
  result["INSTANCE_SECONDARY_NODES"] = \
2940
      ("%s" % " ".join(instance.secondary_nodes))
2941

    
2942
  # Disks
2943
  for idx, disk in enumerate(instance.disks):
2944
    real_disk = _OpenRealBD(disk)
2945
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
2946
    result["DISK_%d_ACCESS" % idx] = disk.mode
2947
    result["DISK_%d_UUID" % idx] = disk.uuid
2948
    if disk.name:
2949
      result["DISK_%d_NAME" % idx] = disk.name
2950
    if constants.HV_DISK_TYPE in instance.hvparams:
2951
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
2952
        instance.hvparams[constants.HV_DISK_TYPE]
2953
    if disk.dev_type in constants.DTS_BLOCK:
2954
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2955
    elif disk.dev_type in constants.DTS_FILEBASED:
2956
      result["DISK_%d_BACKEND_TYPE" % idx] = \
2957
        "file:%s" % disk.logical_id[0]
2958

    
2959
  # NICs
2960
  for idx, nic in enumerate(instance.nics):
2961
    result["NIC_%d_MAC" % idx] = nic.mac
2962
    result["NIC_%d_UUID" % idx] = nic.uuid
2963
    if nic.name:
2964
      result["NIC_%d_NAME" % idx] = nic.name
2965
    if nic.ip:
2966
      result["NIC_%d_IP" % idx] = nic.ip
2967
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2968
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2969
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2970
    if nic.nicparams[constants.NIC_LINK]:
2971
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2972
    if nic.netinfo:
2973
      nobj = objects.Network.FromDict(nic.netinfo)
2974
      result.update(nobj.HooksDict("NIC_%d_" % idx))
2975
    if constants.HV_NIC_TYPE in instance.hvparams:
2976
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
2977
        instance.hvparams[constants.HV_NIC_TYPE]
2978

    
2979
  # HV/BE params
2980
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2981
    for key, value in source.items():
2982
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2983

    
2984
  return result
2985

    
2986

    
2987
def DiagnoseExtStorage(top_dirs=None):
2988
  """Compute the validity for all ExtStorage Providers.
2989

2990
  @type top_dirs: list
2991
  @param top_dirs: the list of directories in which to
2992
      search (if not given defaults to
2993
      L{pathutils.ES_SEARCH_PATH})
2994
  @rtype: list of L{objects.ExtStorage}
2995
  @return: a list of tuples (name, path, status, diagnose, parameters)
2996
      for all (potential) ExtStorage Providers under all
2997
      search paths, where:
2998
          - name is the (potential) ExtStorage Provider
2999
          - path is the full path to the ExtStorage Provider
3000
          - status True/False is the validity of the ExtStorage Provider
3001
          - diagnose is the error message for an invalid ExtStorage Provider,
3002
            otherwise empty
3003
          - parameters is a list of (name, help) parameters, if any
3004

3005
  """
3006
  if top_dirs is None:
3007
    top_dirs = pathutils.ES_SEARCH_PATH
3008

    
3009
  result = []
3010
  for dir_name in top_dirs:
3011
    if os.path.isdir(dir_name):
3012
      try:
3013
        f_names = utils.ListVisibleFiles(dir_name)
3014
      except EnvironmentError, err:
3015
        logging.exception("Can't list the ExtStorage directory %s: %s",
3016
                          dir_name, err)
3017
        break
3018
      for name in f_names:
3019
        es_path = utils.PathJoin(dir_name, name)
3020
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
3021
        if status:
3022
          diagnose = ""
3023
          parameters = es_inst.supported_parameters
3024
        else:
3025
          diagnose = es_inst
3026
          parameters = []
3027
        result.append((name, es_path, status, diagnose, parameters))
3028

    
3029
  return result
3030

    
3031

    
3032
def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
3033
  """Grow a stack of block devices.
3034

3035
  This function is called recursively, with the childrens being the
3036
  first ones to resize.
3037

3038
  @type disk: L{objects.Disk}
3039
  @param disk: the disk to be grown
3040
  @type amount: integer
3041
  @param amount: the amount (in mebibytes) to grow with
3042
  @type dryrun: boolean
3043
  @param dryrun: whether to execute the operation in simulation mode
3044
      only, without actually increasing the size
3045
  @param backingstore: whether to execute the operation on backing storage
3046
      only, or on "logical" storage only; e.g. DRBD is logical storage,
3047
      whereas LVM, file, RBD are backing storage
3048
  @rtype: (status, result)
3049
  @type excl_stor: boolean
3050
  @param excl_stor: Whether exclusive_storage is active
3051
  @return: a tuple with the status of the operation (True/False), and
3052
      the errors message if status is False
3053

3054
  """
3055
  r_dev = _RecursiveFindBD(disk)
3056
  if r_dev is None:
3057
    _Fail("Cannot find block device %s", disk)
3058

    
3059
  try:
3060
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
3061
  except errors.BlockDeviceError, err:
3062
    _Fail("Failed to grow block device: %s", err, exc=True)
3063

    
3064

    
3065
def BlockdevSnapshot(disk):
3066
  """Create a snapshot copy of a block device.
3067

3068
  This function is called recursively, and the snapshot is actually created
3069
  just for the leaf lvm backend device.
3070

3071
  @type disk: L{objects.Disk}
3072
  @param disk: the disk to be snapshotted
3073
  @rtype: string
3074
  @return: snapshot disk ID as (vg, lv)
3075

3076
  """
3077
  if disk.dev_type == constants.DT_DRBD8:
3078
    if not disk.children:
3079
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
3080
            disk.unique_id)
3081
    return BlockdevSnapshot(disk.children[0])
3082
  elif disk.dev_type == constants.DT_PLAIN:
3083
    r_dev = _RecursiveFindBD(disk)
3084
    if r_dev is not None:
3085
      # FIXME: choose a saner value for the snapshot size
3086
      # let's stay on the safe side and ask for the full size, for now
3087
      return r_dev.Snapshot(disk.size)
3088
    else:
3089
      _Fail("Cannot find block device %s", disk)
3090
  else:
3091
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
3092
          disk.unique_id, disk.dev_type)
3093

    
3094

    
3095
def BlockdevSetInfo(disk, info):
3096
  """Sets 'metadata' information on block devices.
3097

3098
  This function sets 'info' metadata on block devices. Initial
3099
  information is set at device creation; this function should be used
3100
  for example after renames.
3101

3102
  @type disk: L{objects.Disk}
3103
  @param disk: the disk to be grown
3104
  @type info: string
3105
  @param info: new 'info' metadata
3106
  @rtype: (status, result)
3107
  @return: a tuple with the status of the operation (True/False), and
3108
      the errors message if status is False
3109

3110
  """
3111
  r_dev = _RecursiveFindBD(disk)
3112
  if r_dev is None:
3113
    _Fail("Cannot find block device %s", disk)
3114

    
3115
  try:
3116
    r_dev.SetInfo(info)
3117
  except errors.BlockDeviceError, err:
3118
    _Fail("Failed to set information on block device: %s", err, exc=True)
3119

    
3120

    
3121
def FinalizeExport(instance, snap_disks):
3122
  """Write out the export configuration information.
3123

3124
  @type instance: L{objects.Instance}
3125
  @param instance: the instance which we export, used for
3126
      saving configuration
3127
  @type snap_disks: list of L{objects.Disk}
3128
  @param snap_disks: list of snapshot block devices, which
3129
      will be used to get the actual name of the dump file
3130

3131
  @rtype: None
3132

3133
  """
3134
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
3135
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
3136

    
3137
  config = objects.SerializableConfigParser()
3138

    
3139
  config.add_section(constants.INISECT_EXP)
3140
  config.set(constants.INISECT_EXP, "version", "0")
3141
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
3142
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
3143
  config.set(constants.INISECT_EXP, "os", instance.os)
3144
  config.set(constants.INISECT_EXP, "compression", "none")
3145

    
3146
  config.add_section(constants.INISECT_INS)
3147
  config.set(constants.INISECT_INS, "name", instance.name)
3148
  config.set(constants.INISECT_INS, "maxmem", "%d" %
3149
             instance.beparams[constants.BE_MAXMEM])
3150
  config.set(constants.INISECT_INS, "minmem", "%d" %
3151
             instance.beparams[constants.BE_MINMEM])
3152
  # "memory" is deprecated, but useful for exporting to old ganeti versions
3153
  config.set(constants.INISECT_INS, "memory", "%d" %
3154
             instance.beparams[constants.BE_MAXMEM])
3155
  config.set(constants.INISECT_INS, "vcpus", "%d" %
3156
             instance.beparams[constants.BE_VCPUS])
3157
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
3158
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
3159
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
3160

    
3161
  nic_total = 0
3162
  for nic_count, nic in enumerate(instance.nics):
3163
    nic_total += 1
3164
    config.set(constants.INISECT_INS, "nic%d_mac" %
3165
               nic_count, "%s" % nic.mac)
3166
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
3167
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
3168
               "%s" % nic.network)
3169
    for param in constants.NICS_PARAMETER_TYPES:
3170
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
3171
                 "%s" % nic.nicparams.get(param, None))
3172
  # TODO: redundant: on load can read nics until it doesn't exist
3173
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
3174

    
3175
  disk_total = 0
3176
  for disk_count, disk in enumerate(snap_disks):
3177
    if disk:
3178
      disk_total += 1
3179
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
3180
                 ("%s" % disk.iv_name))
3181
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
3182
                 ("%s" % disk.logical_id[1]))
3183
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
3184
                 ("%d" % disk.size))
3185

    
3186
  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
3187

    
3188
  # New-style hypervisor/backend parameters
3189

    
3190
  config.add_section(constants.INISECT_HYP)
3191
  for name, value in instance.hvparams.items():
3192
    if name not in constants.HVC_GLOBALS:
3193
      config.set(constants.INISECT_HYP, name, str(value))
3194

    
3195
  config.add_section(constants.INISECT_BEP)
3196
  for name, value in instance.beparams.items():
3197
    config.set(constants.INISECT_BEP, name, str(value))
3198

    
3199
  config.add_section(constants.INISECT_OSP)
3200
  for name, value in instance.osparams.items():
3201
    config.set(constants.INISECT_OSP, name, str(value))
3202

    
3203
  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
3204
                  data=config.Dumps())
3205
  shutil.rmtree(finaldestdir, ignore_errors=True)
3206
  shutil.move(destdir, finaldestdir)
3207

    
3208

    
3209
def ExportInfo(dest):
3210
  """Get export configuration information.
3211

3212
  @type dest: str
3213
  @param dest: directory containing the export
3214

3215
  @rtype: L{objects.SerializableConfigParser}
3216
  @return: a serializable config file containing the
3217
      export info
3218

3219
  """
3220
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
3221

    
3222
  config = objects.SerializableConfigParser()
3223
  config.read(cff)
3224

    
3225
  if (not config.has_section(constants.INISECT_EXP) or
3226
      not config.has_section(constants.INISECT_INS)):
3227
    _Fail("Export info file doesn't have the required fields")
3228

    
3229
  return config.Dumps()
3230

    
3231

    
3232
def ListExports():
3233
  """Return a list of exports currently available on this machine.
3234

3235
  @rtype: list
3236
  @return: list of the exports
3237

3238
  """
3239
  if os.path.isdir(pathutils.EXPORT_DIR):
3240
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
3241
  else:
3242
    _Fail("No exports directory")
3243

    
3244

    
3245
def RemoveExport(export):
3246
  """Remove an existing export from the node.
3247

3248
  @type export: str
3249
  @param export: the name of the export to remove
3250
  @rtype: None
3251

3252
  """
3253
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)
3254

    
3255
  try:
3256
    shutil.rmtree(target)
3257
  except EnvironmentError, err:
3258
    _Fail("Error while removing the export: %s", err, exc=True)
3259

    
3260

    
3261
def BlockdevRename(devlist):
3262
  """Rename a list of block devices.
3263

3264
  @type devlist: list of tuples
3265
  @param devlist: list of tuples of the form  (disk, new_unique_id); disk is
3266
      an L{objects.Disk} object describing the current disk, and new
3267
      unique_id is the name we rename it to
3268
  @rtype: boolean
3269
  @return: True if all renames succeeded, False otherwise
3270

3271
  """
3272
  msgs = []
3273
  result = True
3274
  for disk, unique_id in devlist:
3275
    dev = _RecursiveFindBD(disk)
3276
    if dev is None:
3277
      msgs.append("Can't find device %s in rename" % str(disk))
3278
      result = False
3279
      continue
3280
    try:
3281
      old_rpath = dev.dev_path
3282
      dev.Rename(unique_id)
3283
      new_rpath = dev.dev_path
3284
      if old_rpath != new_rpath:
3285
        DevCacheManager.RemoveCache(old_rpath)
3286
        # FIXME: we should add the new cache information here, like:
3287
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
3288
        # but we don't have the owner here - maybe parse from existing
3289
        # cache? for now, we only lose lvm data when we rename, which
3290
        # is less critical than DRBD or MD
3291
    except errors.BlockDeviceError, err:
3292
      msgs.append("Can't rename device '%s' to '%s': %s" %
3293
                  (dev, unique_id, err))
3294
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
3295
      result = False
3296
  if not result:
3297
    _Fail("; ".join(msgs))
3298

    
3299

    
3300
def _TransformFileStorageDir(fs_dir):
3301
  """Checks whether given file_storage_dir is valid.
3302

3303
  Checks wheter the given fs_dir is within the cluster-wide default
3304
  file_storage_dir or the shared_file_storage_dir, which are stored in
3305
  SimpleStore. Only paths under those directories are allowed.
3306

3307
  @type fs_dir: str
3308
  @param fs_dir: the path to check
3309

3310
  @return: the normalized path if valid, None otherwise
3311

3312
  """
3313
  filestorage.CheckFileStoragePath(fs_dir)
3314

    
3315
  return os.path.normpath(fs_dir)
3316

    
3317

    
3318
def CreateFileStorageDir(file_storage_dir):
3319
  """Create file storage directory.
3320

3321
  @type file_storage_dir: str
3322
  @param file_storage_dir: directory to create
3323

3324
  @rtype: tuple
3325
  @return: tuple with first element a boolean indicating wheter dir
3326
      creation was successful or not
3327

3328
  """
3329
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
3330
  if os.path.exists(file_storage_dir):
3331
    if not os.path.isdir(file_storage_dir):
3332
      _Fail("Specified storage dir '%s' is not a directory",
3333
            file_storage_dir)
3334
  else:
3335
    try:
3336
      os.makedirs(file_storage_dir, 0750)
3337
    except OSError, err:
3338
      _Fail("Cannot create file storage directory '%s': %s",
3339
            file_storage_dir, err, exc=True)
3340

    
3341

    
3342
def RemoveFileStorageDir(file_storage_dir):
3343
  """Remove file storage directory.
3344

3345
  Remove it only if it's empty. If not log an error and return.
3346

3347
  @type file_storage_dir: str
3348
  @param file_storage_dir: the directory we should cleanup
3349
  @rtype: tuple (success,)
3350
  @return: tuple of one element, C{success}, denoting
3351
      whether the operation was successful
3352

3353
  """
3354
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
3355
  if os.path.exists(file_storage_dir):
3356
    if not os.path.isdir(file_storage_dir):
3357
      _Fail("Specified Storage directory '%s' is not a directory",
3358
            file_storage_dir)
3359
    # deletes dir only if empty, otherwise we want to fail the rpc call
3360
    try:
3361
      os.rmdir(file_storage_dir)
3362
    except OSError, err:
3363
      _Fail("Cannot remove file storage directory '%s': %s",
3364
            file_storage_dir, err)
3365

    
3366

    
3367
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
3368
  """Rename the file storage directory.
3369

3370
  @type old_file_storage_dir: str
3371
  @param old_file_storage_dir: the current path
3372
  @type new_file_storage_dir: str
3373
  @param new_file_storage_dir: the name we should rename to
3374
  @rtype: tuple (success,)
3375
  @return: tuple of one element, C{success}, denoting
3376
      whether the operation was successful
3377

3378
  """
3379
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
3380
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
3381
  if not os.path.exists(new_file_storage_dir):
3382
    if os.path.isdir(old_file_storage_dir):
3383
      try:
3384
        os.rename(old_file_storage_dir, new_file_storage_dir)
3385
      except OSError, err:
3386
        _Fail("Cannot rename '%s' to '%s': %s",
3387
              old_file_storage_dir, new_file_storage_dir, err)
3388
    else:
3389
      _Fail("Specified storage dir '%s' is not a directory",
3390
            old_file_storage_dir)
3391
  else:
3392
    if os.path.exists(old_file_storage_dir):
3393
      _Fail("Cannot rename '%s' to '%s': both locations exist",
3394
            old_file_storage_dir, new_file_storage_dir)
3395

    
3396

    
3397
def _EnsureJobQueueFile(file_name):
3398
  """Checks whether the given filename is in the queue directory.
3399

3400
  @type file_name: str
3401
  @param file_name: the file name we should check
3402
  @rtype: None
3403
  @raises RPCFail: if the file is not valid
3404

3405
  """
3406
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
3407
    _Fail("Passed job queue file '%s' does not belong to"
3408
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
3409

    
3410

    
3411
def JobQueueUpdate(file_name, content):
3412
  """Updates a file in the queue directory.
3413

3414
  This is just a wrapper over L{utils.io.WriteFile}, with proper
3415
  checking.
3416

3417
  @type file_name: str
3418
  @param file_name: the job file name
3419
  @type content: str
3420
  @param content: the new job contents
3421
  @rtype: boolean
3422
  @return: the success of the operation
3423

3424
  """
3425
  file_name = vcluster.LocalizeVirtualPath(file_name)
3426

    
3427
  _EnsureJobQueueFile(file_name)
3428
  getents = runtime.GetEnts()
3429

    
3430
  # Write and replace the file atomically
3431
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
3432
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)
3433

    
3434

    
3435
def JobQueueRename(old, new):
3436
  """Renames a job queue file.
3437

3438
  This is just a wrapper over os.rename with proper checking.
3439

3440
  @type old: str
3441
  @param old: the old (actual) file name
3442
  @type new: str
3443
  @param new: the desired file name
3444
  @rtype: tuple
3445
  @return: the success of the operation and payload
3446

3447
  """
3448
  old = vcluster.LocalizeVirtualPath(old)
3449
  new = vcluster.LocalizeVirtualPath(new)
3450

    
3451
  _EnsureJobQueueFile(old)
3452
  _EnsureJobQueueFile(new)
3453

    
3454
  getents = runtime.GetEnts()
3455

    
3456
  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
3457
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)
3458

    
3459

    
3460
def BlockdevClose(instance_name, disks):
3461
  """Closes the given block devices.
3462

3463
  This means they will be switched to secondary mode (in case of
3464
  DRBD).
3465

3466
  @param instance_name: if the argument is not empty, the symlinks
3467
      of this instance will be removed
3468
  @type disks: list of L{objects.Disk}
3469
  @param disks: the list of disks to be closed
3470
  @rtype: tuple (success, message)
3471
  @return: a tuple of success and message, where success
3472
      indicates the succes of the operation, and message
3473
      which will contain the error details in case we
3474
      failed
3475

3476
  """
3477
  bdevs = []
3478
  for cf in disks:
3479
    rd = _RecursiveFindBD(cf)
3480
    if rd is None:
3481
      _Fail("Can't find device %s", cf)
3482
    bdevs.append(rd)
3483

    
3484
  msg = []
3485
  for rd in bdevs:
3486
    try:
3487
      rd.Close()
3488
    except errors.BlockDeviceError, err:
3489
      msg.append(str(err))
3490
  if msg:
3491
    _Fail("Can't make devices secondary: %s", ",".join(msg))
3492
  else:
3493
    if instance_name:
3494
      _RemoveBlockDevLinks(instance_name, disks)
3495

    
3496

    
3497
def ValidateHVParams(hvname, hvparams):
3498
  """Validates the given hypervisor parameters.
3499

3500
  @type hvname: string
3501
  @param hvname: the hypervisor name
3502
  @type hvparams: dict
3503
  @param hvparams: the hypervisor parameters to be validated
3504
  @rtype: None
3505

3506
  """
3507
  try:
3508
    hv_type = hypervisor.GetHypervisor(hvname)
3509
    hv_type.ValidateParameters(hvparams)
3510
  except errors.HypervisorError, err:
3511
    _Fail(str(err), log=False)
3512

    
3513

    
3514
def _CheckOSPList(os_obj, parameters):
3515
  """Check whether a list of parameters is supported by the OS.
3516

3517
  @type os_obj: L{objects.OS}
3518
  @param os_obj: OS object to check
3519
  @type parameters: list
3520
  @param parameters: the list of parameters to check
3521

3522
  """
3523
  supported = [v[0] for v in os_obj.supported_parameters]
3524
  delta = frozenset(parameters).difference(supported)
3525
  if delta:
3526
    _Fail("The following parameters are not supported"
3527
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
3528

    
3529

    
3530
def ValidateOS(required, osname, checks, osparams):
3531
  """Validate the given OS' parameters.
3532

3533
  @type required: boolean
3534
  @param required: whether absence of the OS should translate into
3535
      failure or not
3536
  @type osname: string
3537
  @param osname: the OS to be validated
3538
  @type checks: list
3539
  @param checks: list of the checks to run (currently only 'parameters')
3540
  @type osparams: dict
3541
  @param osparams: dictionary with OS parameters
3542
  @rtype: boolean
3543
  @return: True if the validation passed, or False if the OS was not
3544
      found and L{required} was false
3545

3546
  """
3547
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
3548
    _Fail("Unknown checks required for OS %s: %s", osname,
3549
          set(checks).difference(constants.OS_VALIDATE_CALLS))
3550

    
3551
  name_only = objects.OS.GetName(osname)
3552
  status, tbv = _TryOSFromDisk(name_only, None)
3553

    
3554
  if not status:
3555
    if required:
3556
      _Fail(tbv)
3557
    else:
3558
      return False
3559

    
3560
  if max(tbv.api_versions) < constants.OS_API_V20:
3561
    return True
3562

    
3563
  if constants.OS_VALIDATE_PARAMETERS in checks:
3564
    _CheckOSPList(tbv, osparams.keys())
3565

    
3566
  validate_env = OSCoreEnv(osname, tbv, osparams)
3567
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
3568
                        cwd=tbv.path, reset_env=True)
3569
  if result.failed:
3570
    logging.error("os validate command '%s' returned error: %s output: %s",
3571
                  result.cmd, result.fail_reason, result.output)
3572
    _Fail("OS validation script failed (%s), output: %s",
3573
          result.fail_reason, result.output, log=False)
3574

    
3575
  return True
3576

    
3577

    
3578
def DemoteFromMC():
3579
  """Demotes the current node from master candidate role.
3580

3581
  """
3582
  # try to ensure we're not the master by mistake
3583
  master, myself = ssconf.GetMasterAndMyself()
3584
  if master == myself:
3585
    _Fail("ssconf status shows I'm the master node, will not demote")
3586

    
3587
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
3588
  if not result.failed:
3589
    _Fail("The master daemon is running, will not demote")
3590

    
3591
  try:
3592
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3593
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3594
  except EnvironmentError, err:
3595
    if err.errno != errno.ENOENT:
3596
      _Fail("Error while backing up cluster file: %s", err, exc=True)
3597

    
3598
  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3599

    
3600

    
3601
def _GetX509Filenames(cryptodir, name):
3602
  """Returns the full paths for the private key and certificate.
3603

3604
  """
3605
  return (utils.PathJoin(cryptodir, name),
3606
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3607
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3608

    
3609

    
3610
def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3611
  """Creates a new X509 certificate for SSL/TLS.
3612

3613
  @type validity: int
3614
  @param validity: Validity in seconds
3615
  @rtype: tuple; (string, string)
3616
  @return: Certificate name and public part
3617

3618
  """
3619
  (key_pem, cert_pem) = \
3620
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3621
                                     min(validity, _MAX_SSL_CERT_VALIDITY))
3622

    
3623
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
3624
                              prefix="x509-%s-" % utils.TimestampForFilename())
3625
  try:
3626
    name = os.path.basename(cert_dir)
3627
    assert len(name) > 5
3628

    
3629
    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3630

    
3631
    utils.WriteFile(key_file, mode=0400, data=key_pem)
3632
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3633

    
3634
    # Never return private key as it shouldn't leave the node
3635
    return (name, cert_pem)
3636
  except Exception:
3637
    shutil.rmtree(cert_dir, ignore_errors=True)
3638
    raise
3639

    
3640

    
3641
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3642
  """Removes a X509 certificate.
3643

3644
  @type name: string
3645
  @param name: Certificate name
3646

3647
  """
3648
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3649

    
3650
  utils.RemoveFile(key_file)
3651
  utils.RemoveFile(cert_file)
3652

    
3653
  try:
3654
    os.rmdir(cert_dir)
3655
  except EnvironmentError, err:
3656
    _Fail("Cannot remove certificate directory '%s': %s",
3657
          cert_dir, err)
3658

    
3659

    
3660
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3661
  """Returns the command for the requested input/output.
3662

3663
  @type instance: L{objects.Instance}
3664
  @param instance: The instance object
3665
  @param mode: Import/export mode
3666
  @param ieio: Input/output type
3667
  @param ieargs: Input/output arguments
3668

3669
  """
3670
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3671

    
3672
  env = None
3673
  prefix = None
3674
  suffix = None
3675
  exp_size = None
3676

    
3677
  if ieio == constants.IEIO_FILE:
3678
    (filename, ) = ieargs
3679

    
3680
    if not utils.IsNormAbsPath(filename):
3681
      _Fail("Path '%s' is not normalized or absolute", filename)
3682

    
3683
    real_filename = os.path.realpath(filename)
3684
    directory = os.path.dirname(real_filename)
3685

    
3686
    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3687
      _Fail("File '%s' is not under exports directory '%s': %s",
3688
            filename, pathutils.EXPORT_DIR, real_filename)
3689

    
3690
    # Create directory
3691
    utils.Makedirs(directory, mode=0750)
3692

    
3693
    quoted_filename = utils.ShellQuote(filename)
3694

    
3695
    if mode == constants.IEM_IMPORT:
3696
      suffix = "> %s" % quoted_filename
3697
    elif mode == constants.IEM_EXPORT:
3698
      suffix = "< %s" % quoted_filename
3699

    
3700
      # Retrieve file size
3701
      try:
3702
        st = os.stat(filename)
3703
      except EnvironmentError, err:
3704
        logging.error("Can't stat(2) %s: %s", filename, err)
3705
      else:
3706
        exp_size = utils.BytesToMebibyte(st.st_size)
3707

    
3708
  elif ieio == constants.IEIO_RAW_DISK:
3709
    (disk, ) = ieargs
3710

    
3711
    real_disk = _OpenRealBD(disk)
3712

    
3713
    if mode == constants.IEM_IMPORT:
3714
      # we use nocreat to fail if the device is not already there or we pass a
3715
      # wrong path; we use notrunc to no attempt truncate on an LV device
3716
      suffix = utils.BuildShellCmd("| dd of=%s conv=nocreat,notrunc bs=%s",
3717
                                   real_disk.dev_path,
3718
                                   str(1024 * 1024)) # 1 MB
3719

    
3720
    elif mode == constants.IEM_EXPORT:
3721
      # the block size on the read dd is 1MiB to match our units
3722
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3723
                                   real_disk.dev_path,
3724
                                   str(1024 * 1024), # 1 MB
3725
                                   str(disk.size))
3726
      exp_size = disk.size
3727

    
3728
  elif ieio == constants.IEIO_SCRIPT:
3729
    (disk, disk_index, ) = ieargs
3730

    
3731
    assert isinstance(disk_index, (int, long))
3732

    
3733
    inst_os = OSFromDisk(instance.os)
3734
    env = OSEnvironment(instance, inst_os)
3735

    
3736
    if mode == constants.IEM_IMPORT:
3737
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3738
      env["IMPORT_INDEX"] = str(disk_index)
3739
      script = inst_os.import_script
3740

    
3741
    elif mode == constants.IEM_EXPORT:
3742
      real_disk = _OpenRealBD(disk)
3743
      env["EXPORT_DEVICE"] = real_disk.dev_path
3744
      env["EXPORT_INDEX"] = str(disk_index)
3745
      script = inst_os.export_script
3746

    
3747
    # TODO: Pass special environment only to script
3748
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3749

    
3750
    if mode == constants.IEM_IMPORT:
3751
      suffix = "| %s" % script_cmd
3752

    
3753
    elif mode == constants.IEM_EXPORT:
3754
      prefix = "%s |" % script_cmd
3755

    
3756
    # Let script predict size
3757
    exp_size = constants.IE_CUSTOM_SIZE
3758

    
3759
  else:
3760
    _Fail("Invalid %s I/O mode %r", mode, ieio)
3761

    
3762
  return (env, prefix, suffix, exp_size)
3763

    
3764

    
3765
def _CreateImportExportStatusDir(prefix):
3766
  """Creates status directory for import/export.
3767

3768
  """
3769
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3770
                          prefix=("%s-%s-" %
3771
                                  (prefix, utils.TimestampForFilename())))
3772

    
3773

    
3774
def StartImportExportDaemon(mode, opts, host, port, instance, component,
3775
                            ieio, ieioargs):
3776
  """Starts an import or export daemon.
3777

3778
  @param mode: Import/output mode
3779
  @type opts: L{objects.ImportExportOptions}
3780
  @param opts: Daemon options
3781
  @type host: string
3782
  @param host: Remote host for export (None for import)
3783
  @type port: int
3784
  @param port: Remote port for export (None for import)
3785
  @type instance: L{objects.Instance}
3786
  @param instance: Instance object
3787
  @type component: string
3788
  @param component: which part of the instance is transferred now,
3789
      e.g. 'disk/0'
3790
  @param ieio: Input/output type
3791
  @param ieioargs: Input/output arguments
3792

3793
  """
3794
  if mode == constants.IEM_IMPORT:
3795
    prefix = "import"
3796

    
3797
    if not (host is None and port is None):
3798
      _Fail("Can not specify host or port on import")
3799

    
3800
  elif mode == constants.IEM_EXPORT:
3801
    prefix = "export"
3802

    
3803
    if host is None or port is None:
3804
      _Fail("Host and port must be specified for an export")
3805

    
3806
  else:
3807
    _Fail("Invalid mode %r", mode)
3808

    
3809
  if (opts.key_name is None) ^ (opts.ca_pem is None):
3810
    _Fail("Cluster certificate can only be used for both key and CA")
3811

    
3812
  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3813
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3814

    
3815
  if opts.key_name is None:
3816
    # Use server.pem
3817
    key_path = pathutils.NODED_CERT_FILE
3818
    cert_path = pathutils.NODED_CERT_FILE
3819
    assert opts.ca_pem is None
3820
  else:
3821
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3822
                                                 opts.key_name)
3823
    assert opts.ca_pem is not None
3824

    
3825
  for i in [key_path, cert_path]:
3826
    if not os.path.exists(i):
3827
      _Fail("File '%s' does not exist" % i)
3828

    
3829
  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3830
  try:
3831
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3832
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3833
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3834

    
3835
    if opts.ca_pem is None:
3836
      # Use server.pem
3837
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3838
    else:
3839
      ca = opts.ca_pem
3840

    
3841
    # Write CA file
3842
    utils.WriteFile(ca_file, data=ca, mode=0400)
3843

    
3844
    cmd = [
3845
      pathutils.IMPORT_EXPORT_DAEMON,
3846
      status_file, mode,
3847
      "--key=%s" % key_path,
3848
      "--cert=%s" % cert_path,
3849
      "--ca=%s" % ca_file,
3850
      ]
3851

    
3852
    if host:
3853
      cmd.append("--host=%s" % host)
3854

    
3855
    if port:
3856
      cmd.append("--port=%s" % port)
3857

    
3858
    if opts.ipv6:
3859
      cmd.append("--ipv6")
3860
    else:
3861
      cmd.append("--ipv4")
3862

    
3863
    if opts.compress:
3864
      cmd.append("--compress=%s" % opts.compress)
3865

    
3866
    if opts.magic:
3867
      cmd.append("--magic=%s" % opts.magic)
3868

    
3869
    if exp_size is not None:
3870
      cmd.append("--expected-size=%s" % exp_size)
3871

    
3872
    if cmd_prefix:
3873
      cmd.append("--cmd-prefix=%s" % cmd_prefix)
3874

    
3875
    if cmd_suffix:
3876
      cmd.append("--cmd-suffix=%s" % cmd_suffix)
3877

    
3878
    if mode == constants.IEM_EXPORT:
3879
      # Retry connection a few times when connecting to remote peer
3880
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3881
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3882
    elif opts.connect_timeout is not None:
3883
      assert mode == constants.IEM_IMPORT
3884
      # Overall timeout for establishing connection while listening
3885
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3886

    
3887
    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3888

    
3889
    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3890
    # support for receiving a file descriptor for output
3891
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3892
                      output=logfile)
3893

    
3894
    # The import/export name is simply the status directory name
3895
    return os.path.basename(status_dir)
3896

    
3897
  except Exception:
3898
    shutil.rmtree(status_dir, ignore_errors=True)
3899
    raise
3900

    
3901

    
3902
def GetImportExportStatus(names):
3903
  """Returns import/export daemon status.
3904

3905
  @type names: sequence
3906
  @param names: List of names
3907
  @rtype: List of dicts
3908
  @return: Returns a list of the state of each named import/export or None if a
3909
           status couldn't be read
3910

3911
  """
3912
  result = []
3913

    
3914
  for name in names:
3915
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3916
                                 _IES_STATUS_FILE)
3917

    
3918
    try:
3919
      data = utils.ReadFile(status_file)
3920
    except EnvironmentError, err:
3921
      if err.errno != errno.ENOENT:
3922
        raise
3923
      data = None
3924

    
3925
    if not data:
3926
      result.append(None)
3927
      continue
3928

    
3929
    result.append(serializer.LoadJson(data))
3930

    
3931
  return result
3932

    
3933

    
3934
def AbortImportExport(name):
3935
  """Sends SIGTERM to a running import/export daemon.
3936

3937
  """
3938
  logging.info("Abort import/export %s", name)
3939

    
3940
  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3941
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3942

    
3943
  if pid:
3944
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3945
                 name, pid)
3946
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3947

    
3948

    
3949
def CleanupImportExport(name):
3950
  """Cleanup after an import or export.
3951

3952
  If the import/export daemon is still running it's killed. Afterwards the
3953
  whole status directory is removed.
3954

3955
  """
3956
  logging.info("Finalizing import/export %s", name)
3957

    
3958
  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3959

    
3960
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3961

    
3962
  if pid:
3963
    logging.info("Import/export %s is still running with PID %s",
3964
                 name, pid)
3965
    utils.KillProcess(pid, waitpid=False)
3966

    
3967
  shutil.rmtree(status_dir, ignore_errors=True)
3968

    
3969

    
3970
def _FindDisks(disks):
3971
  """Finds attached L{BlockDev}s for the given disks.
3972

3973
  @type disks: list of L{objects.Disk}
3974
  @param disks: the disk objects we need to find
3975

3976
  @return: list of L{BlockDev} objects or C{None} if a given disk
3977
           was not found or was no attached.
3978

3979
  """
3980
  bdevs = []
3981

    
3982
  for disk in disks:
3983
    rd = _RecursiveFindBD(disk)
3984
    if rd is None:
3985
      _Fail("Can't find device %s", disk)
3986
    bdevs.append(rd)
3987
  return bdevs
3988

    
3989

    
3990
def DrbdDisconnectNet(disks):
3991
  """Disconnects the network on a list of drbd devices.
3992

3993
  """
3994
  bdevs = _FindDisks(disks)
3995

    
3996
  # disconnect disks
3997
  for rd in bdevs:
3998
    try:
3999
      rd.DisconnectNet()
4000
    except errors.BlockDeviceError, err:
4001
      _Fail("Can't change network configuration to standalone mode: %s",
4002
            err, exc=True)
4003

    
4004

    
4005
def DrbdAttachNet(disks, instance_name, multimaster):
4006
  """Attaches the network on a list of drbd devices.
4007

4008
  """
4009
  bdevs = _FindDisks(disks)
4010

    
4011
  if multimaster:
4012
    for idx, rd in enumerate(bdevs):
4013
      try:
4014
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
4015
      except EnvironmentError, err:
4016
        _Fail("Can't create symlink: %s", err)
4017
  # reconnect disks, switch to new master configuration and if
4018
  # needed primary mode
4019
  for rd in bdevs:
4020
    try:
4021
      rd.AttachNet(multimaster)
4022
    except errors.BlockDeviceError, err:
4023
      _Fail("Can't change network configuration: %s", err)
4024

    
4025
  # wait until the disks are connected; we need to retry the re-attach
4026
  # if the device becomes standalone, as this might happen if the one
4027
  # node disconnects and reconnects in a different mode before the
4028
  # other node reconnects; in this case, one or both of the nodes will
4029
  # decide it has wrong configuration and switch to standalone
4030

    
4031
  def _Attach():
4032
    all_connected = True
4033

    
4034
    for rd in bdevs:
4035
      stats = rd.GetProcStatus()
4036

    
4037
      if multimaster:
4038
        # In the multimaster case we have to wait explicitly until
4039
        # the resource is Connected and UpToDate/UpToDate, because
4040
        # we promote *both nodes* to primary directly afterwards.
4041
        # Being in resync is not enough, since there is a race during which we
4042
        # may promote a node with an Outdated disk to primary, effectively
4043
        # tearing down the connection.
4044
        all_connected = (all_connected and
4045
                         stats.is_connected and
4046
                         stats.is_disk_uptodate and
4047
                         stats.peer_disk_uptodate)
4048
      else:
4049
        all_connected = (all_connected and
4050
                         (stats.is_connected or stats.is_in_resync))
4051

    
4052
      if stats.is_standalone:
4053
        # peer had different config info and this node became
4054
        # standalone, even though this should not happen with the
4055
        # new staged way of changing disk configs
4056
        try:
4057
          rd.AttachNet(multimaster)
4058
        except errors.BlockDeviceError, err:
4059
          _Fail("Can't change network configuration: %s", err)
4060

    
4061
    if not all_connected:
4062
      raise utils.RetryAgain()
4063

    
4064
  try:
4065
    # Start with a delay of 100 miliseconds and go up to 5 seconds
4066
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
4067
  except utils.RetryTimeout:
4068
    _Fail("Timeout in disk reconnecting")
4069

    
4070
  if multimaster:
4071
    # change to primary mode
4072
    for rd in bdevs:
4073
      try:
4074
        rd.Open()
4075
      except errors.BlockDeviceError, err:
4076
        _Fail("Can't change to primary mode: %s", err)
4077

    
4078

    
4079
def DrbdWaitSync(disks):
4080
  """Wait until DRBDs have synchronized.
4081

4082
  """
4083
  def _helper(rd):
4084
    stats = rd.GetProcStatus()
4085
    if not (stats.is_connected or stats.is_in_resync):
4086
      raise utils.RetryAgain()
4087
    return stats
4088

    
4089
  bdevs = _FindDisks(disks)
4090

    
4091
  min_resync = 100
4092
  alldone = True
4093
  for rd in bdevs:
4094
    try:
4095
      # poll each second for 15 seconds
4096
      stats = utils.Retry(_helper, 1, 15, args=[rd])
4097
    except utils.RetryTimeout:
4098
      stats = rd.GetProcStatus()
4099
      # last check
4100
      if not (stats.is_connected or stats.is_in_resync):
4101
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
4102
    alldone = alldone and (not stats.is_in_resync)
4103
    if stats.sync_percent is not None:
4104
      min_resync = min(min_resync, stats.sync_percent)
4105

    
4106
  return (alldone, min_resync)
4107

    
4108

    
4109
def DrbdNeedsActivation(disks):
4110
  """Checks which of the passed disks needs activation and returns their UUIDs.
4111

4112
  """
4113
  faulty_disks = []
4114

    
4115
  for disk in disks:
4116
    rd = _RecursiveFindBD(disk)
4117
    if rd is None:
4118
      faulty_disks.append(disk)
4119
      continue
4120

    
4121
    stats = rd.GetProcStatus()
4122
    if stats.is_standalone or stats.is_diskless:
4123
      faulty_disks.append(disk)
4124

    
4125
  return [disk.uuid for disk in faulty_disks]
4126

    
4127

    
4128
def GetDrbdUsermodeHelper():
4129
  """Returns DRBD usermode helper currently configured.
4130

4131
  """
4132
  try:
4133
    return drbd.DRBD8.GetUsermodeHelper()
4134
  except errors.BlockDeviceError, err:
4135
    _Fail(str(err))
4136

    
4137

    
4138
def PowercycleNode(hypervisor_type, hvparams=None):
4139
  """Hard-powercycle the node.
4140

4141
  Because we need to return first, and schedule the powercycle in the
4142
  background, we won't be able to report failures nicely.
4143

4144
  """
4145
  hyper = hypervisor.GetHypervisor(hypervisor_type)
4146
  try:
4147
    pid = os.fork()
4148
  except OSError:
4149
    # if we can't fork, we'll pretend that we're in the child process
4150
    pid = 0
4151
  if pid > 0:
4152
    return "Reboot scheduled in 5 seconds"
4153
  # ensure the child is running on ram
4154
  try:
4155
    utils.Mlockall()
4156
  except Exception: # pylint: disable=W0703
4157
    pass
4158
  time.sleep(5)
4159
  hyper.PowercycleNode(hvparams=hvparams)
4160

    
4161

    
4162
def _VerifyRestrictedCmdName(cmd):
4163
  """Verifies a restricted command name.
4164

4165
  @type cmd: string
4166
  @param cmd: Command name
4167
  @rtype: tuple; (boolean, string or None)
4168
  @return: The tuple's first element is the status; if C{False}, the second
4169
    element is an error message string, otherwise it's C{None}
4170

4171
  """
4172
  if not cmd.strip():
4173
    return (False, "Missing command name")
4174

    
4175
  if os.path.basename(cmd) != cmd:
4176
    return (False, "Invalid command name")
4177

    
4178
  if not constants.EXT_PLUGIN_MASK.match(cmd):
4179
    return (False, "Command name contains forbidden characters")
4180

    
4181
  return (True, None)
4182

    
4183

    
4184
def _CommonRestrictedCmdCheck(path, owner):
4185
  """Common checks for restricted command file system directories and files.
4186

4187
  @type path: string
4188
  @param path: Path to check
4189
  @param owner: C{None} or tuple containing UID and GID
4190
  @rtype: tuple; (boolean, string or C{os.stat} result)
4191
  @return: The tuple's first element is the status; if C{False}, the second
4192
    element is an error message string, otherwise it's the result of C{os.stat}
4193

4194
  """
4195
  if owner is None:
4196
    # Default to root as owner
4197
    owner = (0, 0)
4198

    
4199
  try:
4200
    st = os.stat(path)
4201
  except EnvironmentError, err:
4202
    return (False, "Can't stat(2) '%s': %s" % (path, err))
4203

    
4204
  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
4205
    return (False, "Permissions on '%s' are too permissive" % path)
4206

    
4207
  if (st.st_uid, st.st_gid) != owner:
4208
    (owner_uid, owner_gid) = owner
4209
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
4210

    
4211
  return (True, st)
4212

    
4213

    
4214
def _VerifyRestrictedCmdDirectory(path, _owner=None):
4215
  """Verifies restricted command directory.
4216

4217
  @type path: string
4218
  @param path: Path to check
4219
  @rtype: tuple; (boolean, string or None)
4220
  @return: The tuple's first element is the status; if C{False}, the second
4221
    element is an error message string, otherwise it's C{None}
4222

4223
  """
4224
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)
4225

    
4226
  if not status:
4227
    return (False, value)
4228

    
4229
  if not stat.S_ISDIR(value.st_mode):
4230
    return (False, "Path '%s' is not a directory" % path)
4231

    
4232
  return (True, None)
4233

    
4234

    
4235
def _VerifyRestrictedCmd(path, cmd, _owner=None):
4236
  """Verifies a whole restricted command and returns its executable filename.
4237

4238
  @type path: string
4239
  @param path: Directory containing restricted commands
4240
  @type cmd: string
4241
  @param cmd: Command name
4242
  @rtype: tuple; (boolean, string)
4243
  @return: The tuple's first element is the status; if C{False}, the second
4244
    element is an error message string, otherwise the second element is the
4245
    absolute path to the executable
4246

4247
  """
4248
  executable = utils.PathJoin(path, cmd)
4249

    
4250
  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)
4251

    
4252
  if not status:
4253
    return (False, msg)
4254

    
4255
  if not utils.IsExecutable(executable):
4256
    return (False, "access(2) thinks '%s' can't be executed" % executable)
4257

    
4258
  return (True, executable)
4259

    
4260

    
4261
def _PrepareRestrictedCmd(path, cmd,
4262
                          _verify_dir=_VerifyRestrictedCmdDirectory,
4263
                          _verify_name=_VerifyRestrictedCmdName,
4264
                          _verify_cmd=_VerifyRestrictedCmd):
4265
  """Performs a number of tests on a restricted command.
4266

4267
  @type path: string
4268
  @param path: Directory containing restricted commands
4269
  @type cmd: string
4270
  @param cmd: Command name
4271
  @return: Same as L{_VerifyRestrictedCmd}
4272

4273
  """
4274
  # Verify the directory first
4275
  (status, msg) = _verify_dir(path)
4276
  if status:
4277
    # Check command if everything was alright
4278
    (status, msg) = _verify_name(cmd)
4279

    
4280
  if not status:
4281
    return (False, msg)
4282

    
4283
  # Check actual executable
4284
  return _verify_cmd(path, cmd)
4285

    
4286

    
4287
def RunRestrictedCmd(cmd,
4288
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
4289
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
4290
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
4291
                     _sleep_fn=time.sleep,
4292
                     _prepare_fn=_PrepareRestrictedCmd,
4293
                     _runcmd_fn=utils.RunCmd,
4294
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
4295
  """Executes a restricted command after performing strict tests.
4296

4297
  @type cmd: string
4298
  @param cmd: Command name
4299
  @rtype: string
4300
  @return: Command output
4301
  @raise RPCFail: In case of an error
4302

4303
  """
4304
  logging.info("Preparing to run restricted command '%s'", cmd)
4305

    
4306
  if not _enabled:
4307
    _Fail("Restricted commands disabled at configure time")
4308

    
4309
  lock = None
4310
  try:
4311
    cmdresult = None
4312
    try:
4313
      lock = utils.FileLock.Open(_lock_file)
4314
      lock.Exclusive(blocking=True, timeout=_lock_timeout)
4315

    
4316
      (status, value) = _prepare_fn(_path, cmd)
4317

    
4318
      if status:
4319
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
4320
                               postfork_fn=lambda _: lock.Unlock())
4321
      else:
4322
        logging.error(value)
4323
    except Exception: # pylint: disable=W0703
4324
      # Keep original error in log
4325
      logging.exception("Caught exception")
4326

    
4327
    if cmdresult is None:
4328
      logging.info("Sleeping for %0.1f seconds before returning",
4329
                   _RCMD_INVALID_DELAY)
4330
      _sleep_fn(_RCMD_INVALID_DELAY)
4331

    
4332
      # Do not include original error message in returned error
4333
      _Fail("Executing command '%s' failed" % cmd)
4334
    elif cmdresult.failed or cmdresult.fail_reason:
4335
      _Fail("Restricted command '%s' failed: %s; output: %s",
4336
            cmd, cmdresult.fail_reason, cmdresult.output)
4337
    else:
4338
      return cmdresult.output
4339
  finally:
4340
    if lock is not None:
4341
      # Release lock at last
4342
      lock.Close()
4343
      lock = None
4344

    
4345

    
4346
def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
4347
  """Creates or removes the watcher pause file.
4348

4349
  @type until: None or number
4350
  @param until: Unix timestamp saying until when the watcher shouldn't run
4351

4352
  """
4353
  if until is None:
4354
    logging.info("Received request to no longer pause watcher")
4355
    utils.RemoveFile(_filename)
4356
  else:
4357
    logging.info("Received request to pause watcher until %s", until)
4358

    
4359
    if not ht.TNumber(until):
4360
      _Fail("Duration must be numeric")
4361

    
4362
    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)
4363

    
4364

    
4365
def ConfigureOVS(ovs_name, ovs_link):
4366
  """Creates a OpenvSwitch on the node.
4367

4368
  This function sets up a OpenvSwitch on the node with given name nad
4369
  connects it via a given eth device.
4370

4371
  @type ovs_name: string
4372
  @param ovs_name: Name of the OpenvSwitch to create.
4373
  @type ovs_link: None or string
4374
  @param ovs_link: Ethernet device for outside connection (can be missing)
4375

4376
  """
4377
  # Initialize the OpenvSwitch
4378
  result = utils.RunCmd(["ovs-vsctl", "add-br", ovs_name])
4379
  if result.failed:
4380
    _Fail("Failed to create openvswitch. Script return value: %s, output: '%s'"
4381
          % (result.exit_code, result.output), log=True)
4382

    
4383
  # And connect it to a physical interface, if given
4384
  if ovs_link:
4385
    result = utils.RunCmd(["ovs-vsctl", "add-port", ovs_name, ovs_link])
4386
    if result.failed:
4387
      _Fail("Failed to connect openvswitch to  interface %s. Script return"
4388
            " value: %s, output: '%s'" % (ovs_link, result.exit_code,
4389
            result.output), log=True)
4390

    
4391

    
4392
class HooksRunner(object):
4393
  """Hook runner.
4394

4395
  This class is instantiated on the node side (ganeti-noded) and not
4396
  on the master side.
4397

4398
  """
4399
  def __init__(self, hooks_base_dir=None):
4400
    """Constructor for hooks runner.
4401

4402
    @type hooks_base_dir: str or None
4403
    @param hooks_base_dir: if not None, this overrides the
4404
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
4405

4406
    """
4407
    if hooks_base_dir is None:
4408
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
4409
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
4410
    # constant
4411
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
4412

    
4413
  def RunLocalHooks(self, node_list, hpath, phase, env):
4414
    """Check that the hooks will be run only locally and then run them.
4415

4416
    """
4417
    assert len(node_list) == 1
4418
    node = node_list[0]
4419
    _, myself = ssconf.GetMasterAndMyself()
4420
    assert node == myself
4421

    
4422
    results = self.RunHooks(hpath, phase, env)
4423

    
4424
    # Return values in the form expected by HooksMaster
4425
    return {node: (None, False, results)}
4426

    
4427
  def RunHooks(self, hpath, phase, env):
4428
    """Run the scripts in the hooks directory.
4429

4430
    @type hpath: str
4431
    @param hpath: the path to the hooks directory which
4432
        holds the scripts
4433
    @type phase: str
4434
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
4435
        L{constants.HOOKS_PHASE_POST}
4436
    @type env: dict
4437
    @param env: dictionary with the environment for the hook
4438
    @rtype: list
4439
    @return: list of 3-element tuples:
4440
      - script path
4441
      - script result, either L{constants.HKR_SUCCESS} or
4442
        L{constants.HKR_FAIL}
4443
      - output of the script
4444

4445
    @raise errors.ProgrammerError: for invalid input
4446
        parameters
4447

4448
    """
4449
    if phase == constants.HOOKS_PHASE_PRE:
4450
      suffix = "pre"
4451
    elif phase == constants.HOOKS_PHASE_POST:
4452
      suffix = "post"
4453
    else:
4454
      _Fail("Unknown hooks phase '%s'", phase)
4455

    
4456
    subdir = "%s-%s.d" % (hpath, suffix)
4457
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
4458

    
4459
    results = []
4460

    
4461
    if not os.path.isdir(dir_name):
4462
      # for non-existing/non-dirs, we simply exit instead of logging a
4463
      # warning at every operation
4464
      return results
4465

    
4466
    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
4467

    
4468
    for (relname, relstatus, runresult) in runparts_results:
4469
      if relstatus == constants.RUNPARTS_SKIP:
4470
        rrval = constants.HKR_SKIP
4471
        output = ""
4472
      elif relstatus == constants.RUNPARTS_ERR:
4473
        rrval = constants.HKR_FAIL
4474
        output = "Hook script execution error: %s" % runresult
4475
      elif relstatus == constants.RUNPARTS_RUN:
4476
        if runresult.failed:
4477
          rrval = constants.HKR_FAIL
4478
        else:
4479
          rrval = constants.HKR_SUCCESS
4480
        output = utils.SafeEncode(runresult.output.strip())
4481
      results.append(("%s/%s" % (subdir, relname), rrval, output))
4482

    
4483
    return results
4484

    
4485

    
4486
class IAllocatorRunner(object):
4487
  """IAllocator runner.
4488

4489
  This class is instantiated on the node side (ganeti-noded) and not on
4490
  the master side.
4491

4492
  """
4493
  @staticmethod
4494
  def Run(name, idata, ial_params):
4495
    """Run an iallocator script.
4496

4497
    @type name: str
4498
    @param name: the iallocator script name
4499
    @type idata: str
4500
    @param idata: the allocator input data
4501
    @type ial_params: list
4502
    @param ial_params: the iallocator parameters
4503

4504
    @rtype: tuple
4505
    @return: two element tuple of:
4506
       - status
4507
       - either error message or stdout of allocator (for success)
4508

4509
    """
4510
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
4511
                                  os.path.isfile)
4512
    if alloc_script is None:
4513
      _Fail("iallocator module '%s' not found in the search path", name)
4514

    
4515
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
4516
    try:
4517
      os.write(fd, idata)
4518
      os.close(fd)
4519
      result = utils.RunCmd([alloc_script, fin_name] + ial_params)
4520
      if result.failed:
4521
        _Fail("iallocator module '%s' failed: %s, output '%s'",
4522
              name, result.fail_reason, result.output)
4523
    finally:
4524
      os.unlink(fin_name)
4525

    
4526
    return result.stdout
4527

    
4528

    
4529
class DevCacheManager(object):
4530
  """Simple class for managing a cache of block device information.
4531

4532
  """
4533
  _DEV_PREFIX = "/dev/"
4534
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR
4535

    
4536
  @classmethod
4537
  def _ConvertPath(cls, dev_path):
4538
    """Converts a /dev/name path to the cache file name.
4539

4540
    This replaces slashes with underscores and strips the /dev
4541
    prefix. It then returns the full path to the cache file.
4542

4543
    @type dev_path: str
4544
    @param dev_path: the C{/dev/} path name
4545
    @rtype: str
4546
    @return: the converted path name
4547

4548
    """
4549
    if dev_path.startswith(cls._DEV_PREFIX):
4550
      dev_path = dev_path[len(cls._DEV_PREFIX):]
4551
    dev_path = dev_path.replace("/", "_")
4552
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
4553
    return fpath
4554

    
4555
  @classmethod
4556
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
4557
    """Updates the cache information for a given device.
4558

4559
    @type dev_path: str
4560
    @param dev_path: the pathname of the device
4561
    @type owner: str
4562
    @param owner: the owner (instance name) of the device
4563
    @type on_primary: bool
4564
    @param on_primary: whether this is the primary
4565
        node nor not
4566
    @type iv_name: str
4567
    @param iv_name: the instance-visible name of the
4568
        device, as in objects.Disk.iv_name
4569

4570
    @rtype: None
4571

4572
    """
4573
    if dev_path is None:
4574
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
4575
      return
4576
    fpath = cls._ConvertPath(dev_path)
4577
    if on_primary:
4578
      state = "primary"
4579
    else:
4580
      state = "secondary"
4581
    if iv_name is None:
4582
      iv_name = "not_visible"
4583
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
4584
    try:
4585
      utils.WriteFile(fpath, data=fdata)
4586
    except EnvironmentError, err:
4587
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
4588

    
4589
  @classmethod
4590
  def RemoveCache(cls, dev_path):
4591
    """Remove data for a dev_path.
4592

4593
    This is just a wrapper over L{utils.io.RemoveFile} with a converted
4594
    path name and logging.
4595

4596
    @type dev_path: str
4597
    @param dev_path: the pathname of the device
4598

4599
    @rtype: None
4600

4601
    """
4602
    if dev_path is None:
4603
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
4604
      return
4605
    fpath = cls._ConvertPath(dev_path)
4606
    try:
4607
      utils.RemoveFile(fpath)
4608
    except EnvironmentError, err:
4609
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)