Revision c5f6dcdf

b/Makefile.am
805 805
	src/Ganeti/UDSServer.hs \
806 806
	src/Ganeti/Utils.hs \
807 807
	src/Ganeti/Utils/Atomic.hs \
808
	src/Ganeti/Utils/AsyncWorker.hs \
808 809
	src/Ganeti/VCluster.hs \
809 810
	src/Ganeti/WConfd/ConfigState.hs \
810 811
	src/Ganeti/WConfd/Core.hs \
b/src/Ganeti/Utils/AsyncWorker.hs
1
{-# LANGUAGE FlexibleContexts #-}
2

  
3
{-| Provides a general functionality for workers that run on the background
4
and perform some task when triggered.
5

  
6
Each task can process multiple triggers, if they're coming faster than the
7
tasks are being processed.
8

  
9
Properties:
10

  
11
- If a worked is triggered, it will perform its action eventually.
12
  (i.e. it won't miss a trigger).
13

  
14
- If the worker is busy, the new action will start immediately when it finishes
15
  the current one.
16

  
17
- If the worker is idle, it'll start the action immediately.
18

  
19
- If the caller uses 'triggerAndWait', the call will return just after the
20
  earliest action following the trigger is finished.
21

  
22
- If the worker finishes an action and there are no pending triggers since the
23
  start of the last action, it becomes idle and waits for a new trigger.
24

  
25
-}
26

  
27
{-
28

  
29
Copyright (C) 2014 Google Inc.
30

  
31
This program is free software; you can redistribute it and/or modify
32
it under the terms of the GNU General Public License as published by
33
the Free Software Foundation; either version 2 of the License, or
34
(at your option) any later version.
35

  
36
This program is distributed in the hope that it will be useful, but
37
WITHOUT ANY WARRANTY; without even the implied warranty of
38
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
39
General Public License for more details.
40

  
41
You should have received a copy of the GNU General Public License
42
along with this program; if not, write to the Free Software
43
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
44
02110-1301, USA.
45

  
46
-}
47

  
48
module Ganeti.Utils.AsyncWorker
49
  ( AsyncWorker
50
  , mkAsyncWorker
51
  , trigger
52
  , triggerAndWait
53
  , triggerAndWaitMany
54
  ) where
55

  
56
import Control.Monad
57
import Control.Monad.Base
58
import Control.Monad.Trans.Control
59
import Control.Concurrent (ThreadId)
60
import Control.Concurrent.Lifted (fork)
61
import Control.Concurrent.MVar.Lifted
62
import Data.Functor.Identity
63
import qualified Data.Traversable as T
64
import Data.IORef.Lifted
65

  
66
-- Represents the state of the requests to the worker. The worker is either
67
-- 'Idle', or has 'Pending' triggers to process. After the corresponding
68
-- action is run, all the 'MVar's in the list are notified with the result.
69
-- Note that the action needs to be run even if the list is empty, as it
70
-- means that there are pending requests, only nobody needs to be notified of
71
-- their results.
72
data TriggerState a
73
  = Idle
74
  | Pending [MVar a]
75

  
76

  
77
-- | Adds a new trigger to the current state (therefore the result is always
78
-- 'Pending'), optionally adding a 'MVar' that will receive the output.
79
addTrigger :: Maybe (MVar a) -> TriggerState a -> TriggerState a
80
addTrigger mmvar state = let rs = recipients state
81
                         in Pending $ maybe rs (: rs) mmvar
82
  where
83
    recipients Idle         = []
84
    recipients (Pending rs) = rs
85

  
86
-- | Represent an asynchronous worker whose single action execution returns a
87
-- value of type @a@.
88
data AsyncWorker a
89
    = AsyncWorker ThreadId (IORef (TriggerState a)) (MVar ())
90

  
91
-- | Given an action, construct an 'AsyncWorker'.
92
mkAsyncWorker :: (MonadBaseControl IO m) => m a -> m (AsyncWorker a)
93
mkAsyncWorker act = do
94
    trig <- newMVar ()
95
    ref <- newIORef Idle
96
    thId <- fork . forever $ do
97
        takeMVar trig           -- wait for a trigger
98
        state <- swap ref Idle  -- check the state of pending requests
99
        -- if there are pending requests, run the action and send them results
100
        case state of
101
          Idle        -> return () -- all trigers have been processed, we've
102
                                   -- been woken up by a trigger that has been
103
                                   -- already included in the last run
104
          Pending rs  -> act >>= forM_ rs . flip tryPutMVar
105
    return $ AsyncWorker thId ref trig
106
  where
107
    swap :: (MonadBase IO m) => IORef a -> a -> m a
108
    swap ref x = atomicModifyIORef ref ((,) x)
109

  
110
-- An internal function for triggering a worker, optionally registering
111
-- a callback 'MVar'
112
triggerInternal :: (MonadBase IO m)
113
                => Maybe (MVar a) -> AsyncWorker a -> m ()
114
triggerInternal mmvar (AsyncWorker _ ref trig) = do
115
    atomicModifyIORef ref (\ts -> (addTrigger mmvar ts, ()))
116
    _ <- tryPutMVar trig ()
117
    return ()
118

  
119
-- | Trigger a worker, letting it run its action asynchronously, but do not
120
-- wait for the result.
121
trigger :: (MonadBase IO m) => AsyncWorker a -> m ()
122
trigger = triggerInternal Nothing
123

  
124
-- | Trigger a list of workers and wait until all the actions following these
125
-- triggers finish. Returns the results of the actions.
126
--
127
-- Note that there is a significant difference between 'triggerAndWaitMany'
128
-- and @mapM triggerAndWait@. The latter runs all the actions of the workers
129
-- sequentially, while the former runs them in parallel.
130
triggerAndWaitMany :: (T.Traversable t, MonadBase IO m)
131
                   => t (AsyncWorker a) -> m (t a)
132
triggerAndWaitMany workers =
133
    let trig w = do
134
                  result <- newEmptyMVar
135
                  triggerInternal (Just result) w
136
                  return result
137
    in T.mapM trig workers >>= T.mapM takeMVar
138

  
139
-- | Trigger a worker and wait until the action following this trigger
140
-- finishes. Return the result of the action.
141
triggerAndWait :: (MonadBase IO m) => AsyncWorker a -> m a
142
triggerAndWait = liftM runIdentity . triggerAndWaitMany . Identity

Also available in: Unified diff