Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ d9d1e541

History | View | Annotate | Download (16.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import compat
39
from ganeti import serializer
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import utils
43
from ganeti import objects
44
from ganeti import pathutils
45

    
46

    
47
KEY_METHOD = "method"
48
KEY_ARGS = "args"
49
KEY_SUCCESS = "success"
50
KEY_RESULT = "result"
51
KEY_VERSION = "version"
52

    
53
REQ_SUBMIT_JOB = "SubmitJob"
54
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = "SubmitJobToDrainedQueue"
55
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
56
REQ_PICKUP_JOB = "PickupJob"
57
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
58
REQ_CANCEL_JOB = "CancelJob"
59
REQ_ARCHIVE_JOB = "ArchiveJob"
60
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
61
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
62
REQ_QUERY = "Query"
63
REQ_QUERY_FIELDS = "QueryFields"
64
REQ_QUERY_JOBS = "QueryJobs"
65
REQ_QUERY_INSTANCES = "QueryInstances"
66
REQ_QUERY_NODES = "QueryNodes"
67
REQ_QUERY_GROUPS = "QueryGroups"
68
REQ_QUERY_NETWORKS = "QueryNetworks"
69
REQ_QUERY_EXPORTS = "QueryExports"
70
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
71
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
72
REQ_QUERY_TAGS = "QueryTags"
73
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
74
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
75

    
76
#: List of all LUXI requests
77
REQ_ALL = compat.UniqueFrozenset([
78
  REQ_ARCHIVE_JOB,
79
  REQ_AUTO_ARCHIVE_JOBS,
80
  REQ_CANCEL_JOB,
81
  REQ_CHANGE_JOB_PRIORITY,
82
  REQ_PICKUP_JOB,
83
  REQ_QUERY,
84
  REQ_QUERY_CLUSTER_INFO,
85
  REQ_QUERY_CONFIG_VALUES,
86
  REQ_QUERY_EXPORTS,
87
  REQ_QUERY_FIELDS,
88
  REQ_QUERY_GROUPS,
89
  REQ_QUERY_INSTANCES,
90
  REQ_QUERY_JOBS,
91
  REQ_QUERY_NODES,
92
  REQ_QUERY_NETWORKS,
93
  REQ_QUERY_TAGS,
94
  REQ_SET_DRAIN_FLAG,
95
  REQ_SET_WATCHER_PAUSE,
96
  REQ_SUBMIT_JOB,
97
  REQ_SUBMIT_JOB_TO_DRAINED_QUEUE,
98
  REQ_SUBMIT_MANY_JOBS,
99
  REQ_WAIT_FOR_JOB_CHANGE,
100
  ])
101

    
102
DEF_CTMO = 10
103
DEF_RWTO = 60
104

    
105
# WaitForJobChange timeout
106
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
107

    
108

    
109
class ProtocolError(errors.LuxiError):
110
  """Denotes an error in the LUXI protocol."""
111

    
112

    
113
class ConnectionClosedError(ProtocolError):
114
  """Connection closed error."""
115

    
116

    
117
class TimeoutError(ProtocolError):
118
  """Operation timeout error."""
119

    
120

    
121
class RequestError(ProtocolError):
122
  """Error on request.
123

124
  This signifies an error in the request format or request handling,
125
  but not (e.g.) an error in starting up an instance.
126

127
  Some common conditions that can trigger this exception:
128
    - job submission failed because the job data was wrong
129
    - query failed because required fields were missing
130

131
  """
132

    
133

    
134
class NoMasterError(ProtocolError):
135
  """The master cannot be reached.
136

137
  This means that the master daemon is not running or the socket has
138
  been removed.
139

140
  """
141

    
142

    
143
class PermissionError(ProtocolError):
144
  """Permission denied while connecting to the master socket.
145

146
  This means the user doesn't have the proper rights.
147

148
  """
149

    
150

    
151
class Transport:
152
  """Low-level transport class.
153

154
  This is used on the client side.
155

156
  This could be replace by any other class that provides the same
157
  semantics to the Client. This means:
158
    - can send messages and receive messages
159
    - safe for multithreading
160

161
  """
162

    
163
  def __init__(self, address, timeouts=None):
164
    """Constructor for the Client class.
165

166
    Arguments:
167
      - address: a valid address the the used transport class
168
      - timeout: a list of timeouts, to be used on connect and read/write
169

170
    There are two timeouts used since we might want to wait for a long
171
    time for a response, but the connect timeout should be lower.
172

173
    If not passed, we use a default of 10 and respectively 60 seconds.
174

175
    Note that on reading data, since the timeout applies to an
176
    invidual receive, it might be that the total duration is longer
177
    than timeout value passed (we make a hard limit at twice the read
178
    timeout).
179

180
    """
181
    self.address = address
182
    if timeouts is None:
183
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
184
    else:
185
      self._ctimeout, self._rwtimeout = timeouts
186

    
187
    self.socket = None
188
    self._buffer = ""
189
    self._msgs = collections.deque()
190

    
191
    try:
192
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
193

    
194
      # Try to connect
195
      try:
196
        utils.Retry(self._Connect, 1.0, self._ctimeout,
197
                    args=(self.socket, address, self._ctimeout))
198
      except utils.RetryTimeout:
199
        raise TimeoutError("Connect timed out")
200

    
201
      self.socket.settimeout(self._rwtimeout)
202
    except (socket.error, NoMasterError):
203
      if self.socket is not None:
204
        self.socket.close()
205
      self.socket = None
206
      raise
207

    
208
  @staticmethod
209
  def _Connect(sock, address, timeout):
210
    sock.settimeout(timeout)
211
    try:
212
      sock.connect(address)
213
    except socket.timeout, err:
214
      raise TimeoutError("Connect timed out: %s" % str(err))
215
    except socket.error, err:
216
      error_code = err.args[0]
217
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
218
        raise NoMasterError(address)
219
      elif error_code in (errno.EPERM, errno.EACCES):
220
        raise PermissionError(address)
221
      elif error_code == errno.EAGAIN:
222
        # Server's socket backlog is full at the moment
223
        raise utils.RetryAgain()
224
      raise
225

    
226
  def _CheckSocket(self):
227
    """Make sure we are connected.
228

229
    """
230
    if self.socket is None:
231
      raise ProtocolError("Connection is closed")
232

    
233
  def Send(self, msg):
234
    """Send a message.
235

236
    This just sends a message and doesn't wait for the response.
237

238
    """
239
    if constants.LUXI_EOM in msg:
240
      raise ProtocolError("Message terminator found in payload")
241

    
242
    self._CheckSocket()
243
    try:
244
      # TODO: sendall is not guaranteed to send everything
245
      self.socket.sendall(msg + constants.LUXI_EOM)
246
    except socket.timeout, err:
247
      raise TimeoutError("Sending timeout: %s" % str(err))
248

    
249
  def Recv(self):
250
    """Try to receive a message from the socket.
251

252
    In case we already have messages queued, we just return from the
253
    queue. Otherwise, we try to read data with a _rwtimeout network
254
    timeout, and making sure we don't go over 2x_rwtimeout as a global
255
    limit.
256

257
    """
258
    self._CheckSocket()
259
    etime = time.time() + self._rwtimeout
260
    while not self._msgs:
261
      if time.time() > etime:
262
        raise TimeoutError("Extended receive timeout")
263
      while True:
264
        try:
265
          data = self.socket.recv(4096)
266
        except socket.timeout, err:
267
          raise TimeoutError("Receive timeout: %s" % str(err))
268
        except socket.error, err:
269
          if err.args and err.args[0] == errno.EAGAIN:
270
            continue
271
          raise
272
        break
273
      if not data:
274
        raise ConnectionClosedError("Connection closed while reading")
275
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
276
      self._buffer = new_msgs.pop()
277
      self._msgs.extend(new_msgs)
278
    return self._msgs.popleft()
279

    
280
  def Call(self, msg):
281
    """Send a message and wait for the response.
282

283
    This is just a wrapper over Send and Recv.
284

285
    """
286
    self.Send(msg)
287
    return self.Recv()
288

    
289
  def Close(self):
290
    """Close the socket"""
291
    if self.socket is not None:
292
      self.socket.close()
293
      self.socket = None
294

    
295

    
296
def ParseRequest(msg):
297
  """Parses a LUXI request message.
298

299
  """
300
  try:
301
    request = serializer.LoadJson(msg)
302
  except ValueError, err:
303
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
304

    
305
  logging.debug("LUXI request: %s", request)
306

    
307
  if not isinstance(request, dict):
308
    logging.error("LUXI request not a dict: %r", msg)
309
    raise ProtocolError("Invalid LUXI request (not a dict)")
310

    
311
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
312
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
313
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
314

    
315
  if method is None or args is None:
316
    logging.error("LUXI request missing method or arguments: %r", msg)
317
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
318
                         " in request): %r") % msg)
319

    
320
  return (method, args, version)
321

    
322

    
323
def ParseResponse(msg):
324
  """Parses a LUXI response message.
325

326
  """
327
  # Parse the result
328
  try:
329
    data = serializer.LoadJson(msg)
330
  except KeyboardInterrupt:
331
    raise
332
  except Exception, err:
333
    raise ProtocolError("Error while deserializing response: %s" % str(err))
334

    
335
  # Validate response
336
  if not (isinstance(data, dict) and
337
          KEY_SUCCESS in data and
338
          KEY_RESULT in data):
339
    raise ProtocolError("Invalid response from server: %r" % data)
340

    
341
  return (data[KEY_SUCCESS], data[KEY_RESULT],
342
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
343

    
344

    
345
def FormatResponse(success, result, version=None):
346
  """Formats a LUXI response message.
347

348
  """
349
  response = {
350
    KEY_SUCCESS: success,
351
    KEY_RESULT: result,
352
    }
353

    
354
  if version is not None:
355
    response[KEY_VERSION] = version
356

    
357
  logging.debug("LUXI response: %s", response)
358

    
359
  return serializer.DumpJson(response)
360

    
361

    
362
def FormatRequest(method, args, version=None):
363
  """Formats a LUXI request message.
364

365
  """
366
  # Build request
367
  request = {
368
    KEY_METHOD: method,
369
    KEY_ARGS: args,
370
    }
371

    
372
  if version is not None:
373
    request[KEY_VERSION] = version
374

    
375
  # Serialize the request
376
  return serializer.DumpJson(request)
377

    
378

    
379
def CallLuxiMethod(transport_cb, method, args, version=None):
380
  """Send a LUXI request via a transport and return the response.
381

382
  """
383
  assert callable(transport_cb)
384

    
385
  request_msg = FormatRequest(method, args, version=version)
386

    
387
  # Send request and wait for response
388
  response_msg = transport_cb(request_msg)
389

    
390
  (success, result, resp_version) = ParseResponse(response_msg)
391

    
392
  # Verify version if there was one in the response
393
  if resp_version is not None and resp_version != version:
394
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
395
                           (version, resp_version))
396

    
397
  if success:
398
    return result
399

    
400
  errors.MaybeRaise(result)
401
  raise RequestError(result)
402

    
403

    
404
class Client(object):
405
  """High-level client implementation.
406

407
  This uses a backing Transport-like class on top of which it
408
  implements data serialization/deserialization.
409

410
  """
411
  def __init__(self, address=None, timeouts=None, transport=Transport):
412
    """Constructor for the Client class.
413

414
    Arguments:
415
      - address: a valid address the the used transport class
416
      - timeout: a list of timeouts, to be used on connect and read/write
417
      - transport: a Transport-like class
418

419

420
    If timeout is not passed, the default timeouts of the transport
421
    class are used.
422

423
    """
424
    if address is None:
425
      address = pathutils.MASTER_SOCKET
426
    self.address = address
427
    self.timeouts = timeouts
428
    self.transport_class = transport
429
    self.transport = None
430
    self._InitTransport()
431

    
432
  def _InitTransport(self):
433
    """(Re)initialize the transport if needed.
434

435
    """
436
    if self.transport is None:
437
      self.transport = self.transport_class(self.address,
438
                                            timeouts=self.timeouts)
439

    
440
  def _CloseTransport(self):
441
    """Close the transport, ignoring errors.
442

443
    """
444
    if self.transport is None:
445
      return
446
    try:
447
      old_transp = self.transport
448
      self.transport = None
449
      old_transp.Close()
450
    except Exception: # pylint: disable=W0703
451
      pass
452

    
453
  def _SendMethodCall(self, data):
454
    # Send request and wait for response
455
    try:
456
      self._InitTransport()
457
      return self.transport.Call(data)
458
    except Exception:
459
      self._CloseTransport()
460
      raise
461

    
462
  def Close(self):
463
    """Close the underlying connection.
464

465
    """
466
    self._CloseTransport()
467

    
468
  def CallMethod(self, method, args):
469
    """Send a generic request and return the response.
470

471
    """
472
    if not isinstance(args, (list, tuple)):
473
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
474
                                   " expected list, got %s" % type(args))
475
    return CallLuxiMethod(self._SendMethodCall, method, args,
476
                          version=constants.LUXI_VERSION)
477

    
478
  def SetQueueDrainFlag(self, drain_flag):
479
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
480

    
481
  def SetWatcherPause(self, until):
482
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
483

    
484
  def PickupJob(self, job):
485
    return self.CallMethod(REQ_PICKUP_JOB, (job,))
486

    
487
  def SubmitJob(self, ops):
488
    ops_state = map(lambda op: op.__getstate__(), ops)
489
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
490

    
491
  def SubmitJobToDrainedQueue(self, ops):
492
    ops_state = map(lambda op: op.__getstate__(), ops)
493
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
494

    
495
  def SubmitManyJobs(self, jobs):
496
    jobs_state = []
497
    for ops in jobs:
498
      jobs_state.append([op.__getstate__() for op in ops])
499
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
500

    
501
  def CancelJob(self, job_id):
502
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
503

    
504
  def ArchiveJob(self, job_id):
505
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
506

    
507
  def ChangeJobPriority(self, job_id, priority):
508
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
509

    
510
  def AutoArchiveJobs(self, age):
511
    timeout = (DEF_RWTO - 1) / 2
512
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
513

    
514
  def WaitForJobChangeOnce(self, job_id, fields,
515
                           prev_job_info, prev_log_serial,
516
                           timeout=WFJC_TIMEOUT):
517
    """Waits for changes on a job.
518

519
    @param job_id: Job ID
520
    @type fields: list
521
    @param fields: List of field names to be observed
522
    @type prev_job_info: None or list
523
    @param prev_job_info: Previously received job information
524
    @type prev_log_serial: None or int/long
525
    @param prev_log_serial: Highest log serial number previously received
526
    @type timeout: int/float
527
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
528
                    be capped to that value)
529

530
    """
531
    assert timeout >= 0, "Timeout can not be negative"
532
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
533
                           (job_id, fields, prev_job_info,
534
                            prev_log_serial,
535
                            min(WFJC_TIMEOUT, timeout)))
536

    
537
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
538
    while True:
539
      result = self.WaitForJobChangeOnce(job_id, fields,
540
                                         prev_job_info, prev_log_serial)
541
      if result != constants.JOB_NOTCHANGED:
542
        break
543
    return result
544

    
545
  def Query(self, what, fields, qfilter):
546
    """Query for resources/items.
547

548
    @param what: One of L{constants.QR_VIA_LUXI}
549
    @type fields: List of strings
550
    @param fields: List of requested fields
551
    @type qfilter: None or list
552
    @param qfilter: Query filter
553
    @rtype: L{objects.QueryResponse}
554

555
    """
556
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
557
    return objects.QueryResponse.FromDict(result)
558

    
559
  def QueryFields(self, what, fields):
560
    """Query for available fields.
561

562
    @param what: One of L{constants.QR_VIA_LUXI}
563
    @type fields: None or list of strings
564
    @param fields: List of requested fields
565
    @rtype: L{objects.QueryFieldsResponse}
566

567
    """
568
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
569
    return objects.QueryFieldsResponse.FromDict(result)
570

    
571
  def QueryJobs(self, job_ids, fields):
572
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
573

    
574
  def QueryInstances(self, names, fields, use_locking):
575
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
576

    
577
  def QueryNodes(self, names, fields, use_locking):
578
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
579

    
580
  def QueryGroups(self, names, fields, use_locking):
581
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
582

    
583
  def QueryNetworks(self, names, fields, use_locking):
584
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
585

    
586
  def QueryExports(self, nodes, use_locking):
587
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
588

    
589
  def QueryClusterInfo(self):
590
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
591

    
592
  def QueryConfigValues(self, fields):
593
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
594

    
595
  def QueryTags(self, kind, name):
596
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))