# lib/backend.py @ 229fb4ea

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103,C0302

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint

# C0302: This module has become too big and should be split up


import base64
import errno
import logging
import os
import os.path
import random
import re
import shutil
import signal
import stat
import tempfile
import time
import zlib

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti.storage import filestorage
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
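# An example line this regex is meant to match, as produced by the
# "lvs ... --separator=| -ovg_name,lv_name,lv_size,lv_attr" invocations
# further down (sample values assumed for illustration):
#
#   "  xenvg|test1|20.06|-wi-ao|"
#
# captured as groups ("xenvg", "test1", "20.06", "-wi-ao").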

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)
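# (For reference: this is octal mode 0755, i.e. rwx for the owner and
# r-x for group and others.)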

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance

  @type trail: list of reasons
  @param trail: reason trail

  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)
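
# A hedged sketch of the resulting file: the trail is JSON-serialized
# as-is, so a call like (element layout illustrative, not mandated here)
#
#   _StoreInstReasonTrail("instance1.example.com",
#                         [["gnt:client", "user-initiated reboot", 1234567890]])
#
# leaves that list as JSON under
# pathutils.INSTANCE_REASON_DIR/instance1.example.com.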


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
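
# Typical call pattern (used throughout this module): printf-style
# arguments are passed separately, and exc=True additionally logs the
# traceback, e.g.
#
#   _Fail("Cluster configuration incomplete: %s", err, exc=True)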


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
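
# A minimal sketch of the sender side that this undoes, assuming the RPC
# client mirrors the two encodings handled above:
#
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress(data)))
#   assert _Decompress(packed) == data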


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterNodeName():
  """Returns the master node name.

  @rtype: string
  @return: name of the master node
  @raise RPCFail: in case of errors

  """
  try:
    return _GetConfig().GetMasterNode()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn, None,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
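
# The decorator is applied further down in this module, e.g.:
#
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script):
#     ...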


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _CheckStorageParams(params, num_params):
  """Performs sanity checks for storage parameters.

  @type params: list
  @param params: list of storage parameters
  @type num_params: int
  @param num_params: expected number of parameters

  """
  if params is None:
    raise errors.ProgrammerError("No storage parameters for storage"
                                 " reporting were provided.")
  if not isinstance(params, list):
    raise errors.ProgrammerError("The storage parameters are not of type"
                                 " list: '%s'" % params)
  if len(params) != num_params:
    raise errors.ProgrammerError("Did not receive the expected number of"
                                 " storage parameters: expected %s,"
                                 " received '%s'" % (num_params, len(params)))


def _CheckLvmStorageParams(params):
  """Performs sanity check for the 'exclusive storage' flag.

  @see: C{_CheckStorageParams}

  """
  _CheckStorageParams(params, 1)
  excl_stor = params[0]
  if not isinstance(excl_stor, bool):
    raise errors.ProgrammerError("Exclusive storage parameter is not"
                                 " boolean: '%s'." % excl_stor)
  return excl_stor


def _GetLvmVgSpaceInfo(name, params):
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.

  @type name: string
  @param name: name of the volume group
  @type params: list
  @param params: list of storage parameters, which in this case should
    contain only one, for exclusive storage

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgInfo(name, excl_stor)


def _GetVgInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = info_fn([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
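
# Shape of the returned report (numbers assumed for illustration):
#
#   {"type": constants.ST_LVM_VG, "name": "xenvg",
#    "storage_free": 31388, "storage_size": 40960}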


def _GetLvmPvSpaceInfo(name, params):
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.

  @see: C{_GetLvmVgSpaceInfo}

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgSpindlesInfo(name, excl_stor)


def _GetVgSpindlesInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary with the keys "type", "name", "storage_free" and
      "storage_size", giving the storage type, VG name, free spindles and
      total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = info_fn(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }


def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) RAM in MiB
    - memory_total is the total amount of RAM in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)


def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  result = []
  for hvname, hvparams in hv_specs:
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
  return result


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_specs):
  """Gives back a hash with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
    ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda (storage_type, storage_key, storage_params):
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)


def _GetFileStorageSpaceInfo(path, params):
  """Wrapper around filestorage.GetFileStorageSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  _CheckStorageParams(params, 0)
  return filestorage.GetFileStorageSpaceInfo(path)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
  constants.ST_SHARED_FILE: None,
  constants.ST_RADOS: None,
}


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet
  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is not None:
    return fn(storage_key, *args)
  else:
    raise NotImplementedError
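
# Dispatch example (a sketch): an lvm-vg unit with exclusive storage
# disabled resolves to _GetLvmVgSpaceInfo,
#
#   _ApplyStorageInfoFunction(constants.ST_LVM_VG, "xenvg", [False])
#
# while types mapped to None above (e.g. constants.ST_RADOS) raise
# NotImplementedError.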


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'result' dict.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'result' dict.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
  """Verify the existence and validity of the client SSL certificate.

  """
  create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
  if not os.path.exists(cert_file):
    return (constants.CV_ERROR,
            "The client certificate does not exist. Run '%s' to create"
            " client certificates for all nodes." % create_cert_cmd)

  (errcode, msg) = utils.VerifyCertificate(cert_file)
  if errcode is not None:
    return (errcode, msg)
  else:
    # if everything is fine, we return the digest to be compared to the config
    return (None, utils.GetCertificateDigest(cert_filename=cert_file))


def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @type node_groups: a dict of strings
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
      have only those nodes that are in `what["nodelist"]`)
  @type groups_cfg: a dict of dict of strings
  @param groups_cfg: a dictionary mapping group uuids to their configuration
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_CLIENT_CERT in what:
    result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      params = groups_cfg.get(node_groups.get(node))
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
      logging.debug("Ssh port %s (None = default) for node %s",
                    str(ssh_port), node)
      success, message = _GetSshRunner(cluster_name). \
                            VerifyNodeHostname(node, ssh_port)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        filestorage.ComputeWrongFileStoragePaths()

  if what.get(constants.NV_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_FILE_STORAGE_PATH] = pathresult

  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult

  return result


def GetCryptoTokens(token_requests):
  """Perform actions on the node's cryptographic tokens.

  Token types can be 'ssl' or 'ssh'. So far only some actions are implemented
  for 'ssl'. Action 'get' returns the digest of the public client ssl
  certificate. Action 'create' creates a new client certificate and private
  key and also returns the digest of the certificate. The third element of a
  token request is a dictionary of optional parameters for the actions; so
  far only the filename is supported.

  @type token_requests: list of tuples of (string, string, dict), where the
    first string is in constants.CRYPTO_TYPES, the second in
    constants.CRYPTO_ACTIONS. The third parameter is a dictionary of string
    to string.
  @param token_requests: list of requests of cryptographic tokens and actions
    to perform on them. The actions come with a dictionary of options.
  @rtype: list of tuples (string, string)
  @return: list of tuples of the token type and the public crypto token

  """
  getents = runtime.GetEnts()
  _VALID_CERT_FILES = [pathutils.NODED_CERT_FILE,
                       pathutils.NODED_CLIENT_CERT_FILE,
                       pathutils.NODED_CLIENT_CERT_FILE_TMP]
  _DEFAULT_CERT_FILE = pathutils.NODED_CLIENT_CERT_FILE
  tokens = []
  for (token_type, action, options) in token_requests:
    if token_type not in constants.CRYPTO_TYPES:
      raise errors.ProgrammerError("Token type '%s' not supported." %
                                   token_type)
    if action not in constants.CRYPTO_ACTIONS:
      raise errors.ProgrammerError("Action '%s' is not supported." %
                                   action)
    if token_type == constants.CRYPTO_TYPE_SSL_DIGEST:
      if action == constants.CRYPTO_ACTION_CREATE:

        # extract file name from options
        cert_filename = None
        if options:
          cert_filename = options.get(constants.CRYPTO_OPTION_CERT_FILE)
        if not cert_filename:
          cert_filename = _DEFAULT_CERT_FILE
        # For security reasons, we don't allow arbitrary filenames
        if cert_filename not in _VALID_CERT_FILES:
          raise errors.ProgrammerError(
            "The certificate file name path '%s' is not allowed." %
            cert_filename)

        # extract serial number from options
        serial_no = None
        if options:
          try:
            serial_no = int(options[constants.CRYPTO_OPTION_SERIAL_NO])
          except ValueError:
            raise errors.ProgrammerError(
              "The given serial number is not an integer: %s." %
              options.get(constants.CRYPTO_OPTION_SERIAL_NO))
          except KeyError:
            raise errors.ProgrammerError("No serial number was provided.")

        if not serial_no:
          raise errors.ProgrammerError(
            "Cannot create an SSL certificate without a serial no.")

        utils.GenerateNewSslCert(
          True, cert_filename, serial_no,
          "Create new client SSL certificate in %s." % cert_filename,
          uid=getents.masterd_uid, gid=getents.masterd_gid)
        tokens.append((token_type,
                       utils.GetCertificateDigest(
                         cert_filename=cert_filename)))
      elif action == constants.CRYPTO_ACTION_GET:
        tokens.append((token_type,
                       utils.GetCertificateDigest()))
  return tokens


def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
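
# For example (paths and size illustrative):
#
#   GetBlockDevSizes(["/dev/sda", "/etc/passwd"])
#
# might return {"/dev/sda": 114473}; paths outside /dev are skipped and
# devices that cannot be stat()'ed or measured are simply left out.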


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
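
# How the lv_attr bits are read above, against a sample attr of "-wi-ao":
# attr[0] == "v" would flag a virtual volume, attr[4] is the activation
# state field ("-" meaning inactive) and attr[5] the device-open field
# ("o" meaning online/open).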


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs
1395

    
1396

    
1397
def BridgesExist(bridges_list):
1398
  """Check if a list of bridges exist on the current node.
1399

1400
  @rtype: boolean
1401
  @return: C{True} if all of them exist, C{False} otherwise
1402

1403
  """
1404
  missing = []
1405
  for bridge in bridges_list:
1406
    if not utils.BridgeExists(bridge):
1407
      missing.append(bridge)
1408

    
1409
  if missing:
1410
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
1411

    
1412

    
1413
def GetInstanceListForHypervisor(hname, hvparams=None,
1414
                                 get_hv_fn=hypervisor.GetHypervisor):
1415
  """Provides a list of instances of the given hypervisor.
1416

1417
  @type hname: string
1418
  @param hname: name of the hypervisor
1419
  @type hvparams: dict of strings
1420
  @param hvparams: hypervisor parameters for the given hypervisor
1421
  @type get_hv_fn: function
1422
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1423
    name; optional parameter to increase testability
1424

1425
  @rtype: list
1426
  @return: a list of all running instances on the current node
1427
    - instance1.example.com
1428
    - instance2.example.com
1429

1430
  """
1431
  results = []
1432
  try:
1433
    hv = get_hv_fn(hname)
1434
    names = hv.ListInstances(hvparams=hvparams)
1435
    results.extend(names)
1436
  except errors.HypervisorError, err:
1437
    _Fail("Error enumerating instances (hypervisor %s): %s",
1438
          hname, err, exc=True)
1439
  return results
1440

    
1441

    
1442
def GetInstanceList(hypervisor_list, all_hvparams=None,
1443
                    get_hv_fn=hypervisor.GetHypervisor):
1444
  """Provides a list of instances.
1445

1446
  @type hypervisor_list: list
1447
  @param hypervisor_list: the list of hypervisors to query information
1448
  @type all_hvparams: dict of dict of strings
1449
  @param all_hvparams: a dictionary mapping hypervisor types to respective
1450
    cluster-wide hypervisor parameters
1451
  @type get_hv_fn: function
1452
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1453
    name; optional parameter to increase testability
1454

1455
  @rtype: list
1456
  @return: a list of all running instances on the current node
1457
    - instance1.example.com
1458
    - instance2.example.com
1459

1460
  """
1461
  results = []
1462
  for hname in hypervisor_list:
1463
    hvparams = all_hvparams[hname]
1464
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
1465
                                                get_hv_fn=get_hv_fn))
1466
  return results
1467

    
1468

    
1469
def GetInstanceInfo(instance, hname, hvparams=None):
1470
  """Gives back the information about an instance as a dictionary.
1471

1472
  @type instance: string
1473
  @param instance: the instance name
1474
  @type hname: string
1475
  @param hname: the hypervisor type of the instance
1476
  @type hvparams: dict of strings
1477
  @param hvparams: the instance's hvparams
1478

1479
  @rtype: dict
1480
  @return: dictionary with the following keys:
1481
      - memory: memory size of instance (int)
1482
      - state: state of instance (HvInstanceState)
1483
      - time: cpu time of instance (float)
1484
      - vcpus: the number of vcpus (int)
1485

1486
  """
1487
  output = {}
1488

    
1489
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
1490
                                                          hvparams=hvparams)
1491
  if iinfo is not None:
1492
    output["memory"] = iinfo[2]
1493
    output["vcpus"] = iinfo[3]
1494
    output["state"] = iinfo[4]
1495
    output["time"] = iinfo[5]
1496

    
1497
  return output
1498

    
1499

    
1500
def GetInstanceMigratable(instance):
  """Computes whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances(instance.hvparams):
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list, all_hvparams):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: mapping of hypervisor names to hvparams

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


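# Illustrative shape of the GetAllInstancesInfo result (hypothetical values;
# the "state" string is hypervisor-specific, e.g. a Xen state string):
#
#   {"instance1.example.com": {"memory": 1024, "vcpus": 2,
#                              "state": "-b----", "time": 12.5}}
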
def GetInstanceConsoleInfo(instance_param_dict,
                           get_hv_fn=hypervisor.GetHypervisor):
  """Gather data about the console access of a set of instances of this node.

  This function assumes that the caller already knows which instances are on
  this node, by calling a function such as L{GetAllInstancesInfo} or
  L{GetInstanceList}.

  For every instance, a large amount of configuration data needs to be
  provided to the hypervisor interface in order to receive the console
  information. Whether this could or should be cut down can be discussed.
  The information is provided in a dictionary indexed by instance name,
  allowing any number of instance queries to be done.

  @type instance_param_dict: dict of string to dict of dictionaries, where the
    inner dictionaries represent: L{objects.Instance}, L{objects.Node},
    L{objects.NodeGroup}, HvParams, BeParams
  @param instance_param_dict: mapping of instance name to parameters necessary
    for console information retrieval

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - instance: instance name
      - kind: console kind
      - message: used with kind == CONS_MESSAGE, indicates that the console
                 is unavailable and supplies the error message
      - host: host to connect to
      - port: port to use
      - user: user for login
      - command: the command, broken into parts as an array
      - display: unknown, potentially unused?

  """

  output = {}
  for inst_name in instance_param_dict:
    instance = instance_param_dict[inst_name]["instance"]
    pnode = instance_param_dict[inst_name]["node"]
    group = instance_param_dict[inst_name]["group"]
    hvparams = instance_param_dict[inst_name]["hvParams"]
    beparams = instance_param_dict[inst_name]["beParams"]

    instance = objects.Instance.FromDict(instance)
    pnode = objects.Node.FromDict(pnode)
    group = objects.NodeGroup.FromDict(group)

    h = get_hv_fn(instance.hypervisor)
    output[inst_name] = h.GetInstanceConsole(instance, pnode, group,
                                             hvparams, beparams).ToDict()

  return output


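# Expected shape of instance_param_dict, reconstructed from the lookups in
# the function above (the serialized-dict values are produced by the
# respective objects' ToDict() on the caller's side):
#
#   {"instance1.example.com": {"instance": <Instance dict>,
#                              "node": <Node dict>,
#                              "group": <NodeGroup dict>,
#                              "hvParams": <hvparams dict>,
#                              "beParams": <beparams dict>}}
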
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)


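# Example of a log file name produced by _InstanceLogName (the OS name and
# the exact timestamp format are illustrative assumptions):
#
#   _InstanceLogName("add", "debootstrap", "instance1.example.com", None)
#   # -> "<LOG_OS_DIR>/add-debootstrap-instance1.example.com-2014-01-01_12_00_01.log"
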
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS rename script should be run
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))


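# Example path returned by _GetBlockDevSymlinkPath, assuming the default
# build-time disk separator ":" (both the separator and the directory are
# configure-time settings):
#
#   _GetBlockDevSymlinkPath("instance1.example.com", 0)
#   # -> "<DISK_LINKS_DIR>/instance1.example.com:0"
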
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _CalculateDeviceURI(instance, disk, device):
  """Get the URI for the device.

  @type instance: L{objects.Instance}
  @param instance: the instance the disk belongs to
  @type disk: L{objects.Disk}
  @param disk: the target disk object
  @type device: L{bdev.BlockDev}
  @param device: the corresponding BlockDevice
  @rtype: string
  @return: the device URI if any, else None

  """
  access_mode = disk.params.get(constants.LDP_ACCESS,
                                constants.DISK_KERNELSPACE)
  if access_mode == constants.DISK_USERSPACE:
    # This can raise errors.BlockDeviceError
    return device.GetUserspaceAccessUri(instance.hypervisor)
  else:
    return None


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, link_name, drive_uri)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)
    uri = _CalculateDeviceURI(instance, disk, device)

    block_devices.append((disk, link_name, uri))

  return block_devices


def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances(instance.hvparams):
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances(instance.hvparams):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once, timeout=timeout)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances(instance.hvparams):
          # if the instance no longer exists, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances(instance.hvparams):
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances(instance.hvparams):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
                                                   instance.hvparams)

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances(instance.hvparams)
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(cluster_name, instance, target, live):
  """Migrates an instance to another node.

  @type cluster_name: string
  @param cluster_name: name of the cluster
  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(cluster_name, instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def HotplugDevice(instance, action, dev_type, device, extra, seq):
  """Hotplug a device.

  Hotplug is currently supported only for the KVM hypervisor.

  @type instance: L{objects.Instance}
  @param instance: the instance to which we hotplug a device
  @type action: string
  @param action: the hotplug action to perform
  @type dev_type: string
  @param dev_type: the device type to hotplug
  @type device: either L{objects.NIC} or L{objects.Disk}
  @param device: the device object to hotplug
  @type extra: string
  @param extra: extra info used by hotplug code (e.g. disk link)
  @type seq: int
  @param seq: the index of the device from master perspective
  @raise RPCFail: in case instance does not have KVM hypervisor

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.VerifyHotplugSupport(instance, action, dev_type)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)

  if action == constants.HOTPLUG_ACTION_ADD:
    fn = hyper.HotAddDevice
  elif action == constants.HOTPLUG_ACTION_REMOVE:
    fn = hyper.HotDelDevice
  elif action == constants.HOTPLUG_ACTION_MODIFY:
    fn = hyper.HotModDevice
  else:
    assert action in constants.HOTPLUG_ALL_ACTIONS

  return fn(instance, dev_type, device, extra, seq)


def HotplugSupported(instance):
  """Checks if hotplug is generally supported.

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.HotplugSupported(instance)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _DumpDevice(source_path, target_path, offset, size):
  """This function images/wipes the device using a local file.

  @type source_path: string
  @param source_path: path of the image or data source (e.g., "/dev/zero")

  @type target_path: string
  @param target_path: path of the device to image/wipe

  @type offset: int
  @param offset: offset in MiB in the output file

  @type size: int
  @param size: maximum size in MiB to write (data source might be smaller)

  @return: None
  @raise RPCFail: in case of failure

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=%s" % source_path, "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % target_path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Dump command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


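# The command built above is a plain "dd" invocation; for example, wiping
# 128 MiB starting at a 512 MiB offset of a device (target path hypothetical)
# would run:
#
#   dd if=/dev/zero seek=512 bs=1048576 oflag=direct of=/dev/xvda count=128
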
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot wipe device %s: device not found", disk.iv_name)
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Wipe offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("Wipe offset and size are bigger than device size")

  _DumpDevice("/dev/zero", rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path

    def _TryRemove():
      try:
        rdev.Remove()
        return []
      except errors.BlockDeviceError, err:
        return [str(err)]

    msgs.extend(utils.SimpleRetry([], _TryRemove,
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))

    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: tuple
  @return: a tuple with the C{/dev/...} path and the created symlink
      for primary nodes, and (C{True}, C{True}) for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      dev_path = result.dev_path
      link_name = None
      if as_primary:
        link_name = _SymlinkBlockDev(owner, dev_path, idx)
    elif result:
      return result, result
    else:
      _Fail("Unexpected result from _RecursiveAssembleBD")
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return dev_path, link_name


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble; shutting down different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shut down
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


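# Usage sketch for UploadFile (hypothetical values; whether a given path is
# accepted depends on the _ALLOWED_UPLOAD_FILES whitelist, and in practice
# the master daemon performs this call via RPC):
#
#   UploadFile(pathutils.ETC_HOSTS, data="127.0.0.1 localhost\n",
#              mode=0644, uid="root", gid="root", atime=None, mtime=None)
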
def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


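# Illustrative invocation of RunOob (the helper path and node name are made
# up; the command is usually one of the constants.OOB_* values):
#
#   out = RunOob("/usr/local/sbin/oob-helper", constants.OOB_POWER_STATUS,
#                "node1.example.com", 60)
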
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


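# The API version file (constants.OS_API_FILE) is expected to contain one
# integer version per line; e.g. an OS definition supporting API versions 15
# and 20 would ship a file containing:
#
#   15
#   20
#
# for which this function returns (True, [15, 20]).
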
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


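# Illustrative entry in the DiagnoseOS result for a valid OS definition
# (the OS name, path, variants and parameters are all made-up values):
#
#   ("debootstrap", "/srv/ganeti/os/debootstrap", True, "",
#    ["default", "minimal"], [("mirror", "Debian mirror URL")], [20])
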
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


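# Illustrative subset of the environment produced by OSCoreEnv for a
# hypothetical OS "debootstrap+default" with one OS parameter "mirror"
# (all concrete values below are made up):
#
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap", "DEBUG_LEVEL": "0",
#    "OS_VARIANT": "default", "OSP_MIRROR": "http://deb.example.com",
#    "PATH": constants.HOOKS_PATH}
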
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    result["DISK_%d_UUID" % idx] = disk.uuid
    if disk.name:
      result["DISK_%d_NAME" % idx] = disk.name
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.DTS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type in constants.DTS_FILEBASED:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.logical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    result["NIC_%d_UUID" % idx] = nic.uuid
    if nic.name:
      result["NIC_%d_NAME" % idx] = nic.name
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


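# For a single-disk, single-NIC instance this adds, on top of the
# OSCoreEnv variables, entries such as (all values hypothetical):
#
#   INSTANCE_NAME=web1.example.com  HYPERVISOR=kvm
#   DISK_COUNT=1  DISK_0_PATH=/dev/drbd0  DISK_0_ACCESS=rw
#   NIC_COUNT=1  NIC_0_MAC=aa:00:00:12:34:56  NIC_0_MODE=bridged

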
def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to be resized.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: None
  @raise RPCFail: if the device cannot be found or the grow
      operation fails

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.DT_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.DT_PLAIN:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


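# The recursion above has exactly one interesting step: for a DT_DRBD8
# disk the snapshot is taken of its first child, the backing DT_PLAIN
# LV, so conceptually (disk object hypothetical):
#
#   BlockdevSnapshot(drbd_disk)
#     -> BlockdevSnapshot(drbd_disk.children[0])  # the plain LV
#     -> r_dev.Snapshot(disk.size)                # (vg, lv) of the snapshot

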
def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk on which the information is to be set
  @type info: string
  @param info: new 'info' metadata
  @rtype: None
  @raise RPCFail: if the device cannot be found or the metadata
      cannot be set

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    config.set(constants.INISECT_INS, "nic%d_name" % nic_count,
               "%s" % nic.name)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load we can simply read NICs until one doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.logical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))
      config.set(constants.INISECT_INS, "disk%d_name" % disk_count,
                 "%s" % disk.name)

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  config.add_section(constants.INISECT_OSP_PRIVATE)
  for name, value in instance.osparams_private.items():
    config.set(constants.INISECT_OSP_PRIVATE, name, str(value.Get()))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


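# A sketch of the resulting EXPORT_CONF_FILE (INI-style; the section
# names are the values of the constants.INISECT_* constants, shown
# symbolically here, and all values are hypothetical):
#
#   [INISECT_EXP]
#   version = 0
#   timestamp = 1400000000
#   source = node1.example.com
#   os = dummy+blue
#   compression = none
#
#   [INISECT_INS]
#   name = web1.example.com
#   maxmem = 1024
#   nic_count = 1
#   disk_count = 1
#   ...

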
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: a serialized form of the config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id); disk is
      an L{objects.Disk} object describing the current disk, and new
      unique_id is the name we rename it to
  @rtype: None
  @raise RPCFail: if any of the renames failed, with the accumulated
      error messages

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  filestorage.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: None
  @raise RPCFail: if the path exists but is not a directory, or the
      directory cannot be created

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If it's not, log an error and fail the
  RPC call.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: None
  @raise RPCFail: if the path is not a directory or the directory is
      not empty

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: None
  @raise RPCFail: if the rename failed or both locations exist

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: None
  @raise RPCFail: if the file name fails the queue directory check

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raise RPCFail: if either file name fails the queue directory check

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raise RPCFail: if one or more devices cannot be found or cannot be
      switched to secondary mode, with the error details in the message

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters, some of which may be
                   private.
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


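# For a hypothetical certificate named "x509-1234-abcd" this returns the
# triple (directory, private key file, certificate file):
#
#   (<cryptodir>/x509-1234-abcd,
#    <cryptodir>/x509-1234-abcd/<_X509_KEY_FILE>,
#    <cryptodir>/x509-1234-abcd/<_X509_CERT_FILE>)

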
def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY), 1)

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we use nocreat to fail if the device is not already there or we pass a
      # wrong path; we use notrunc to not attempt truncation on an LV device
      suffix = utils.BuildShellCmd("| dd of=%s conv=nocreat,notrunc bs=%s",
                                   real_disk.dev_path,
                                   str(1024 * 1024)) # 1 MB

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      real_disk = _OpenRealBD(disk)
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


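# For IEIO_RAW_DISK the assembled command fragments look roughly like
# this (device path and size are hypothetical):
#
#   import:  suffix = "| dd of=/dev/xenvg/disk0 conv=nocreat,notrunc bs=1048576"
#   export:  prefix = "dd if=/dev/xenvg/disk0 bs=1048576 count=2048 |"
#
# i.e. the transport command run by the import/export daemon is wrapped
# with a dd reading or writing the block device in 1 MiB blocks.

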
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Cannot specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None
           if a status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(disks):
  """Finds attached L{BlockDev}s for the given disks.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we need to find

  @rtype: list of L{BlockDev}
  @return: list of the found block devices
  @raise RPCFail: if a given disk was not found or is not attached

  """
  bdevs = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      if multimaster:
        # In the multimaster case we have to wait explicitly until
        # the resource is Connected and UpToDate/UpToDate, because
        # we promote *both nodes* to primary directly afterwards.
        # Being in resync is not enough, since there is a race during which we
        # may promote a node with an Outdated disk to primary, effectively
        # tearing down the connection.
        all_connected = (all_connected and
                         stats.is_connected and
                         stats.is_disk_uptodate and
                         stats.peer_disk_uptodate)
      else:
        all_connected = (all_connected and
                         (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


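# A sketch of the retry schedule used above, assuming utils.Retry
# interprets the (0.1, 1.5, 5.0) tuple as (start, factor, limit) as the
# in-code comment suggests: waits of 0.1, 0.15, 0.225, ... seconds,
# capped at 5.0, until _Attach stops raising RetryAgain or the overall
# 2 * 60 = 120 second timeout expires.

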
def DrbdWaitSync(disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def DrbdNeedsActivation(disks):
  """Checks which of the passed disks needs activation and returns their UUIDs.

  """
  faulty_disks = []

  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      faulty_disks.append(disk)
      continue

    stats = rd.GetProcStatus()
    if stats.is_standalone or stats.is_diskless:
      faulty_disks.append(disk)

  return [disk.uuid for disk in faulty_disks]


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type, hvparams=None):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode(hvparams=hvparams)


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)


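# Putting the three checks together (directory and command name are
# hypothetical):
#
#   (status, value) = _PrepareRestrictedCmd("/etc/ganeti/restricted",
#                                           "storage-check")
#
# verifies, in order, that: the directory is root-owned and no more
# permissive than _RCMD_MAX_MODE; "storage-check" is a plain basename
# matching EXT_PLUGIN_MASK; and the file itself passes the same
# ownership/mode checks and is executable.  On success, value is the
# absolute path "/etc/ganeti/restricted/storage-check".

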
def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Finally release the lock
      lock.Close()
      lock = None


def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)


def ConfigureOVS(ovs_name, ovs_link):
  """Creates an OpenvSwitch on the node.

  This function sets up an OpenvSwitch on the node with the given name
  and connects it via a given eth device.

  @type ovs_name: string
  @param ovs_name: Name of the OpenvSwitch to create.
  @type ovs_link: None or string
  @param ovs_link: Ethernet device for outside connection (can be missing)

  """
  # Initialize the OpenvSwitch
  result = utils.RunCmd(["ovs-vsctl", "add-br", ovs_name])
  if result.failed:
    _Fail("Failed to create openvswitch. Script return value: %s, output: '%s'"
          % (result.exit_code, result.output), log=True)

  # And connect it to a physical interface, if given
  if ovs_link:
    result = utils.RunCmd(["ovs-vsctl", "add-port", ovs_name, ovs_link])
    if result.failed:
      _Fail("Failed to connect openvswitch to interface %s. Script return"
            " value: %s, output: '%s'" % (ovs_link, result.exit_code,
            result.output), log=True)


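# The shell equivalent of what ConfigureOVS runs (switch and device
# names hypothetical):
#
#   ovs-vsctl add-br gnt-switch
#   ovs-vsctl add-port gnt-switch eth1   # only when ovs_link is given

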
class HooksRunner(object):
4503
  """Hook runner.
4504

4505
  This class is instantiated on the node side (ganeti-noded) and not
4506
  on the master side.
4507

4508
  """
4509
  def __init__(self, hooks_base_dir=None):
4510
    """Constructor for hooks runner.
4511

4512
    @type hooks_base_dir: str or None
4513
    @param hooks_base_dir: if not None, this overrides the
4514
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
4515

4516
    """
4517
    if hooks_base_dir is None:
4518
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
4519
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
4520
    # constant
4521
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
4522

    
4523
  def RunLocalHooks(self, node_list, hpath, phase, env):
4524
    """Check that the hooks will be run only locally and then run them.
4525

4526
    """
4527
    assert len(node_list) == 1
4528
    node = node_list[0]
4529
    _, myself = ssconf.GetMasterAndMyself()
4530
    assert node == myself
4531

    
4532
    results = self.RunHooks(hpath, phase, env)
4533

    
4534
    # Return values in the form expected by HooksMaster
4535
    return {node: (None, False, results)}
4536

    
4537
  def RunHooks(self, hpath, phase, env):
4538
    """Run the scripts in the hooks directory.
4539

4540
    @type hpath: str
4541
    @param hpath: the path to the hooks directory which
4542
        holds the scripts
4543
    @type phase: str
4544
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
4545
        L{constants.HOOKS_PHASE_POST}
4546
    @type env: dict
4547
    @param env: dictionary with the environment for the hook
4548
    @rtype: list
4549
    @return: list of 3-element tuples:
4550
      - script path
4551
      - script result, either L{constants.HKR_SUCCESS} or
4552
        L{constants.HKR_FAIL}
4553
      - output of the script
4554

4555
    @raise errors.ProgrammerError: for invalid input
4556
        parameters
4557

4558
    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
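
  # For example, RunHooks("instance-add", constants.HOOKS_PHASE_PRE, env)
  # would execute every script in <hooks_base_dir>/instance-add-pre.d/;
  # "instance-add" is only an illustrative hpath value.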


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata, ial_params):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data
    @type ial_params: list
    @param ial_params: the iallocator parameters

    @rtype: str
    @return: the stdout of the allocator script; failures are reported
        by raising an exception (via L{_Fail}) instead of through the
        return value

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name] + ial_params)
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
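
  # The protocol exercised above: the request in "idata" is written to a
  # temporary file whose name is passed to the iallocator script as its
  # first argument (followed by any extra parameters), and the script is
  # expected to print its response on stdout.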


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
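
  # E.g. _ConvertPath("/dev/drbd/by-res/r0") returns
  # "<BDEV_CACHE_DIR>/bdev_drbd_by-res_r0": the "/dev/" prefix is dropped
  # and the remaining slashes become underscores.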

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
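
  # A cache entry is a single line of the form
  # "<owner> <primary|secondary> <iv_name>", e.g. (with made-up values)
  # "instance1.example.com primary sda".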

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)