SshRunner: Add parameter to always accept peer's SSH key
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import os
25 import logging
26 import threading
27 import errno
28 import re
29 import time
30
31 from ganeti import constants
32 from ganeti import serializer
33 from ganeti import workerpool
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import mcpu
37 from ganeti import utils
38 from ganeti import jstore
39 from ganeti import rpc
40
41
42 JOBQUEUE_THREADS = 5
43
44
45 class _QueuedOpCode(object):
46   """Encasulates an opcode object.
47
48   Access is synchronized by the '_lock' attribute.
49
50   The 'log' attribute holds the execution log and consists of tuples
51   of the form (timestamp, level, message).
52
53   """
54   def __new__(cls, *args, **kwargs):
55     obj = object.__new__(cls, *args, **kwargs)
56     # Create a special lock for logging
57     obj._log_lock = threading.Lock()
58     return obj
59
60   def __init__(self, op):
61     self.input = op
62     self.status = constants.OP_STATUS_QUEUED
63     self.result = None
64     self.log = []
65
66   @classmethod
67   def Restore(cls, state):
68     obj = _QueuedOpCode.__new__(cls)
69     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
70     obj.status = state["status"]
71     obj.result = state["result"]
72     obj.log = state["log"]
73     return obj
74
75   def Serialize(self):
76     self._log_lock.acquire()
77     try:
78       return {
79         "input": self.input.__getstate__(),
80         "status": self.status,
81         "result": self.result,
82         "log": self.log,
83         }
84     finally:
85       self._log_lock.release()
86
87   def Log(self, *args):
88     """Append a log entry.
89
90     """
91     assert len(args) < 3
92
93     if len(args) == 1:
94       log_type = constants.ELOG_MESSAGE
95       log_msg = args[0]
96     else:
97       log_type, log_msg = args
98
99     self._log_lock.acquire()
100     try:
101       # The time is split to make serialization easier and not lose more
102       # precision.
103       self.log.append((utils.SplitTime(time.time()), log_type, log_msg))
104     finally:
105       self._log_lock.release()
106
107   def RetrieveLog(self, start_at=0):
108     """Retrieve (a part of) the execution log.
109
110     """
111     self._log_lock.acquire()
112     try:
113       return self.log[start_at:]
114     finally:
115       self._log_lock.release()
116
117
118 class _QueuedJob(object):
119   """In-memory job representation.
120
121   This is what we use to track the user-submitted jobs.
122
123   """
124   def __new__(cls, *args, **kwargs):
125     obj = object.__new__(cls, *args, **kwargs)
126     # Condition to wait for changes
127     obj.change = threading.Condition()
128     return obj
129
130   def __init__(self, queue, job_id, ops):
131     if not ops:
132       # TODO
133       raise Exception("No opcodes")
134
135     self.queue = queue
136     self.id = job_id
137     self.ops = [_QueuedOpCode(op) for op in ops]
138     self.run_op_index = -1
139
140   @classmethod
141   def Restore(cls, queue, state):
142     obj = _QueuedJob.__new__(cls)
143     obj.queue = queue
144     obj.id = state["id"]
145     obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
146     obj.run_op_index = state["run_op_index"]
147     return obj
148
149   def Serialize(self):
150     return {
151       "id": self.id,
152       "ops": [op.Serialize() for op in self.ops],
153       "run_op_index": self.run_op_index,
154       }
155
156   def CalcStatus(self):
157     status = constants.JOB_STATUS_QUEUED
158
159     all_success = True
160     for op in self.ops:
161       if op.status == constants.OP_STATUS_SUCCESS:
162         continue
163
164       all_success = False
165
166       if op.status == constants.OP_STATUS_QUEUED:
167         pass
168       elif op.status == constants.OP_STATUS_RUNNING:
169         status = constants.JOB_STATUS_RUNNING
170       elif op.status == constants.OP_STATUS_ERROR:
171         status = constants.JOB_STATUS_ERROR
172         # The whole job fails if one opcode failed
173         break
174       elif op.status == constants.OP_STATUS_CANCELED:
175         status = constants.OP_STATUS_CANCELED
176         break
177
178     if all_success:
179       status = constants.JOB_STATUS_SUCCESS
180
181     return status
182
183
184 class _JobQueueWorker(workerpool.BaseWorker):
185   def RunTask(self, job):
186     """Job executor.
187
188     This functions processes a job.
189
190     """
191     logging.debug("Worker %s processing job %s",
192                   self.worker_id, job.id)
193     proc = mcpu.Processor(self.pool.queue.context)
194     queue = job.queue
195     try:
196       try:
197         count = len(job.ops)
198         for idx, op in enumerate(job.ops):
199           try:
200             logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
201
202             queue.acquire()
203             try:
204               job.run_op_index = idx
205               op.status = constants.OP_STATUS_RUNNING
206               op.result = None
207               queue.UpdateJobUnlocked(job)
208
209               input_opcode = op.input
210             finally:
211               queue.release()
212
213             def _Log(*args):
214               op.Log(*args)
215
216               job.change.acquire()
217               try:
218                 job.change.notifyAll()
219               finally:
220                 job.change.release()
221
222             result = proc.ExecOpCode(input_opcode, _Log)
223
224             queue.acquire()
225             try:
226               op.status = constants.OP_STATUS_SUCCESS
227               op.result = result
228               queue.UpdateJobUnlocked(job)
229             finally:
230               queue.release()
231
232             logging.debug("Op %s/%s: Successfully finished %s",
233                           idx + 1, count, op)
234           except Exception, err:
235             queue.acquire()
236             try:
237               try:
238                 op.status = constants.OP_STATUS_ERROR
239                 op.result = str(err)
240                 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
241               finally:
242                 queue.UpdateJobUnlocked(job)
243             finally:
244               queue.release()
245             raise
246
247       except errors.GenericError, err:
248         logging.exception("Ganeti exception")
249       except:
250         logging.exception("Unhandled exception")
251     finally:
252       queue.acquire()
253       try:
254         job_id = job.id
255         status = job.CalcStatus()
256       finally:
257         queue.release()
258       logging.debug("Worker %s finished job %s, status = %s",
259                     self.worker_id, job_id, status)
260
261
262 class _JobQueueWorkerPool(workerpool.WorkerPool):
263   def __init__(self, queue):
264     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
265                                               _JobQueueWorker)
266     self.queue = queue
267
268
269 class JobQueue(object):
270   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
271
272   def _RequireOpenQueue(fn):
273     """Decorator for "public" functions.
274
275     This function should be used for all "public" functions. That is, functions
276     usually called from other classes.
277
278     Important: Use this decorator only after utils.LockedMethod!
279
280     Example:
281       @utils.LockedMethod
282       @_RequireOpenQueue
283       def Example(self):
284         pass
285
286     """
287     def wrapper(self, *args, **kwargs):
288       assert self._queue_lock is not None, "Queue should be open"
289       return fn(self, *args, **kwargs)
290     return wrapper
291
292   def __init__(self, context):
293     self.context = context
294     self._memcache = {}
295     self._my_hostname = utils.HostInfo().name
296
297     # Locking
298     self._lock = threading.Lock()
299     self.acquire = self._lock.acquire
300     self.release = self._lock.release
301
302     # Initialize
303     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
304
305     # Read serial file
306     self._last_serial = jstore.ReadSerial()
307     assert self._last_serial is not None, ("Serial file was modified between"
308                                            " check in jstore and here")
309
310     # Get initial list of nodes
311     self._nodes = set(self.context.cfg.GetNodeList())
312
313     # Remove master node
314     try:
315       self._nodes.remove(self._my_hostname)
316     except ValueError:
317       pass
318
319     # TODO: Check consistency across nodes
320
321     # Setup worker pool
322     self._wpool = _JobQueueWorkerPool(self)
323
324     # We need to lock here because WorkerPool.AddTask() may start a job while
325     # we're still doing our work.
326     self.acquire()
327     try:
328       for job in self._GetJobsUnlocked(None):
329         status = job.CalcStatus()
330
331         if status in (constants.JOB_STATUS_QUEUED, ):
332           self._wpool.AddTask(job)
333
334         elif status in (constants.JOB_STATUS_RUNNING, ):
335           logging.warning("Unfinished job %s found: %s", job.id, job)
336           try:
337             for op in job.ops:
338               op.status = constants.OP_STATUS_ERROR
339               op.result = "Unclean master daemon shutdown"
340           finally:
341             self.UpdateJobUnlocked(job)
342     finally:
343       self.release()
344
345   @utils.LockedMethod
346   @_RequireOpenQueue
347   def AddNode(self, node_name):
348     assert node_name != self._my_hostname
349
350     # Clean queue directory on added node
351     rpc.call_jobqueue_purge(node_name)
352
353     # Upload the whole queue excluding archived jobs
354     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
355
356     # Upload current serial file
357     files.append(constants.JOB_QUEUE_SERIAL_FILE)
358
359     for file_name in files:
360       # Read file content
361       fd = open(file_name, "r")
362       try:
363         content = fd.read()
364       finally:
365         fd.close()
366
367       result = rpc.call_jobqueue_update([node_name], file_name, content)
368       if not result[node_name]:
369         logging.error("Failed to upload %s to %s", file_name, node_name)
370
371     self._nodes.add(node_name)
372
373   @utils.LockedMethod
374   @_RequireOpenQueue
375   def RemoveNode(self, node_name):
376     try:
377       # The queue is removed by the "leave node" RPC call.
378       self._nodes.remove(node_name)
379     except KeyError:
380       pass
381
382   def _WriteAndReplicateFileUnlocked(self, file_name, data):
383     """Writes a file locally and then replicates it to all nodes.
384
385     """
386     utils.WriteFile(file_name, data=data)
387
388     failed_nodes = 0
389     result = rpc.call_jobqueue_update(self._nodes, file_name, data)
390     for node in self._nodes:
391       if not result[node]:
392         failed_nodes += 1
393         logging.error("Copy of job queue file to node %s failed", node)
394
395     # TODO: check failed_nodes
396
397   def _RenameFileUnlocked(self, old, new):
398     os.rename(old, new)
399
400     result = rpc.call_jobqueue_rename(self._nodes, old, new)
401     for node in self._nodes:
402       if not result[node]:
403         logging.error("Moving %s to %s failed on %s", old, new, node)
404
405     # TODO: check failed nodes
406
407   def _FormatJobID(self, job_id):
408     if not isinstance(job_id, (int, long)):
409       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
410     if job_id < 0:
411       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
412
413     return str(job_id)
414
415   def _NewSerialUnlocked(self):
416     """Generates a new job identifier.
417
418     Job identifiers are unique during the lifetime of a cluster.
419
420     Returns: A string representing the job identifier.
421
422     """
423     # New number
424     serial = self._last_serial + 1
425
426     # Write to file
427     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
428                                         "%s\n" % serial)
429
430     # Keep it only if we were able to write the file
431     self._last_serial = serial
432
433     return self._FormatJobID(serial)
434
435   @staticmethod
436   def _GetJobPath(job_id):
437     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
438
439   @staticmethod
440   def _GetArchivedJobPath(job_id):
441     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
442
443   @classmethod
444   def _ExtractJobID(cls, name):
445     m = cls._RE_JOB_FILE.match(name)
446     if m:
447       return m.group(1)
448     else:
449       return None
450
451   def _GetJobIDsUnlocked(self, archived=False):
452     """Return all known job IDs.
453
454     If the parameter archived is True, archived jobs IDs will be
455     included. Currently this argument is unused.
456
457     The method only looks at disk because it's a requirement that all
458     jobs are present on disk (so in the _memcache we don't have any
459     extra IDs).
460
461     """
462     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
463     jlist.sort()
464     return jlist
465
466   def _ListJobFiles(self):
467     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
468             if self._RE_JOB_FILE.match(name)]
469
470   def _LoadJobUnlocked(self, job_id):
471     if job_id in self._memcache:
472       logging.debug("Found job %s in memcache", job_id)
473       return self._memcache[job_id]
474
475     filepath = self._GetJobPath(job_id)
476     logging.debug("Loading job from %s", filepath)
477     try:
478       fd = open(filepath, "r")
479     except IOError, err:
480       if err.errno in (errno.ENOENT, ):
481         return None
482       raise
483     try:
484       data = serializer.LoadJson(fd.read())
485     finally:
486       fd.close()
487
488     job = _QueuedJob.Restore(self, data)
489     self._memcache[job_id] = job
490     logging.debug("Added job %s to the cache", job_id)
491     return job
492
493   def _GetJobsUnlocked(self, job_ids):
494     if not job_ids:
495       job_ids = self._GetJobIDsUnlocked()
496
497     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
498
499   @utils.LockedMethod
500   @_RequireOpenQueue
501   def SubmitJob(self, ops):
502     """Create and store a new job.
503
504     This enters the job into our job queue and also puts it on the new
505     queue, in order for it to be picked up by the queue processors.
506
507     @type ops: list
508     @param ops: The list of OpCodes that will become the new job.
509
510     """
511     # Get job identifier
512     job_id = self._NewSerialUnlocked()
513     job = _QueuedJob(self, job_id, ops)
514
515     # Write to disk
516     self.UpdateJobUnlocked(job)
517
518     logging.debug("Added new job %s to the cache", job_id)
519     self._memcache[job_id] = job
520
521     # Add to worker pool
522     self._wpool.AddTask(job)
523
524     return job.id
525
526   @_RequireOpenQueue
527   def UpdateJobUnlocked(self, job):
528     filename = self._GetJobPath(job.id)
529     data = serializer.DumpJson(job.Serialize(), indent=False)
530     logging.debug("Writing job %s to %s", job.id, filename)
531     self._WriteAndReplicateFileUnlocked(filename, data)
532     self._CleanCacheUnlocked([job.id])
533
534     # Notify waiters about potential changes
535     job.change.acquire()
536     try:
537       job.change.notifyAll()
538     finally:
539       job.change.release()
540
541   def _CleanCacheUnlocked(self, exclude):
542     """Clean the memory cache.
543
544     The exceptions argument contains job IDs that should not be
545     cleaned.
546
547     """
548     assert isinstance(exclude, list)
549
550     for job in self._memcache.values():
551       if job.id in exclude:
552         continue
553       if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
554                                   constants.JOB_STATUS_RUNNING):
555         logging.debug("Cleaning job %s from the cache", job.id)
556         try:
557           del self._memcache[job.id]
558         except KeyError:
559           pass
560
561   @_RequireOpenQueue
562   def WaitForJobChanges(self, job_id, fields, previous):
563     logging.debug("Waiting for changes in job %s", job_id)
564
565     while True:
566       self.acquire()
567       try:
568         job = self._LoadJobUnlocked(job_id)
569         if not job:
570           logging.debug("Job %s not found", job_id)
571           new_state = None
572           break
573
574         new_state = self._GetJobInfoUnlocked(job, fields)
575       finally:
576         self.release()
577
578       # Serializing and deserializing data can cause type changes (e.g. from
579       # tuple to list) or precision loss. We're doing it here so that we get
580       # the same modifications as the data received from the client. Without
581       # this, the comparison afterwards might fail without the data being
582       # significantly different.
583       new_state = serializer.LoadJson(serializer.DumpJson(new_state))
584
585       if previous != new_state:
586         break
587
588       job.change.acquire()
589       try:
590         job.change.wait()
591       finally:
592         job.change.release()
593
594     logging.debug("Job %s changed", job_id)
595
596     return new_state
597
598   @utils.LockedMethod
599   @_RequireOpenQueue
600   def CancelJob(self, job_id):
601     """Cancels a job.
602
603     @type job_id: string
604     @param job_id: Job ID of job to be cancelled.
605
606     """
607     logging.debug("Cancelling job %s", job_id)
608
609     job = self._LoadJobUnlocked(job_id)
610     if not job:
611       logging.debug("Job %s not found", job_id)
612       return
613
614     if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
615       logging.debug("Job %s is no longer in the queue", job.id)
616       return
617
618     try:
619       for op in job.ops:
620         op.status = constants.OP_STATUS_ERROR
621         op.result = "Job cancelled by request"
622     finally:
623       self.UpdateJobUnlocked(job)
624
625   @utils.LockedMethod
626   @_RequireOpenQueue
627   def ArchiveJob(self, job_id):
628     """Archives a job.
629
630     @type job_id: string
631     @param job_id: Job ID of job to be archived.
632
633     """
634     logging.debug("Archiving job %s", job_id)
635
636     job = self._LoadJobUnlocked(job_id)
637     if not job:
638       logging.debug("Job %s not found", job_id)
639       return
640
641     if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
642                                 constants.JOB_STATUS_SUCCESS,
643                                 constants.JOB_STATUS_ERROR):
644       logging.debug("Job %s is not yet done", job.id)
645       return
646
647     try:
648       old = self._GetJobPath(job.id)
649       new = self._GetArchivedJobPath(job.id)
650
651       self._RenameFileUnlocked(old, new)
652
653       logging.debug("Successfully archived job %s", job.id)
654     finally:
655       # Cleaning the cache because we don't know what os.rename actually did
656       # and to be on the safe side.
657       self._CleanCacheUnlocked([])
658
659   def _GetJobInfoUnlocked(self, job, fields):
660     row = []
661     for fname in fields:
662       if fname == "id":
663         row.append(job.id)
664       elif fname == "status":
665         row.append(job.CalcStatus())
666       elif fname == "ops":
667         row.append([op.input.__getstate__() for op in job.ops])
668       elif fname == "opresult":
669         row.append([op.result for op in job.ops])
670       elif fname == "opstatus":
671         row.append([op.status for op in job.ops])
672       elif fname == "ticker":
673         ji = job.run_op_index
674         if ji < 0:
675           lmsg = None
676         else:
677           lmsg = job.ops[ji].RetrieveLog(-1)
678           # message might be empty here
679           if lmsg:
680             lmsg = lmsg[0]
681           else:
682             lmsg = None
683         row.append(lmsg)
684       else:
685         raise errors.OpExecError("Invalid job query field '%s'" % fname)
686     return row
687
688   @utils.LockedMethod
689   @_RequireOpenQueue
690   def QueryJobs(self, job_ids, fields):
691     """Returns a list of jobs in queue.
692
693     Args:
694     - job_ids: Sequence of job identifiers or None for all
695     - fields: Names of fields to return
696
697     """
698     jobs = []
699
700     for job in self._GetJobsUnlocked(job_ids):
701       if job is None:
702         jobs.append(None)
703       else:
704         jobs.append(self._GetJobInfoUnlocked(job, fields))
705
706     return jobs
707
708   @utils.LockedMethod
709   @_RequireOpenQueue
710   def Shutdown(self):
711     """Stops the job queue.
712
713     """
714     self._wpool.TerminateWorkers()
715
716     self._queue_lock.Close()
717     self._queue_lock = None