Statistics
| Branch: | Tag: | Revision:

root / lib / rapi / client.py @ dfc8ad25

History | View | Annotate | Download (28.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti RAPI client."""
23

    
24
import httplib
25
import urllib2
26
import logging
27
import simplejson
28
import socket
29
import urllib
30
import OpenSSL
31
import distutils.version
32

    
33

    
34
GANETI_RAPI_PORT = 5080
35

    
36
HTTP_DELETE = "DELETE"
37
HTTP_GET = "GET"
38
HTTP_PUT = "PUT"
39
HTTP_POST = "POST"
40
HTTP_OK = 200
41
HTTP_APP_JSON = "application/json"
42

    
43
REPLACE_DISK_PRI = "replace_on_primary"
44
REPLACE_DISK_SECONDARY = "replace_on_secondary"
45
REPLACE_DISK_CHG = "replace_new_secondary"
46
REPLACE_DISK_AUTO = "replace_auto"
47
VALID_REPLACEMENT_MODES = frozenset([
48
  REPLACE_DISK_PRI,
49
  REPLACE_DISK_SECONDARY,
50
  REPLACE_DISK_CHG,
51
  REPLACE_DISK_AUTO,
52
  ])
53
VALID_NODE_ROLES = frozenset([
54
  "drained", "master", "master-candidate", "offline", "regular",
55
  ])
56
VALID_STORAGE_TYPES = frozenset(["file", "lvm-pv", "lvm-vg"])
57

    
58

    
59
class Error(Exception):
60
  """Base error class for this module.
61

62
  """
63
  pass
64

    
65

    
66
class CertificateError(Error):
67
  """Raised when a problem is found with the SSL certificate.
68

69
  """
70
  pass
71

    
72

    
73
class GanetiApiError(Error):
74
  """Generic error raised from Ganeti API.
75

76
  """
77
  def __init__(self, msg, code=None):
78
    Error.__init__(self, msg)
79
    self.code = code
80

    
81

    
82
class InvalidReplacementMode(Error):
83
  """Raised when an invalid disk replacement mode is attempted.
84

85
  """
86
  pass
87

    
88

    
89
class InvalidStorageType(Error):
90
  """Raised when an invalid storage type is used.
91

92
  """
93
  pass
94

    
95

    
96
class InvalidNodeRole(Error):
97
  """Raised when an invalid node role is used.
98

99
  """
100
  pass
101

    
102

    
103
def FormatX509Name(x509_name):
104
  """Formats an X509 name.
105

106
  @type x509_name: OpenSSL.crypto.X509Name
107

108
  """
109
  try:
110
    # Only supported in pyOpenSSL 0.7 and above
111
    get_components_fn = x509_name.get_components
112
  except AttributeError:
113
    return repr(x509_name)
114
  else:
115
    return "".join("/%s=%s" % (name, value)
116
                   for name, value in get_components_fn())
117

    
118

    
119
class CertAuthorityVerify:
120
  """Certificate verificator for SSL context.
121

122
  Configures SSL context to verify server's certificate.
123

124
  """
125
  _CAPATH_MINVERSION = "0.9"
126
  _DEFVFYPATHS_MINVERSION = "0.9"
127

    
128
  _PYOPENSSL_VERSION = OpenSSL.__version__
129
  _PARSED_PYOPENSSL_VERSION = distutils.version.LooseVersion(_PYOPENSSL_VERSION)
130

    
131
  _SUPPORT_CAPATH = (_PARSED_PYOPENSSL_VERSION >= _CAPATH_MINVERSION)
132
  _SUPPORT_DEFVFYPATHS = (_PARSED_PYOPENSSL_VERSION >= _DEFVFYPATHS_MINVERSION)
133

    
134
  def __init__(self, cafile=None, capath=None, use_default_verify_paths=False):
135
    """Initializes this class.
136

137
    @type cafile: string
138
    @param cafile: In which file we can find the certificates
139
    @type capath: string
140
    @param capath: In which directory we can find the certificates
141
    @type use_default_verify_paths: bool
142
    @param use_default_verify_paths: Whether the platform provided CA
143
                                     certificates are to be used for
144
                                     verification purposes
145

146
    """
147
    self._cafile = cafile
148
    self._capath = capath
149
    self._use_default_verify_paths = use_default_verify_paths
150

    
151
    if self._capath is not None and not self._SUPPORT_CAPATH:
152
      raise Error(("PyOpenSSL %s has no support for a CA directory,"
153
                   " version %s or above is required") %
154
                  (self._PYOPENSSL_VERSION, self._CAPATH_MINVERSION))
155

    
156
    if self._use_default_verify_paths and not self._SUPPORT_DEFVFYPATHS:
157
      raise Error(("PyOpenSSL %s has no support for using default verification"
158
                   " paths, version %s or above is required") %
159
                  (self._PYOPENSSL_VERSION, self._DEFVFYPATHS_MINVERSION))
160

    
161
  @staticmethod
162
  def _VerifySslCertCb(logger, _, cert, errnum, errdepth, ok):
163
    """Callback for SSL certificate verification.
164

165
    @param logger: Logging object
166

167
    """
168
    if ok:
169
      log_fn = logger.debug
170
    else:
171
      log_fn = logger.error
172

    
173
    log_fn("Verifying SSL certificate at depth %s, subject '%s', issuer '%s'",
174
           errdepth, FormatX509Name(cert.get_subject()),
175
           FormatX509Name(cert.get_issuer()))
176

    
177
    if not ok:
178
      try:
179
        # Only supported in pyOpenSSL 0.7 and above
180
        # pylint: disable-msg=E1101
181
        fn = OpenSSL.crypto.X509_verify_cert_error_string
182
      except AttributeError:
183
        errmsg = ""
184
      else:
185
        errmsg = ":%s" % fn(errnum)
186

    
187
      logger.error("verify error:num=%s%s", errnum, errmsg)
188

    
189
    return ok
190

    
191
  def __call__(self, ctx, logger):
192
    """Configures an SSL context to verify certificates.
193

194
    @type ctx: OpenSSL.SSL.Context
195
    @param ctx: SSL context
196

197
    """
198
    if self._use_default_verify_paths:
199
      ctx.set_default_verify_paths()
200

    
201
    if self._cafile or self._capath:
202
      if self._SUPPORT_CAPATH:
203
        ctx.load_verify_locations(self._cafile, self._capath)
204
      else:
205
        ctx.load_verify_locations(self._cafile)
206

    
207
    ctx.set_verify(OpenSSL.SSL.VERIFY_PEER,
208
                   lambda conn, cert, errnum, errdepth, ok: \
209
                     self._VerifySslCertCb(logger, conn, cert,
210
                                           errnum, errdepth, ok))
211

    
212

    
213
class _HTTPSConnectionOpenSSL(httplib.HTTPSConnection):
214
  """HTTPS Connection handler that verifies the SSL certificate.
215

216
  """
217
  def __init__(self, *args, **kwargs):
218
    """Initializes this class.
219

220
    """
221
    httplib.HTTPSConnection.__init__(self, *args, **kwargs)
222
    self._logger = None
223
    self._config_ssl_verification = None
224

    
225
  def Setup(self, logger, config_ssl_verification):
226
    """Sets the SSL verification config function.
227

228
    @param logger: Logging object
229
    @type config_ssl_verification: callable
230

231
    """
232
    assert self._logger is None
233
    assert self._config_ssl_verification is None
234

    
235
    self._logger = logger
236
    self._config_ssl_verification = config_ssl_verification
237

    
238
  def connect(self):
239
    """Connect to the server specified when the object was created.
240

241
    This ensures that SSL certificates are verified.
242

243
    """
244
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
245

    
246
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
247
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
248

    
249
    if self._config_ssl_verification:
250
      self._config_ssl_verification(ctx, self._logger)
251

    
252
    ssl = OpenSSL.SSL.Connection(ctx, sock)
253
    ssl.connect((self.host, self.port))
254

    
255
    self.sock = httplib.FakeSocket(sock, ssl)
256

    
257

    
258
class _HTTPSHandler(urllib2.HTTPSHandler):
259
  def __init__(self, logger, config_ssl_verification):
260
    """Initializes this class.
261

262
    @param logger: Logging object
263
    @type config_ssl_verification: callable
264
    @param config_ssl_verification: Function to configure SSL context for
265
                                    certificate verification
266

267
    """
268
    urllib2.HTTPSHandler.__init__(self)
269
    self._logger = logger
270
    self._config_ssl_verification = config_ssl_verification
271

    
272
  def _CreateHttpsConnection(self, *args, **kwargs):
273
    """Wrapper around L{_HTTPSConnectionOpenSSL} to add SSL verification.
274

275
    This wrapper is necessary provide a compatible API to urllib2.
276

277
    """
278
    conn = _HTTPSConnectionOpenSSL(*args, **kwargs)
279
    conn.Setup(self._logger, self._config_ssl_verification)
280
    return conn
281

    
282
  def https_open(self, req):
283
    """Creates HTTPS connection.
284

285
    Called by urllib2.
286

287
    """
288
    return self.do_open(self._CreateHttpsConnection, req)
289

    
290

    
291
class _RapiRequest(urllib2.Request):
292
  def __init__(self, method, url, headers, data):
293
    """Initializes this class.
294

295
    """
296
    urllib2.Request.__init__(self, url, data=data, headers=headers)
297
    self._method = method
298

    
299
  def get_method(self):
300
    """Returns the HTTP request method.
301

302
    """
303
    return self._method
304

    
305

    
306
class GanetiRapiClient(object):
307
  """Ganeti RAPI client.
308

309
  """
310
  USER_AGENT = "Ganeti RAPI Client"
311
  _json_encoder = simplejson.JSONEncoder(sort_keys=True)
312

    
313
  def __init__(self, host, port=GANETI_RAPI_PORT,
314
               username=None, password=None,
315
               config_ssl_verification=None, ignore_proxy=False,
316
               logger=logging):
317
    """Constructor.
318

319
    @type host: string
320
    @param host: the ganeti cluster master to interact with
321
    @type port: int
322
    @param port: the port on which the RAPI is running (default is 5080)
323
    @type username: string
324
    @param username: the username to connect with
325
    @type password: string
326
    @param password: the password to connect with
327
    @type config_ssl_verification: callable
328
    @param config_ssl_verification: Function to configure SSL context for
329
                                    certificate verification
330
    @type ignore_proxy: bool
331
    @param ignore_proxy: Whether to ignore proxy settings
332
    @param logger: Logging object
333

334
    """
335
    self._host = host
336
    self._port = port
337
    self._logger = logger
338

    
339
    self._base_url = "https://%s:%s" % (host, port)
340

    
341
    handlers = [_HTTPSHandler(self._logger, config_ssl_verification)]
342

    
343
    if username is not None:
344
      pwmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
345
      pwmgr.add_password(None, self._base_url, username, password)
346
      handlers.append(urllib2.HTTPBasicAuthHandler(pwmgr))
347
    elif password:
348
      raise Error("Specified password without username")
349

    
350
    if ignore_proxy:
351
      handlers.append(urllib2.ProxyHandler({}))
352

    
353
    self._http = urllib2.build_opener(*handlers) # pylint: disable-msg=W0142
354

    
355
    self._headers = {
356
      "Accept": HTTP_APP_JSON,
357
      "Content-type": HTTP_APP_JSON,
358
      "User-Agent": self.USER_AGENT,
359
      }
360

    
361
  def _SendRequest(self, method, path, query, content):
362
    """Sends an HTTP request.
363

364
    This constructs a full URL, encodes and decodes HTTP bodies, and
365
    handles invalid responses in a pythonic way.
366

367
    @type method: string
368
    @param method: HTTP method to use
369
    @type path: string
370
    @param path: HTTP URL path
371
    @type query: list of two-tuples
372
    @param query: query arguments to pass to urllib.urlencode
373
    @type content: str or None
374
    @param content: HTTP body content
375

376
    @rtype: str
377
    @return: JSON-Decoded response
378

379
    @raises CertificateError: If an invalid SSL certificate is found
380
    @raises GanetiApiError: If an invalid response is returned
381

382
    """
383
    assert path.startswith("/")
384

    
385
    if content:
386
      encoded_content = self._json_encoder.encode(content)
387
    else:
388
      encoded_content = None
389

    
390
    # Build URL
391
    url = [self._base_url, path]
392
    if query:
393
      url.append("?")
394
      url.append(urllib.urlencode(query))
395

    
396
    req = _RapiRequest(method, "".join(url), self._headers, encoded_content)
397

    
398
    try:
399
      resp = self._http.open(req)
400
      encoded_response_content = resp.read()
401
    except (OpenSSL.SSL.Error, OpenSSL.crypto.Error), err:
402
      raise CertificateError("SSL issue: %s" % err)
403

    
404
    if encoded_response_content:
405
      response_content = simplejson.loads(encoded_response_content)
406
    else:
407
      response_content = None
408

    
409
    # TODO: Are there other status codes that are valid? (redirect?)
410
    if resp.code != HTTP_OK:
411
      if isinstance(response_content, dict):
412
        msg = ("%s %s: %s" %
413
               (response_content["code"],
414
                response_content["message"],
415
                response_content["explain"]))
416
      else:
417
        msg = str(response_content)
418

    
419
      raise GanetiApiError(msg, code=resp.code)
420

    
421
    return response_content
422

    
423
  @staticmethod
424
  def _CheckStorageType(storage_type):
425
    """Checks a storage type for validity.
426

427
    """
428
    if storage_type not in VALID_STORAGE_TYPES:
429
      raise InvalidStorageType("%s is an invalid storage type" % storage_type)
430

    
431
  def GetVersion(self):
432
    """Gets the Remote API version running on the cluster.
433

434
    @rtype: int
435
    @return: Ganeti Remote API version
436

437
    """
438
    return self._SendRequest(HTTP_GET, "/version", None, None)
439

    
440
  def GetOperatingSystems(self):
441
    """Gets the Operating Systems running in the Ganeti cluster.
442

443
    @rtype: list of str
444
    @return: operating systems
445

446
    """
447
    return self._SendRequest(HTTP_GET, "/2/os", None, None)
448

    
449
  def GetInfo(self):
450
    """Gets info about the cluster.
451

452
    @rtype: dict
453
    @return: information about the cluster
454

455
    """
456
    return self._SendRequest(HTTP_GET, "/2/info", None, None)
457

    
458
  def GetClusterTags(self):
459
    """Gets the cluster tags.
460

461
    @rtype: list of str
462
    @return: cluster tags
463

464
    """
465
    return self._SendRequest(HTTP_GET, "/2/tags", None, None)
466

    
467
  def AddClusterTags(self, tags, dry_run=False):
468
    """Adds tags to the cluster.
469

470
    @type tags: list of str
471
    @param tags: tags to add to the cluster
472
    @type dry_run: bool
473
    @param dry_run: whether to perform a dry run
474

475
    @rtype: int
476
    @return: job id
477

478
    """
479
    query = [("tag", t) for t in tags]
480
    if dry_run:
481
      query.append(("dry-run", 1))
482

    
483
    return self._SendRequest(HTTP_PUT, "/2/tags", query, None)
484

    
485
  def DeleteClusterTags(self, tags, dry_run=False):
486
    """Deletes tags from the cluster.
487

488
    @type tags: list of str
489
    @param tags: tags to delete
490
    @type dry_run: bool
491
    @param dry_run: whether to perform a dry run
492

493
    """
494
    query = [("tag", t) for t in tags]
495
    if dry_run:
496
      query.append(("dry-run", 1))
497

    
498
    return self._SendRequest(HTTP_DELETE, "/2/tags", query, None)
499

    
500
  def GetInstances(self, bulk=False):
501
    """Gets information about instances on the cluster.
502

503
    @type bulk: bool
504
    @param bulk: whether to return all information about all instances
505

506
    @rtype: list of dict or list of str
507
    @return: if bulk is True, info about the instances, else a list of instances
508

509
    """
510
    query = []
511
    if bulk:
512
      query.append(("bulk", 1))
513

    
514
    instances = self._SendRequest(HTTP_GET, "/2/instances", query, None)
515
    if bulk:
516
      return instances
517
    else:
518
      return [i["id"] for i in instances]
519

    
520
  def GetInstanceInfo(self, instance):
521
    """Gets information about an instance.
522

523
    @type instance: str
524
    @param instance: instance whose info to return
525

526
    @rtype: dict
527
    @return: info about the instance
528

529
    """
530
    return self._SendRequest(HTTP_GET, "/2/instances/%s" % instance, None, None)
531

    
532
  def CreateInstance(self, dry_run=False):
533
    """Creates a new instance.
534

535
    @type dry_run: bool
536
    @param dry_run: whether to perform a dry run
537

538
    @rtype: int
539
    @return: job id
540

541
    """
542
    # TODO: Pass arguments needed to actually create an instance.
543
    query = []
544
    if dry_run:
545
      query.append(("dry-run", 1))
546

    
547
    return self._SendRequest(HTTP_POST, "/2/instances", query, None)
548

    
549
  def DeleteInstance(self, instance, dry_run=False):
550
    """Deletes an instance.
551

552
    @type instance: str
553
    @param instance: the instance to delete
554

555
    @rtype: int
556
    @return: job id
557

558
    """
559
    query = []
560
    if dry_run:
561
      query.append(("dry-run", 1))
562

    
563
    return self._SendRequest(HTTP_DELETE, "/2/instances/%s" % instance,
564
                             query, None)
565

    
566
  def GetInstanceTags(self, instance):
567
    """Gets tags for an instance.
568

569
    @type instance: str
570
    @param instance: instance whose tags to return
571

572
    @rtype: list of str
573
    @return: tags for the instance
574

575
    """
576
    return self._SendRequest(HTTP_GET, "/2/instances/%s/tags" % instance,
577
                             None, None)
578

    
579
  def AddInstanceTags(self, instance, tags, dry_run=False):
580
    """Adds tags to an instance.
581

582
    @type instance: str
583
    @param instance: instance to add tags to
584
    @type tags: list of str
585
    @param tags: tags to add to the instance
586
    @type dry_run: bool
587
    @param dry_run: whether to perform a dry run
588

589
    @rtype: int
590
    @return: job id
591

592
    """
593
    query = [("tag", t) for t in tags]
594
    if dry_run:
595
      query.append(("dry-run", 1))
596

    
597
    return self._SendRequest(HTTP_PUT, "/2/instances/%s/tags" % instance,
598
                             query, None)
599

    
600
  def DeleteInstanceTags(self, instance, tags, dry_run=False):
601
    """Deletes tags from an instance.
602

603
    @type instance: str
604
    @param instance: instance to delete tags from
605
    @type tags: list of str
606
    @param tags: tags to delete
607
    @type dry_run: bool
608
    @param dry_run: whether to perform a dry run
609

610
    """
611
    query = [("tag", t) for t in tags]
612
    if dry_run:
613
      query.append(("dry-run", 1))
614

    
615
    return self._SendRequest(HTTP_DELETE, "/2/instances/%s/tags" % instance,
616
                             query, None)
617

    
618
  def RebootInstance(self, instance, reboot_type=None, ignore_secondaries=None,
619
                     dry_run=False):
620
    """Reboots an instance.
621

622
    @type instance: str
623
    @param instance: instance to rebot
624
    @type reboot_type: str
625
    @param reboot_type: one of: hard, soft, full
626
    @type ignore_secondaries: bool
627
    @param ignore_secondaries: if True, ignores errors for the secondary node
628
        while re-assembling disks (in hard-reboot mode only)
629
    @type dry_run: bool
630
    @param dry_run: whether to perform a dry run
631

632
    """
633
    query = []
634
    if reboot_type:
635
      query.append(("type", reboot_type))
636
    if ignore_secondaries is not None:
637
      query.append(("ignore_secondaries", ignore_secondaries))
638
    if dry_run:
639
      query.append(("dry-run", 1))
640

    
641
    return self._SendRequest(HTTP_POST, "/2/instances/%s/reboot" % instance,
642
                             query, None)
643

    
644
  def ShutdownInstance(self, instance, dry_run=False):
645
    """Shuts down an instance.
646

647
    @type instance: str
648
    @param instance: the instance to shut down
649
    @type dry_run: bool
650
    @param dry_run: whether to perform a dry run
651

652
    """
653
    query = []
654
    if dry_run:
655
      query.append(("dry-run", 1))
656

    
657
    return self._SendRequest(HTTP_PUT, "/2/instances/%s/shutdown" % instance,
658
                             query, None)
659

    
660
  def StartupInstance(self, instance, dry_run=False):
661
    """Starts up an instance.
662

663
    @type instance: str
664
    @param instance: the instance to start up
665
    @type dry_run: bool
666
    @param dry_run: whether to perform a dry run
667

668
    """
669
    query = []
670
    if dry_run:
671
      query.append(("dry-run", 1))
672

    
673
    return self._SendRequest(HTTP_PUT, "/2/instances/%s/startup" % instance,
674
                             query, None)
675

    
676
  def ReinstallInstance(self, instance, os, no_startup=False):
677
    """Reinstalls an instance.
678

679
    @type instance: str
680
    @param instance: the instance to reinstall
681
    @type os: str
682
    @param os: the os to reinstall
683
    @type no_startup: bool
684
    @param no_startup: whether to start the instance automatically
685

686
    """
687
    query = [("os", os)]
688
    if no_startup:
689
      query.append(("nostartup", 1))
690
    return self._SendRequest(HTTP_POST, "/2/instances/%s/reinstall" % instance,
691
                             query, None)
692

    
693
  def ReplaceInstanceDisks(self, instance, disks, mode=REPLACE_DISK_AUTO,
694
                           remote_node=None, iallocator=None, dry_run=False):
695
    """Replaces disks on an instance.
696

697
    @type instance: str
698
    @param instance: instance whose disks to replace
699
    @type disks: list of str
700
    @param disks: disks to replace
701
    @type mode: str
702
    @param mode: replacement mode to use (defaults to replace_auto)
703
    @type remote_node: str or None
704
    @param remote_node: new secondary node to use (for use with
705
        replace_new_secondary mode)
706
    @type iallocator: str or None
707
    @param iallocator: instance allocator plugin to use (for use with
708
                       replace_auto mode)
709
    @type dry_run: bool
710
    @param dry_run: whether to perform a dry run
711

712
    @rtype: int
713
    @return: job id
714

715
    @raises InvalidReplacementMode: If an invalid disk replacement mode is given
716
    @raises GanetiApiError: If no secondary node is given with a non-auto
717
        replacement mode is requested.
718

719
    """
720
    if mode not in VALID_REPLACEMENT_MODES:
721
      raise InvalidReplacementMode("%s is not a valid disk replacement mode" %
722
                                   mode)
723

    
724
    query = [
725
      ("mode", mode),
726
      ("disks", ",".join(disks)),
727
      ]
728

    
729
    if mode == REPLACE_DISK_AUTO:
730
      query.append(("iallocator", iallocator))
731
    elif mode == REPLACE_DISK_SECONDARY:
732
      if remote_node is None:
733
        raise GanetiApiError("Missing secondary node")
734
      query.append(("remote_node", remote_node))
735

    
736
    if dry_run:
737
      query.append(("dry-run", 1))
738

    
739
    return self._SendRequest(HTTP_POST,
740
                             "/2/instances/%s/replace-disks" % instance,
741
                             query, None)
742

    
743
  def GetJobs(self):
744
    """Gets all jobs for the cluster.
745

746
    @rtype: list of int
747
    @return: job ids for the cluster
748

749
    """
750
    return [int(j["id"])
751
            for j in self._SendRequest(HTTP_GET, "/2/jobs", None, None)]
752

    
753
  def GetJobStatus(self, job_id):
754
    """Gets the status of a job.
755

756
    @type job_id: int
757
    @param job_id: job id whose status to query
758

759
    @rtype: dict
760
    @return: job status
761

762
    """
763
    return self._SendRequest(HTTP_GET, "/2/jobs/%s" % job_id, None, None)
764

    
765
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
766
    """Waits for job changes.
767

768
    @type job_id: int
769
    @param job_id: Job ID for which to wait
770

771
    """
772
    body = {
773
      "fields": fields,
774
      "previous_job_info": prev_job_info,
775
      "previous_log_serial": prev_log_serial,
776
      }
777

    
778
    return self._SendRequest(HTTP_GET, "/2/jobs/%s/wait" % job_id, None, body)
779

    
780
  def CancelJob(self, job_id, dry_run=False):
781
    """Cancels a job.
782

783
    @type job_id: int
784
    @param job_id: id of the job to delete
785
    @type dry_run: bool
786
    @param dry_run: whether to perform a dry run
787

788
    """
789
    query = []
790
    if dry_run:
791
      query.append(("dry-run", 1))
792

    
793
    return self._SendRequest(HTTP_DELETE, "/2/jobs/%s" % job_id, query, None)
794

    
795
  def GetNodes(self, bulk=False):
796
    """Gets all nodes in the cluster.
797

798
    @type bulk: bool
799
    @param bulk: whether to return all information about all instances
800

801
    @rtype: list of dict or str
802
    @return: if bulk is true, info about nodes in the cluster,
803
        else list of nodes in the cluster
804

805
    """
806
    query = []
807
    if bulk:
808
      query.append(("bulk", 1))
809

    
810
    nodes = self._SendRequest(HTTP_GET, "/2/nodes", query, None)
811
    if bulk:
812
      return nodes
813
    else:
814
      return [n["id"] for n in nodes]
815

    
816
  def GetNodeInfo(self, node):
817
    """Gets information about a node.
818

819
    @type node: str
820
    @param node: node whose info to return
821

822
    @rtype: dict
823
    @return: info about the node
824

825
    """
826
    return self._SendRequest(HTTP_GET, "/2/nodes/%s" % node, None, None)
827

    
828
  def EvacuateNode(self, node, iallocator=None, remote_node=None,
829
                   dry_run=False):
830
    """Evacuates instances from a Ganeti node.
831

832
    @type node: str
833
    @param node: node to evacuate
834
    @type iallocator: str or None
835
    @param iallocator: instance allocator to use
836
    @type remote_node: str
837
    @param remote_node: node to evaucate to
838
    @type dry_run: bool
839
    @param dry_run: whether to perform a dry run
840

841
    @rtype: int
842
    @return: job id
843

844
    @raises GanetiApiError: if an iallocator and remote_node are both specified
845

846
    """
847
    if iallocator and remote_node:
848
      raise GanetiApiError("Only one of iallocator or remote_node can be used")
849

    
850
    query = []
851
    if iallocator:
852
      query.append(("iallocator", iallocator))
853
    if remote_node:
854
      query.append(("remote_node", remote_node))
855
    if dry_run:
856
      query.append(("dry-run", 1))
857

    
858
    return self._SendRequest(HTTP_POST, "/2/nodes/%s/evacuate" % node,
859
                             query, None)
860

    
861
  def MigrateNode(self, node, live=True, dry_run=False):
862
    """Migrates all primary instances from a node.
863

864
    @type node: str
865
    @param node: node to migrate
866
    @type live: bool
867
    @param live: whether to use live migration
868
    @type dry_run: bool
869
    @param dry_run: whether to perform a dry run
870

871
    @rtype: int
872
    @return: job id
873

874
    """
875
    query = []
876
    if live:
877
      query.append(("live", 1))
878
    if dry_run:
879
      query.append(("dry-run", 1))
880

    
881
    return self._SendRequest(HTTP_POST, "/2/nodes/%s/migrate" % node,
882
                             query, None)
883

    
884
  def GetNodeRole(self, node):
885
    """Gets the current role for a node.
886

887
    @type node: str
888
    @param node: node whose role to return
889

890
    @rtype: str
891
    @return: the current role for a node
892

893
    """
894
    return self._SendRequest(HTTP_GET, "/2/nodes/%s/role" % node, None, None)
895

    
896
  def SetNodeRole(self, node, role, force=False):
897
    """Sets the role for a node.
898

899
    @type node: str
900
    @param node: the node whose role to set
901
    @type role: str
902
    @param role: the role to set for the node
903
    @type force: bool
904
    @param force: whether to force the role change
905

906
    @rtype: int
907
    @return: job id
908

909
    @raise InvalidNodeRole: If an invalid node role is specified
910

911
    """
912
    if role not in VALID_NODE_ROLES:
913
      raise InvalidNodeRole("%s is not a valid node role" % role)
914

    
915
    query = [("force", force)]
916

    
917
    return self._SendRequest(HTTP_PUT, "/2/nodes/%s/role" % node,
918
                             query, role)
919

    
920
  def GetNodeStorageUnits(self, node, storage_type, output_fields):
921
    """Gets the storage units for a node.
922

923
    @type node: str
924
    @param node: the node whose storage units to return
925
    @type storage_type: str
926
    @param storage_type: storage type whose units to return
927
    @type output_fields: str
928
    @param output_fields: storage type fields to return
929

930
    @rtype: int
931
    @return: job id where results can be retrieved
932

933
    @raise InvalidStorageType: If an invalid storage type is specified
934

935
    """
936
    # TODO: Add default for storage_type & output_fields
937
    self._CheckStorageType(storage_type)
938

    
939
    query = [
940
      ("storage_type", storage_type),
941
      ("output_fields", output_fields),
942
      ]
943

    
944
    return self._SendRequest(HTTP_GET, "/2/nodes/%s/storage" % node,
945
                             query, None)
946

    
947
  def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=True):
948
    """Modifies parameters of storage units on the node.
949

950
    @type node: str
951
    @param node: node whose storage units to modify
952
    @type storage_type: str
953
    @param storage_type: storage type whose units to modify
954
    @type name: str
955
    @param name: name of the storage unit
956
    @type allocatable: bool
957
    @param allocatable: TODO: Document me
958

959
    @rtype: int
960
    @return: job id
961

962
    @raise InvalidStorageType: If an invalid storage type is specified
963

964
    """
965
    self._CheckStorageType(storage_type)
966

    
967
    query = [
968
      ("storage_type", storage_type),
969
      ("name", name),
970
      ("allocatable", allocatable),
971
      ]
972

    
973
    return self._SendRequest(HTTP_PUT, "/2/nodes/%s/storage/modify" % node,
974
                             query, None)
975

    
976
  def RepairNodeStorageUnits(self, node, storage_type, name):
977
    """Repairs a storage unit on the node.
978

979
    @type node: str
980
    @param node: node whose storage units to repair
981
    @type storage_type: str
982
    @param storage_type: storage type to repair
983
    @type name: str
984
    @param name: name of the storage unit to repair
985

986
    @rtype: int
987
    @return: job id
988

989
    @raise InvalidStorageType: If an invalid storage type is specified
990

991
    """
992
    self._CheckStorageType(storage_type)
993

    
994
    query = [
995
      ("storage_type", storage_type),
996
      ("name", name),
997
      ]
998

    
999
    return self._SendRequest(HTTP_PUT, "/2/nodes/%s/storage/repair" % node,
1000
                             query, None)
1001

    
1002
  def GetNodeTags(self, node):
1003
    """Gets the tags for a node.
1004

1005
    @type node: str
1006
    @param node: node whose tags to return
1007

1008
    @rtype: list of str
1009
    @return: tags for the node
1010

1011
    """
1012
    return self._SendRequest(HTTP_GET, "/2/nodes/%s/tags" % node, None, None)
1013

    
1014
  def AddNodeTags(self, node, tags, dry_run=False):
1015
    """Adds tags to a node.
1016

1017
    @type node: str
1018
    @param node: node to add tags to
1019
    @type tags: list of str
1020
    @param tags: tags to add to the node
1021
    @type dry_run: bool
1022
    @param dry_run: whether to perform a dry run
1023

1024
    @rtype: int
1025
    @return: job id
1026

1027
    """
1028
    query = [("tag", t) for t in tags]
1029
    if dry_run:
1030
      query.append(("dry-run", 1))
1031

    
1032
    return self._SendRequest(HTTP_PUT, "/2/nodes/%s/tags" % node,
1033
                             query, tags)
1034

    
1035
  def DeleteNodeTags(self, node, tags, dry_run=False):
1036
    """Delete tags from a node.
1037

1038
    @type node: str
1039
    @param node: node to remove tags from
1040
    @type tags: list of str
1041
    @param tags: tags to remove from the node
1042
    @type dry_run: bool
1043
    @param dry_run: whether to perform a dry run
1044

1045
    @rtype: int
1046
    @return: job id
1047

1048
    """
1049
    query = [("tag", t) for t in tags]
1050
    if dry_run:
1051
      query.append(("dry-run", 1))
1052

    
1053
    return self._SendRequest(HTTP_DELETE, "/2/nodes/%s/tags" % node,
1054
                             query, None)