Statistics
| Branch: | Tag: | Revision:

root / lib / rapi / client.py @ a198b2d9

History | View | Annotate | Download (30.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti RAPI client."""
23

    
24
import httplib
25
import urllib2
26
import logging
27
import simplejson
28
import socket
29
import urllib
30
import OpenSSL
31
import distutils.version
32

    
33

    
34
GANETI_RAPI_PORT = 5080
35
GANETI_RAPI_VERSION = 2
36

    
37
HTTP_DELETE = "DELETE"
38
HTTP_GET = "GET"
39
HTTP_PUT = "PUT"
40
HTTP_POST = "POST"
41
HTTP_OK = 200
42
HTTP_APP_JSON = "application/json"
43

    
44
REPLACE_DISK_PRI = "replace_on_primary"
45
REPLACE_DISK_SECONDARY = "replace_on_secondary"
46
REPLACE_DISK_CHG = "replace_new_secondary"
47
REPLACE_DISK_AUTO = "replace_auto"
48
VALID_REPLACEMENT_MODES = frozenset([
49
  REPLACE_DISK_PRI,
50
  REPLACE_DISK_SECONDARY,
51
  REPLACE_DISK_CHG,
52
  REPLACE_DISK_AUTO,
53
  ])
54
VALID_NODE_ROLES = frozenset([
55
  "drained", "master", "master-candidate", "offline", "regular",
56
  ])
57
VALID_STORAGE_TYPES = frozenset(["file", "lvm-pv", "lvm-vg"])
58

    
59

    
60
class Error(Exception):
61
  """Base error class for this module.
62

63
  """
64
  pass
65

    
66

    
67
class CertificateError(Error):
68
  """Raised when a problem is found with the SSL certificate.
69

70
  """
71
  pass
72

    
73

    
74
class GanetiApiError(Error):
75
  """Generic error raised from Ganeti API.
76

77
  """
78
  def __init__(self, msg, code=None):
79
    Error.__init__(self, msg)
80
    self.code = code
81

    
82

    
83
class InvalidReplacementMode(Error):
84
  """Raised when an invalid disk replacement mode is attempted.
85

86
  """
87
  pass
88

    
89

    
90
class InvalidStorageType(Error):
91
  """Raised when an invalid storage type is used.
92

93
  """
94
  pass
95

    
96

    
97
class InvalidNodeRole(Error):
98
  """Raised when an invalid node role is used.
99

100
  """
101
  pass
102

    
103

    
104
def FormatX509Name(x509_name):
105
  """Formats an X509 name.
106

107
  @type x509_name: OpenSSL.crypto.X509Name
108

109
  """
110
  try:
111
    # Only supported in pyOpenSSL 0.7 and above
112
    get_components_fn = x509_name.get_components
113
  except AttributeError:
114
    return repr(x509_name)
115
  else:
116
    return "".join("/%s=%s" % (name, value)
117
                   for name, value in get_components_fn())
118

    
119

    
120
class CertAuthorityVerify:
121
  """Certificate verificator for SSL context.
122

123
  Configures SSL context to verify server's certificate.
124

125
  """
126
  _CAPATH_MINVERSION = "0.9"
127
  _DEFVFYPATHS_MINVERSION = "0.9"
128

    
129
  _PYOPENSSL_VERSION = OpenSSL.__version__
130
  _PARSED_PYOPENSSL_VERSION = distutils.version.LooseVersion(_PYOPENSSL_VERSION)
131

    
132
  _SUPPORT_CAPATH = (_PARSED_PYOPENSSL_VERSION >= _CAPATH_MINVERSION)
133
  _SUPPORT_DEFVFYPATHS = (_PARSED_PYOPENSSL_VERSION >= _DEFVFYPATHS_MINVERSION)
134

    
135
  def __init__(self, cafile=None, capath=None, use_default_verify_paths=False):
136
    """Initializes this class.
137

138
    @type cafile: string
139
    @param cafile: In which file we can find the certificates
140
    @type capath: string
141
    @param capath: In which directory we can find the certificates
142
    @type use_default_verify_paths: bool
143
    @param use_default_verify_paths: Whether the platform provided CA
144
                                     certificates are to be used for
145
                                     verification purposes
146

147
    """
148
    self._cafile = cafile
149
    self._capath = capath
150
    self._use_default_verify_paths = use_default_verify_paths
151

    
152
    if self._capath is not None and not self._SUPPORT_CAPATH:
153
      raise Error(("PyOpenSSL %s has no support for a CA directory,"
154
                   " version %s or above is required") %
155
                  (self._PYOPENSSL_VERSION, self._CAPATH_MINVERSION))
156

    
157
    if self._use_default_verify_paths and not self._SUPPORT_DEFVFYPATHS:
158
      raise Error(("PyOpenSSL %s has no support for using default verification"
159
                   " paths, version %s or above is required") %
160
                  (self._PYOPENSSL_VERSION, self._DEFVFYPATHS_MINVERSION))
161

    
162
  @staticmethod
163
  def _VerifySslCertCb(logger, _, cert, errnum, errdepth, ok):
164
    """Callback for SSL certificate verification.
165

166
    @param logger: Logging object
167

168
    """
169
    if ok:
170
      log_fn = logger.debug
171
    else:
172
      log_fn = logger.error
173

    
174
    log_fn("Verifying SSL certificate at depth %s, subject '%s', issuer '%s'",
175
           errdepth, FormatX509Name(cert.get_subject()),
176
           FormatX509Name(cert.get_issuer()))
177

    
178
    if not ok:
179
      try:
180
        # Only supported in pyOpenSSL 0.7 and above
181
        # pylint: disable-msg=E1101
182
        fn = OpenSSL.crypto.X509_verify_cert_error_string
183
      except AttributeError:
184
        errmsg = ""
185
      else:
186
        errmsg = ":%s" % fn(errnum)
187

    
188
      logger.error("verify error:num=%s%s", errnum, errmsg)
189

    
190
    return ok
191

    
192
  def __call__(self, ctx, logger):
193
    """Configures an SSL context to verify certificates.
194

195
    @type ctx: OpenSSL.SSL.Context
196
    @param ctx: SSL context
197

198
    """
199
    if self._use_default_verify_paths:
200
      ctx.set_default_verify_paths()
201

    
202
    if self._cafile or self._capath:
203
      if self._SUPPORT_CAPATH:
204
        ctx.load_verify_locations(self._cafile, self._capath)
205
      else:
206
        ctx.load_verify_locations(self._cafile)
207

    
208
    ctx.set_verify(OpenSSL.SSL.VERIFY_PEER,
209
                   lambda conn, cert, errnum, errdepth, ok: \
210
                     self._VerifySslCertCb(logger, conn, cert,
211
                                           errnum, errdepth, ok))
212

    
213

    
214
class _HTTPSConnectionOpenSSL(httplib.HTTPSConnection):
215
  """HTTPS Connection handler that verifies the SSL certificate.
216

217
  """
218
  def __init__(self, *args, **kwargs):
219
    """Initializes this class.
220

221
    """
222
    httplib.HTTPSConnection.__init__(self, *args, **kwargs)
223
    self._logger = None
224
    self._config_ssl_verification = None
225

    
226
  def Setup(self, logger, config_ssl_verification):
227
    """Sets the SSL verification config function.
228

229
    @param logger: Logging object
230
    @type config_ssl_verification: callable
231

232
    """
233
    assert self._logger is None
234
    assert self._config_ssl_verification is None
235

    
236
    self._logger = logger
237
    self._config_ssl_verification = config_ssl_verification
238

    
239
  def connect(self):
240
    """Connect to the server specified when the object was created.
241

242
    This ensures that SSL certificates are verified.
243

244
    """
245
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
246

    
247
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
248
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
249

    
250
    if self._config_ssl_verification:
251
      self._config_ssl_verification(ctx, self._logger)
252

    
253
    ssl = OpenSSL.SSL.Connection(ctx, sock)
254
    ssl.connect((self.host, self.port))
255

    
256
    self.sock = httplib.FakeSocket(sock, ssl)
257

    
258

    
259
class _HTTPSHandler(urllib2.HTTPSHandler):
260
  def __init__(self, logger, config_ssl_verification):
261
    """Initializes this class.
262

263
    @param logger: Logging object
264
    @type config_ssl_verification: callable
265
    @param config_ssl_verification: Function to configure SSL context for
266
                                    certificate verification
267

268
    """
269
    urllib2.HTTPSHandler.__init__(self)
270
    self._logger = logger
271
    self._config_ssl_verification = config_ssl_verification
272

    
273
  def _CreateHttpsConnection(self, *args, **kwargs):
274
    """Wrapper around L{_HTTPSConnectionOpenSSL} to add SSL verification.
275

276
    This wrapper is necessary provide a compatible API to urllib2.
277

278
    """
279
    conn = _HTTPSConnectionOpenSSL(*args, **kwargs)
280
    conn.Setup(self._logger, self._config_ssl_verification)
281
    return conn
282

    
283
  def https_open(self, req):
284
    """Creates HTTPS connection.
285

286
    Called by urllib2.
287

288
    """
289
    return self.do_open(self._CreateHttpsConnection, req)
290

    
291

    
292
class _RapiRequest(urllib2.Request):
293
  def __init__(self, method, url, headers, data):
294
    """Initializes this class.
295

296
    """
297
    urllib2.Request.__init__(self, url, data=data, headers=headers)
298
    self._method = method
299

    
300
  def get_method(self):
301
    """Returns the HTTP request method.
302

303
    """
304
    return self._method
305

    
306

    
307
class GanetiRapiClient(object):
308
  """Ganeti RAPI client.
309

310
  """
311
  USER_AGENT = "Ganeti RAPI Client"
312
  _json_encoder = simplejson.JSONEncoder(sort_keys=True)
313

    
314
  def __init__(self, host, port=GANETI_RAPI_PORT,
315
               username=None, password=None,
316
               config_ssl_verification=None, ignore_proxy=False,
317
               logger=logging):
318
    """Constructor.
319

320
    @type host: string
321
    @param host: the ganeti cluster master to interact with
322
    @type port: int
323
    @param port: the port on which the RAPI is running (default is 5080)
324
    @type username: string
325
    @param username: the username to connect with
326
    @type password: string
327
    @param password: the password to connect with
328
    @type config_ssl_verification: callable
329
    @param config_ssl_verification: Function to configure SSL context for
330
                                    certificate verification
331
    @type ignore_proxy: bool
332
    @param ignore_proxy: Whether to ignore proxy settings
333
    @param logger: Logging object
334

335
    """
336
    self._host = host
337
    self._port = port
338
    self._logger = logger
339

    
340
    self._base_url = "https://%s:%s" % (host, port)
341

    
342
    handlers = [_HTTPSHandler(self._logger, config_ssl_verification)]
343

    
344
    if username is not None:
345
      pwmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
346
      pwmgr.add_password(None, self._base_url, username, password)
347
      handlers.append(urllib2.HTTPBasicAuthHandler(pwmgr))
348
    elif password:
349
      raise Error("Specified password without username")
350

    
351
    if ignore_proxy:
352
      handlers.append(urllib2.ProxyHandler({}))
353

    
354
    self._http = urllib2.build_opener(*handlers) # pylint: disable-msg=W0142
355

    
356
    self._headers = {
357
      "Accept": HTTP_APP_JSON,
358
      "Content-type": HTTP_APP_JSON,
359
      "User-Agent": self.USER_AGENT,
360
      }
361

    
362
  def _SendRequest(self, method, path, query, content):
363
    """Sends an HTTP request.
364

365
    This constructs a full URL, encodes and decodes HTTP bodies, and
366
    handles invalid responses in a pythonic way.
367

368
    @type method: string
369
    @param method: HTTP method to use
370
    @type path: string
371
    @param path: HTTP URL path
372
    @type query: list of two-tuples
373
    @param query: query arguments to pass to urllib.urlencode
374
    @type content: str or None
375
    @param content: HTTP body content
376

377
    @rtype: str
378
    @return: JSON-Decoded response
379

380
    @raises CertificateError: If an invalid SSL certificate is found
381
    @raises GanetiApiError: If an invalid response is returned
382

383
    """
384
    assert path.startswith("/")
385

    
386
    if content:
387
      encoded_content = self._json_encoder.encode(content)
388
    else:
389
      encoded_content = None
390

    
391
    # Build URL
392
    url = [self._base_url, path]
393
    if query:
394
      url.append("?")
395
      url.append(urllib.urlencode(query))
396

    
397
    req = _RapiRequest(method, "".join(url), self._headers, encoded_content)
398

    
399
    try:
400
      resp = self._http.open(req)
401
      encoded_response_content = resp.read()
402
    except (OpenSSL.SSL.Error, OpenSSL.crypto.Error), err:
403
      raise CertificateError("SSL issue: %s" % err)
404

    
405
    if encoded_response_content:
406
      response_content = simplejson.loads(encoded_response_content)
407
    else:
408
      response_content = None
409

    
410
    # TODO: Are there other status codes that are valid? (redirect?)
411
    if resp.code != HTTP_OK:
412
      if isinstance(response_content, dict):
413
        msg = ("%s %s: %s" %
414
               (response_content["code"],
415
                response_content["message"],
416
                response_content["explain"]))
417
      else:
418
        msg = str(response_content)
419

    
420
      raise GanetiApiError(msg, code=resp.code)
421

    
422
    return response_content
423

    
424
  @staticmethod
425
  def _CheckStorageType(storage_type):
426
    """Checks a storage type for validity.
427

428
    """
429
    if storage_type not in VALID_STORAGE_TYPES:
430
      raise InvalidStorageType("%s is an invalid storage type" % storage_type)
431

    
432
  def GetVersion(self):
433
    """Gets the Remote API version running on the cluster.
434

435
    @rtype: int
436
    @return: Ganeti Remote API version
437

438
    """
439
    return self._SendRequest(HTTP_GET, "/version", None, None)
440

    
441
  def GetOperatingSystems(self):
442
    """Gets the Operating Systems running in the Ganeti cluster.
443

444
    @rtype: list of str
445
    @return: operating systems
446

447
    """
448
    return self._SendRequest(HTTP_GET, "/%s/os" % GANETI_RAPI_VERSION,
449
                             None, None)
450

    
451
  def GetInfo(self):
452
    """Gets info about the cluster.
453

454
    @rtype: dict
455
    @return: information about the cluster
456

457
    """
458
    return self._SendRequest(HTTP_GET, "/%s/info" % GANETI_RAPI_VERSION,
459
                             None, None)
460

    
461
  def GetClusterTags(self):
462
    """Gets the cluster tags.
463

464
    @rtype: list of str
465
    @return: cluster tags
466

467
    """
468
    return self._SendRequest(HTTP_GET, "/%s/tags" % GANETI_RAPI_VERSION,
469
                             None, None)
470

    
471
  def AddClusterTags(self, tags, dry_run=False):
472
    """Adds tags to the cluster.
473

474
    @type tags: list of str
475
    @param tags: tags to add to the cluster
476
    @type dry_run: bool
477
    @param dry_run: whether to perform a dry run
478

479
    @rtype: int
480
    @return: job id
481

482
    """
483
    query = [("tag", t) for t in tags]
484
    if dry_run:
485
      query.append(("dry-run", 1))
486

    
487
    return self._SendRequest(HTTP_PUT, "/%s/tags" % GANETI_RAPI_VERSION,
488
                             query, None)
489

    
490
  def DeleteClusterTags(self, tags, dry_run=False):
491
    """Deletes tags from the cluster.
492

493
    @type tags: list of str
494
    @param tags: tags to delete
495
    @type dry_run: bool
496
    @param dry_run: whether to perform a dry run
497

498
    """
499
    query = [("tag", t) for t in tags]
500
    if dry_run:
501
      query.append(("dry-run", 1))
502

    
503
    return self._SendRequest(HTTP_DELETE, "/%s/tags" % GANETI_RAPI_VERSION,
504
                             query, None)
505

    
506
  def GetInstances(self, bulk=False):
507
    """Gets information about instances on the cluster.
508

509
    @type bulk: bool
510
    @param bulk: whether to return all information about all instances
511

512
    @rtype: list of dict or list of str
513
    @return: if bulk is True, info about the instances, else a list of instances
514

515
    """
516
    query = []
517
    if bulk:
518
      query.append(("bulk", 1))
519

    
520
    instances = self._SendRequest(HTTP_GET,
521
                                  "/%s/instances" % GANETI_RAPI_VERSION,
522
                                  query, None)
523
    if bulk:
524
      return instances
525
    else:
526
      return [i["id"] for i in instances]
527

    
528
  def GetInstanceInfo(self, instance):
529
    """Gets information about an instance.
530

531
    @type instance: str
532
    @param instance: instance whose info to return
533

534
    @rtype: dict
535
    @return: info about the instance
536

537
    """
538
    return self._SendRequest(HTTP_GET,
539
                             ("/%s/instances/%s" %
540
                              (GANETI_RAPI_VERSION, instance)), None, None)
541

    
542
  def CreateInstance(self, dry_run=False):
543
    """Creates a new instance.
544

545
    @type dry_run: bool
546
    @param dry_run: whether to perform a dry run
547

548
    @rtype: int
549
    @return: job id
550

551
    """
552
    # TODO: Pass arguments needed to actually create an instance.
553
    query = []
554
    if dry_run:
555
      query.append(("dry-run", 1))
556

    
557
    return self._SendRequest(HTTP_POST, "/%s/instances" % GANETI_RAPI_VERSION,
558
                             query, None)
559

    
560
  def DeleteInstance(self, instance, dry_run=False):
561
    """Deletes an instance.
562

563
    @type instance: str
564
    @param instance: the instance to delete
565

566
    @rtype: int
567
    @return: job id
568

569
    """
570
    query = []
571
    if dry_run:
572
      query.append(("dry-run", 1))
573

    
574
    return self._SendRequest(HTTP_DELETE,
575
                             ("/%s/instances/%s" %
576
                              (GANETI_RAPI_VERSION, instance)), query, None)
577

    
578
  def GetInstanceTags(self, instance):
579
    """Gets tags for an instance.
580

581
    @type instance: str
582
    @param instance: instance whose tags to return
583

584
    @rtype: list of str
585
    @return: tags for the instance
586

587
    """
588
    return self._SendRequest(HTTP_GET,
589
                             ("/%s/instances/%s/tags" %
590
                              (GANETI_RAPI_VERSION, instance)), None, None)
591

    
592
  def AddInstanceTags(self, instance, tags, dry_run=False):
593
    """Adds tags to an instance.
594

595
    @type instance: str
596
    @param instance: instance to add tags to
597
    @type tags: list of str
598
    @param tags: tags to add to the instance
599
    @type dry_run: bool
600
    @param dry_run: whether to perform a dry run
601

602
    @rtype: int
603
    @return: job id
604

605
    """
606
    query = [("tag", t) for t in tags]
607
    if dry_run:
608
      query.append(("dry-run", 1))
609

    
610
    return self._SendRequest(HTTP_PUT,
611
                             ("/%s/instances/%s/tags" %
612
                              (GANETI_RAPI_VERSION, instance)), query, None)
613

    
614
  def DeleteInstanceTags(self, instance, tags, dry_run=False):
615
    """Deletes tags from an instance.
616

617
    @type instance: str
618
    @param instance: instance to delete tags from
619
    @type tags: list of str
620
    @param tags: tags to delete
621
    @type dry_run: bool
622
    @param dry_run: whether to perform a dry run
623

624
    """
625
    query = [("tag", t) for t in tags]
626
    if dry_run:
627
      query.append(("dry-run", 1))
628

    
629
    return self._SendRequest(HTTP_DELETE,
630
                             ("/%s/instances/%s/tags" %
631
                              (GANETI_RAPI_VERSION, instance)), query, None)
632

    
633
  def RebootInstance(self, instance, reboot_type=None, ignore_secondaries=None,
634
                     dry_run=False):
635
    """Reboots an instance.
636

637
    @type instance: str
638
    @param instance: instance to rebot
639
    @type reboot_type: str
640
    @param reboot_type: one of: hard, soft, full
641
    @type ignore_secondaries: bool
642
    @param ignore_secondaries: if True, ignores errors for the secondary node
643
        while re-assembling disks (in hard-reboot mode only)
644
    @type dry_run: bool
645
    @param dry_run: whether to perform a dry run
646

647
    """
648
    query = []
649
    if reboot_type:
650
      query.append(("type", reboot_type))
651
    if ignore_secondaries is not None:
652
      query.append(("ignore_secondaries", ignore_secondaries))
653
    if dry_run:
654
      query.append(("dry-run", 1))
655

    
656
    return self._SendRequest(HTTP_POST,
657
                             ("/%s/instances/%s/reboot" %
658
                              (GANETI_RAPI_VERSION, instance)), query, None)
659

    
660
  def ShutdownInstance(self, instance, dry_run=False):
661
    """Shuts down an instance.
662

663
    @type instance: str
664
    @param instance: the instance to shut down
665
    @type dry_run: bool
666
    @param dry_run: whether to perform a dry run
667

668
    """
669
    query = []
670
    if dry_run:
671
      query.append(("dry-run", 1))
672

    
673
    return self._SendRequest(HTTP_PUT,
674
                             ("/%s/instances/%s/shutdown" %
675
                              (GANETI_RAPI_VERSION, instance)), query, None)
676

    
677
  def StartupInstance(self, instance, dry_run=False):
678
    """Starts up an instance.
679

680
    @type instance: str
681
    @param instance: the instance to start up
682
    @type dry_run: bool
683
    @param dry_run: whether to perform a dry run
684

685
    """
686
    query = []
687
    if dry_run:
688
      query.append(("dry-run", 1))
689

    
690
    return self._SendRequest(HTTP_PUT,
691
                             ("/%s/instances/%s/startup" %
692
                              (GANETI_RAPI_VERSION, instance)), query, None)
693

    
694
  def ReinstallInstance(self, instance, os, no_startup=False):
695
    """Reinstalls an instance.
696

697
    @type instance: str
698
    @param instance: the instance to reinstall
699
    @type os: str
700
    @param os: the os to reinstall
701
    @type no_startup: bool
702
    @param no_startup: whether to start the instance automatically
703

704
    """
705
    query = [("os", os)]
706
    if no_startup:
707
      query.append(("nostartup", 1))
708
    return self._SendRequest(HTTP_POST,
709
                             ("/%s/instances/%s/reinstall" %
710
                              (GANETI_RAPI_VERSION, instance)), query, None)
711

    
712
  def ReplaceInstanceDisks(self, instance, disks, mode=REPLACE_DISK_AUTO,
713
                           remote_node=None, iallocator=None, dry_run=False):
714
    """Replaces disks on an instance.
715

716
    @type instance: str
717
    @param instance: instance whose disks to replace
718
    @type disks: list of str
719
    @param disks: disks to replace
720
    @type mode: str
721
    @param mode: replacement mode to use (defaults to replace_auto)
722
    @type remote_node: str or None
723
    @param remote_node: new secondary node to use (for use with
724
        replace_new_secondary mode)
725
    @type iallocator: str or None
726
    @param iallocator: instance allocator plugin to use (for use with
727
                       replace_auto mode)
728
    @type dry_run: bool
729
    @param dry_run: whether to perform a dry run
730

731
    @rtype: int
732
    @return: job id
733

734
    @raises InvalidReplacementMode: If an invalid disk replacement mode is given
735
    @raises GanetiApiError: If no secondary node is given with a non-auto
736
        replacement mode is requested.
737

738
    """
739
    if mode not in VALID_REPLACEMENT_MODES:
740
      raise InvalidReplacementMode("%s is not a valid disk replacement mode" %
741
                                   mode)
742

    
743
    query = [
744
      ("mode", mode),
745
      ("disks", ",".join(disks)),
746
      ]
747

    
748
    if mode == REPLACE_DISK_AUTO:
749
      query.append(("iallocator", iallocator))
750
    elif mode == REPLACE_DISK_SECONDARY:
751
      if remote_node is None:
752
        raise GanetiApiError("Missing secondary node")
753
      query.append(("remote_node", remote_node))
754

    
755
    if dry_run:
756
      query.append(("dry-run", 1))
757

    
758
    return self._SendRequest(HTTP_POST,
759
                             ("/%s/instances/%s/replace-disks" %
760
                              (GANETI_RAPI_VERSION, instance)), query, None)
761

    
762
  def GetJobs(self):
763
    """Gets all jobs for the cluster.
764

765
    @rtype: list of int
766
    @return: job ids for the cluster
767

768
    """
769
    return [int(j["id"])
770
            for j in self._SendRequest(HTTP_GET,
771
                                       "/%s/jobs" % GANETI_RAPI_VERSION,
772
                                       None, None)]
773

    
774
  def GetJobStatus(self, job_id):
775
    """Gets the status of a job.
776

777
    @type job_id: int
778
    @param job_id: job id whose status to query
779

780
    @rtype: dict
781
    @return: job status
782

783
    """
784
    return self._SendRequest(HTTP_GET,
785
                             "/%s/jobs/%s" % (GANETI_RAPI_VERSION, job_id),
786
                             None, None)
787

    
788
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
789
    """Waits for job changes.
790

791
    @type job_id: int
792
    @param job_id: Job ID for which to wait
793

794
    """
795
    body = {
796
      "fields": fields,
797
      "previous_job_info": prev_job_info,
798
      "previous_log_serial": prev_log_serial,
799
      }
800

    
801
    return self._SendRequest(HTTP_GET,
802
                             "/%s/jobs/%s/wait" % (GANETI_RAPI_VERSION, job_id),
803
                             None, body)
804

    
805
  def CancelJob(self, job_id, dry_run=False):
806
    """Cancels a job.
807

808
    @type job_id: int
809
    @param job_id: id of the job to delete
810
    @type dry_run: bool
811
    @param dry_run: whether to perform a dry run
812

813
    """
814
    query = []
815
    if dry_run:
816
      query.append(("dry-run", 1))
817

    
818
    return self._SendRequest(HTTP_DELETE,
819
                             "/%s/jobs/%s" % (GANETI_RAPI_VERSION, job_id),
820
                             query, None)
821

    
822
  def GetNodes(self, bulk=False):
823
    """Gets all nodes in the cluster.
824

825
    @type bulk: bool
826
    @param bulk: whether to return all information about all instances
827

828
    @rtype: list of dict or str
829
    @return: if bulk is true, info about nodes in the cluster,
830
        else list of nodes in the cluster
831

832
    """
833
    query = []
834
    if bulk:
835
      query.append(("bulk", 1))
836

    
837
    nodes = self._SendRequest(HTTP_GET, "/%s/nodes" % GANETI_RAPI_VERSION,
838
                              query, None)
839
    if bulk:
840
      return nodes
841
    else:
842
      return [n["id"] for n in nodes]
843

    
844
  def GetNodeInfo(self, node):
845
    """Gets information about a node.
846

847
    @type node: str
848
    @param node: node whose info to return
849

850
    @rtype: dict
851
    @return: info about the node
852

853
    """
854
    return self._SendRequest(HTTP_GET,
855
                             "/%s/nodes/%s" % (GANETI_RAPI_VERSION, node),
856
                             None, None)
857

    
858
  def EvacuateNode(self, node, iallocator=None, remote_node=None,
859
                   dry_run=False):
860
    """Evacuates instances from a Ganeti node.
861

862
    @type node: str
863
    @param node: node to evacuate
864
    @type iallocator: str or None
865
    @param iallocator: instance allocator to use
866
    @type remote_node: str
867
    @param remote_node: node to evaucate to
868
    @type dry_run: bool
869
    @param dry_run: whether to perform a dry run
870

871
    @rtype: int
872
    @return: job id
873

874
    @raises GanetiApiError: if an iallocator and remote_node are both specified
875

876
    """
877
    if iallocator and remote_node:
878
      raise GanetiApiError("Only one of iallocator or remote_node can be used")
879

    
880
    query = []
881
    if iallocator:
882
      query.append(("iallocator", iallocator))
883
    if remote_node:
884
      query.append(("remote_node", remote_node))
885
    if dry_run:
886
      query.append(("dry-run", 1))
887

    
888
    return self._SendRequest(HTTP_POST,
889
                             ("/%s/nodes/%s/evacuate" %
890
                              (GANETI_RAPI_VERSION, node)), query, None)
891

    
892
  def MigrateNode(self, node, live=True, dry_run=False):
893
    """Migrates all primary instances from a node.
894

895
    @type node: str
896
    @param node: node to migrate
897
    @type live: bool
898
    @param live: whether to use live migration
899
    @type dry_run: bool
900
    @param dry_run: whether to perform a dry run
901

902
    @rtype: int
903
    @return: job id
904

905
    """
906
    query = []
907
    if live:
908
      query.append(("live", 1))
909
    if dry_run:
910
      query.append(("dry-run", 1))
911

    
912
    return self._SendRequest(HTTP_POST,
913
                             ("/%s/nodes/%s/migrate" %
914
                              (GANETI_RAPI_VERSION, node)), query, None)
915

    
916
  def GetNodeRole(self, node):
917
    """Gets the current role for a node.
918

919
    @type node: str
920
    @param node: node whose role to return
921

922
    @rtype: str
923
    @return: the current role for a node
924

925
    """
926
    return self._SendRequest(HTTP_GET,
927
                             ("/%s/nodes/%s/role" %
928
                              (GANETI_RAPI_VERSION, node)), None, None)
929

    
930
  def SetNodeRole(self, node, role, force=False):
931
    """Sets the role for a node.
932

933
    @type node: str
934
    @param node: the node whose role to set
935
    @type role: str
936
    @param role: the role to set for the node
937
    @type force: bool
938
    @param force: whether to force the role change
939

940
    @rtype: int
941
    @return: job id
942

943
    @raise InvalidNodeRole: If an invalid node role is specified
944

945
    """
946
    if role not in VALID_NODE_ROLES:
947
      raise InvalidNodeRole("%s is not a valid node role" % role)
948

    
949
    query = [("force", force)]
950

    
951
    return self._SendRequest(HTTP_PUT,
952
                             ("/%s/nodes/%s/role" %
953
                              (GANETI_RAPI_VERSION, node)), query, role)
954

    
955
  def GetNodeStorageUnits(self, node, storage_type, output_fields):
956
    """Gets the storage units for a node.
957

958
    @type node: str
959
    @param node: the node whose storage units to return
960
    @type storage_type: str
961
    @param storage_type: storage type whose units to return
962
    @type output_fields: str
963
    @param output_fields: storage type fields to return
964

965
    @rtype: int
966
    @return: job id where results can be retrieved
967

968
    @raise InvalidStorageType: If an invalid storage type is specified
969

970
    """
971
    # TODO: Add default for storage_type & output_fields
972
    self._CheckStorageType(storage_type)
973

    
974
    query = [
975
      ("storage_type", storage_type),
976
      ("output_fields", output_fields),
977
      ]
978

    
979
    return self._SendRequest(HTTP_GET,
980
                             ("/%s/nodes/%s/storage" %
981
                              (GANETI_RAPI_VERSION, node)), query, None)
982

    
983
  def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=True):
984
    """Modifies parameters of storage units on the node.
985

986
    @type node: str
987
    @param node: node whose storage units to modify
988
    @type storage_type: str
989
    @param storage_type: storage type whose units to modify
990
    @type name: str
991
    @param name: name of the storage unit
992
    @type allocatable: bool
993
    @param allocatable: TODO: Document me
994

995
    @rtype: int
996
    @return: job id
997

998
    @raise InvalidStorageType: If an invalid storage type is specified
999

1000
    """
1001
    self._CheckStorageType(storage_type)
1002

    
1003
    query = [
1004
      ("storage_type", storage_type),
1005
      ("name", name),
1006
      ("allocatable", allocatable),
1007
      ]
1008

    
1009
    return self._SendRequest(HTTP_PUT,
1010
                             ("/%s/nodes/%s/storage/modify" %
1011
                              (GANETI_RAPI_VERSION, node)), query, None)
1012

    
1013
  def RepairNodeStorageUnits(self, node, storage_type, name):
1014
    """Repairs a storage unit on the node.
1015

1016
    @type node: str
1017
    @param node: node whose storage units to repair
1018
    @type storage_type: str
1019
    @param storage_type: storage type to repair
1020
    @type name: str
1021
    @param name: name of the storage unit to repair
1022

1023
    @rtype: int
1024
    @return: job id
1025

1026
    @raise InvalidStorageType: If an invalid storage type is specified
1027

1028
    """
1029
    self._CheckStorageType(storage_type)
1030

    
1031
    query = [
1032
      ("storage_type", storage_type),
1033
      ("name", name),
1034
      ]
1035

    
1036
    return self._SendRequest(HTTP_PUT,
1037
                             ("/%s/nodes/%s/storage/repair" %
1038
                              (GANETI_RAPI_VERSION, node)), query, None)
1039

    
1040
  def GetNodeTags(self, node):
1041
    """Gets the tags for a node.
1042

1043
    @type node: str
1044
    @param node: node whose tags to return
1045

1046
    @rtype: list of str
1047
    @return: tags for the node
1048

1049
    """
1050
    return self._SendRequest(HTTP_GET,
1051
                             ("/%s/nodes/%s/tags" %
1052
                              (GANETI_RAPI_VERSION, node)), None, None)
1053

    
1054
  def AddNodeTags(self, node, tags, dry_run=False):
1055
    """Adds tags to a node.
1056

1057
    @type node: str
1058
    @param node: node to add tags to
1059
    @type tags: list of str
1060
    @param tags: tags to add to the node
1061
    @type dry_run: bool
1062
    @param dry_run: whether to perform a dry run
1063

1064
    @rtype: int
1065
    @return: job id
1066

1067
    """
1068
    query = [("tag", t) for t in tags]
1069
    if dry_run:
1070
      query.append(("dry-run", 1))
1071

    
1072
    return self._SendRequest(HTTP_PUT,
1073
                             ("/%s/nodes/%s/tags" %
1074
                              (GANETI_RAPI_VERSION, node)), query, tags)
1075

    
1076
  def DeleteNodeTags(self, node, tags, dry_run=False):
1077
    """Delete tags from a node.
1078

1079
    @type node: str
1080
    @param node: node to remove tags from
1081
    @type tags: list of str
1082
    @param tags: tags to remove from the node
1083
    @type dry_run: bool
1084
    @param dry_run: whether to perform a dry run
1085

1086
    @rtype: int
1087
    @return: job id
1088

1089
    """
1090
    query = [("tag", t) for t in tags]
1091
    if dry_run:
1092
      query.append(("dry-run", 1))
1093

    
1094
    return self._SendRequest(HTTP_DELETE,
1095
                             ("/%s/nodes/%s/tags" %
1096
                              (GANETI_RAPI_VERSION, node)), query, None)