Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 1dec44b2

History | View | Annotate | Download (7.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the LUXI protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
from ganeti import constants
33
from ganeti import objects
34
import ganeti.rpc.client as cl
35
from ganeti.rpc.errors import RequestError
36
from ganeti.rpc.transport import Transport
37

    
38
__all__ = [
39
  # classes:
40
  "Client"
41
  ]
42

    
43
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
44
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
45
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
46
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
47
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
48
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
49
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
50
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
51
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
52
REQ_QUERY = constants.LUXI_REQ_QUERY
53
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
54
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
55
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
56
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
57
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
58
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
59
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
60
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
61
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
62
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
63
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
64
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
65
REQ_ALL = constants.LUXI_REQ_ALL
66

    
67
DEF_RWTO = constants.LUXI_DEF_RWTO
68
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
69

    
70

    
71
class Client(cl.AbstractClient):
72
  """High-level client implementation.
73

74
  This uses a backing Transport-like class on top of which it
75
  implements data serialization/deserialization.
76

77
  """
78
  def __init__(self, address=None, timeouts=None, transport=Transport):
79
    """Constructor for the Client class.
80

81
    Arguments are the same as for L{AbstractClient}.
82

83
    """
84
    super(Client, self).__init__(address, timeouts, transport)
85
    # Override the version of the protocol:
86
    self.version = constants.LUXI_VERSION
87

    
88
  def SetQueueDrainFlag(self, drain_flag):
89
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
90

    
91
  def SetWatcherPause(self, until):
92
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
93

    
94
  def PickupJob(self, job):
95
    return self.CallMethod(REQ_PICKUP_JOB, (job,))
96

    
97
  def SubmitJob(self, ops):
98
    ops_state = map(lambda op: op.__getstate__(), ops)
99
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
100

    
101
  def SubmitJobToDrainedQueue(self, ops):
102
    ops_state = map(lambda op: op.__getstate__(), ops)
103
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
104

    
105
  def SubmitManyJobs(self, jobs):
106
    jobs_state = []
107
    for ops in jobs:
108
      jobs_state.append([op.__getstate__() for op in ops])
109
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
110

    
111
  @staticmethod
112
  def _PrepareJobId(request_name, job_id):
113
    try:
114
      return int(job_id)
115
    except ValueError:
116
      raise RequestError("Invalid parameter passed to %s as job id: "
117
                         " expected integer, got value %s" %
118
                         (request_name, job_id))
119

    
120
  def CancelJob(self, job_id):
121
    job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
122
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
123

    
124
  def ArchiveJob(self, job_id):
125
    job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
126
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
127

    
128
  def ChangeJobPriority(self, job_id, priority):
129
    job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
130
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
131

    
132
  def AutoArchiveJobs(self, age):
133
    timeout = (DEF_RWTO - 1) / 2
134
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
135

    
136
  def WaitForJobChangeOnce(self, job_id, fields,
137
                           prev_job_info, prev_log_serial,
138
                           timeout=WFJC_TIMEOUT):
139
    """Waits for changes on a job.
140

141
    @param job_id: Job ID
142
    @type fields: list
143
    @param fields: List of field names to be observed
144
    @type prev_job_info: None or list
145
    @param prev_job_info: Previously received job information
146
    @type prev_log_serial: None or int/long
147
    @param prev_log_serial: Highest log serial number previously received
148
    @type timeout: int/float
149
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
150
                    be capped to that value)
151

152
    """
153
    assert timeout >= 0, "Timeout can not be negative"
154
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
155
                           (job_id, fields, prev_job_info,
156
                            prev_log_serial,
157
                            min(WFJC_TIMEOUT, timeout)))
158

    
159
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
160
    job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
161
    while True:
162
      result = self.WaitForJobChangeOnce(job_id, fields,
163
                                         prev_job_info, prev_log_serial)
164
      if result != constants.JOB_NOTCHANGED:
165
        break
166
    return result
167

    
168
  def Query(self, what, fields, qfilter):
169
    """Query for resources/items.
170

171
    @param what: One of L{constants.QR_VIA_LUXI}
172
    @type fields: List of strings
173
    @param fields: List of requested fields
174
    @type qfilter: None or list
175
    @param qfilter: Query filter
176
    @rtype: L{objects.QueryResponse}
177

178
    """
179
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
180
    return objects.QueryResponse.FromDict(result)
181

    
182
  def QueryFields(self, what, fields):
183
    """Query for available fields.
184

185
    @param what: One of L{constants.QR_VIA_LUXI}
186
    @type fields: None or list of strings
187
    @param fields: List of requested fields
188
    @rtype: L{objects.QueryFieldsResponse}
189

190
    """
191
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
192
    return objects.QueryFieldsResponse.FromDict(result)
193

    
194
  def QueryJobs(self, job_ids, fields):
195
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
196

    
197
  def QueryInstances(self, names, fields, use_locking):
198
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
199

    
200
  def QueryNodes(self, names, fields, use_locking):
201
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
202

    
203
  def QueryGroups(self, names, fields, use_locking):
204
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
205

    
206
  def QueryNetworks(self, names, fields, use_locking):
207
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
208

    
209
  def QueryExports(self, nodes, use_locking):
210
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
211

    
212
  def QueryClusterInfo(self):
213
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
214

    
215
  def QueryConfigValues(self, fields):
216
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
217

    
218
  def QueryTags(self, kind, name):
219
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))