root / src / Ganeti / Curl / Multi.hs @ ffc18bb2
History | View | Annotate | Download (7.7 kB)
1 |
{-# LANGUAGE ForeignFunctionInterface, EmptyDataDecls #-} |
---|---|
2 |
|
3 |
{-| Ganeti-specific implementation of the Curl multi interface |
4 |
(<http://curl.haxx.se/libcurl/c/libcurl-multi.html>). |
5 |
|
6 |
TODO: Evaluate implementing and switching to |
7 |
curl_multi_socket_action(3) interface, which is deemed to be more |
8 |
performant for high-numbers of connections (but this is not the case |
9 |
for Ganeti). |
10 |
|
11 |
-} |
12 |
|
13 |
{- |
14 |
|
15 |
Copyright (C) 2013 Google Inc. |
16 |
|
17 |
This program is free software; you can redistribute it and/or modify |
18 |
it under the terms of the GNU General Public License as published by |
19 |
the Free Software Foundation; either version 2 of the License, or |
20 |
(at your option) any later version. |
21 |
|
22 |
This program is distributed in the hope that it will be useful, but |
23 |
WITHOUT ANY WARRANTY; without even the implied warranty of |
24 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
25 |
General Public License for more details. |
26 |
|
27 |
You should have received a copy of the GNU General Public License |
28 |
along with this program; if not, write to the Free Software |
29 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
30 |
02110-1301, USA. |
31 |
|
32 |
-} |
33 |
|
34 |
module Ganeti.Curl.Multi where |
35 |
|
36 |
import Control.Concurrent |
37 |
import Control.Monad |
38 |
import Data.IORef |
39 |
import qualified Data.Map as Map |
40 |
import Foreign.C.String |
41 |
import Foreign.C.Types |
42 |
import Foreign.Marshal |
43 |
import Foreign.Ptr |
44 |
import Foreign.Storable |
45 |
import Network.Curl |
46 |
|
47 |
import Ganeti.Curl.Internal |
48 |
import Ganeti.Logging |
49 |
|
50 |
-- * Data types |
51 |
|
52 |
-- | Empty data type denoting a Curl multi handle. Naming is similar to |
53 |
-- "Network.Curl" types. |
54 |
data CurlM_ |
55 |
|
56 |
-- | Type alias for a pointer to a Curl multi handle. |
57 |
type CurlMH = Ptr CurlM_ |
58 |
|
59 |
-- | Our type alias for maps indexing 'CurlH' handles to the 'IORef' |
60 |
-- for the Curl code. |
61 |
type HandleMap = Map.Map CurlH (IORef CurlCode) |
62 |
|
63 |
-- * FFI declarations |
64 |
|
65 |
foreign import ccall |
66 |
"curl_multi_init" curl_multi_init :: IO CurlMH |
67 |
|
68 |
foreign import ccall |
69 |
"curl_multi_cleanup" curl_multi_cleanup :: CurlMH -> IO CInt |
70 |
|
71 |
foreign import ccall |
72 |
"curl_multi_add_handle" curl_multi_add_handle :: CurlMH -> CurlH -> IO CInt |
73 |
|
74 |
foreign import ccall |
75 |
"curl_multi_remove_handle" curl_multi_remove_handle :: CurlMH -> CurlH -> |
76 |
IO CInt |
77 |
|
78 |
foreign import ccall |
79 |
"curl_multi_perform" curl_multi_perform :: CurlMH -> Ptr CInt -> IO CInt |
80 |
|
81 |
foreign import ccall |
82 |
"curl_multi_info_read" curl_multi_info_read :: CurlMH -> Ptr CInt |
83 |
-> IO (Ptr CurlMsg) |
84 |
|
85 |
-- * Wrappers over FFI functions |
86 |
|
87 |
-- | Adds an easy handle to a multi handle. This is a nicer wrapper |
88 |
-- over 'curl_multi_add_handle' that fails for wrong codes. |
89 |
curlMultiAddHandle :: CurlMH -> Curl -> IO () |
90 |
curlMultiAddHandle multi easy = do |
91 |
r <- curlPrim easy $ \_ x -> curl_multi_add_handle multi x |
92 |
when (toMCode r /= CurlmOK) . |
93 |
fail $ "Failed adding easy handle to multi handle: " ++ show r |
94 |
|
95 |
-- | Nice wrapper over 'curl_multi_info_read' that massages the |
96 |
-- results into Haskell types. |
97 |
curlMultiInfoRead :: CurlMH -> IO (Maybe CurlMsg, CInt) |
98 |
curlMultiInfoRead multi = |
99 |
alloca $ \ppending -> do |
100 |
pmsg <- curl_multi_info_read multi ppending |
101 |
pending <- peek ppending |
102 |
msg <- if pmsg == nullPtr |
103 |
then return Nothing |
104 |
else Just `fmap` peek pmsg |
105 |
return (msg, pending) |
106 |
|
107 |
-- | Nice wrapper over 'curl_multi_perform'. |
108 |
curlMultiPerform :: CurlMH -> IO (CurlMCode, CInt) |
109 |
curlMultiPerform multi = |
110 |
alloca $ \running -> do |
111 |
mcode <- curl_multi_perform multi running |
112 |
running' <- peek running |
113 |
return (toMCode mcode, running') |
114 |
|
115 |
-- * Helper functions |
116 |
|
117 |
-- | Magical constant for the polling delay. This needs to be chosen such that: |
118 |
-- |
119 |
-- * we don't poll too often; a slower poll allows the RTS to schedule |
120 |
-- other threads, and let them work |
121 |
-- |
122 |
-- * we don't want to pool too slow, so that Curl gets to act on the |
123 |
-- handles that need it |
124 |
pollDelayInterval :: Int |
125 |
pollDelayInterval = 10000 |
126 |
|
127 |
-- | Writes incoming curl data to a list of strings, stored in an 'IORef'. |
128 |
writeHandle :: IORef [String] -> Ptr CChar -> CInt -> CInt -> Ptr () -> IO CInt |
129 |
writeHandle bufref cstr sz nelems _ = do |
130 |
let full_sz = sz * nelems |
131 |
hs_str <- peekCStringLen (cstr, fromIntegral full_sz) |
132 |
modifyIORef bufref (hs_str:) |
133 |
return full_sz |
134 |
|
135 |
-- | Loops and extracts all pending messages from a Curl multi handle. |
136 |
readMessages :: CurlMH -> HandleMap -> IO () |
137 |
readMessages mh hmap = do |
138 |
(cmsg, pending) <- curlMultiInfoRead mh |
139 |
case cmsg of |
140 |
Nothing -> return () |
141 |
Just (CurlMsg msg eh res) -> do |
142 |
logDebug $ "Got msg! msg " ++ show msg ++ " res " ++ show res ++ |
143 |
", " ++ show pending ++ " messages left" |
144 |
let cref = (Map.!) hmap eh |
145 |
writeIORef cref res |
146 |
_ <- curl_multi_remove_handle mh eh |
147 |
when (pending > 0) $ readMessages mh hmap |
148 |
|
149 |
-- | Loops and polls curl until there are no more remaining handles. |
150 |
performMulti :: CurlMH -> HandleMap -> CInt -> IO () |
151 |
performMulti mh hmap expected = do |
152 |
(mcode, running) <- curlMultiPerform mh |
153 |
delay <- case mcode of |
154 |
CurlmCallMultiPerform -> return $ return () |
155 |
CurlmOK -> return $ threadDelay pollDelayInterval |
156 |
code -> error $ "Received bad return code from" ++ |
157 |
"'curl_multi_perform': " ++ show code |
158 |
logDebug $ "mcode: " ++ show mcode ++ ", remaining: " ++ show running |
159 |
-- check if any handles are done and then retrieve their messages |
160 |
when (expected /= running) $ readMessages mh hmap |
161 |
-- and if we still have handles running, loop |
162 |
when (running > 0) $ delay >> performMulti mh hmap running |
163 |
|
164 |
-- | Template for the Curl error buffer. |
165 |
errorBuffer :: String |
166 |
errorBuffer = replicate errorBufferSize '\0' |
167 |
|
168 |
-- | Allocate a NULL-initialised error buffer. |
169 |
mallocErrorBuffer :: IO CString |
170 |
mallocErrorBuffer = fst `fmap` newCStringLen errorBuffer |
171 |
|
172 |
-- | Initialise a curl handle. This is just a wrapper over the |
173 |
-- "Network.Curl" function 'initialize', plus adding our options. |
174 |
makeEasyHandle :: (IORef [String], Ptr CChar, ([CurlOption], URLString)) |
175 |
-> IO Curl |
176 |
makeEasyHandle (f, e, (opts, url)) = do |
177 |
h <- initialize |
178 |
setopts h opts |
179 |
setopts h [ CurlWriteFunction (writeHandle f) |
180 |
, CurlErrorBuffer e |
181 |
, CurlURL url |
182 |
, CurlFailOnError True |
183 |
, CurlNoSignal True |
184 |
, CurlProxy "" |
185 |
] |
186 |
return h |
187 |
|
188 |
-- * Main multi-call work function |
189 |
|
190 |
-- | Perform a multi-call against a list of nodes. |
191 |
execMultiCall :: [([CurlOption], String)] -> IO [(CurlCode, String)] |
192 |
execMultiCall ous = do |
193 |
-- error buffers |
194 |
errorbufs <- mapM (const mallocErrorBuffer) ous |
195 |
-- result buffers |
196 |
outbufs <- mapM (\_ -> newIORef []) ous |
197 |
-- handles |
198 |
ehandles <- mapM makeEasyHandle $ zip3 outbufs errorbufs ous |
199 |
-- data.map holding handles to error code iorefs |
200 |
hmap <- foldM (\m h -> curlPrim h (\_ hnd -> do |
201 |
ccode <- newIORef CurlOK |
202 |
return $ Map.insert hnd ccode m |
203 |
)) Map.empty ehandles |
204 |
mh <- curl_multi_init |
205 |
mapM_ (curlMultiAddHandle mh) ehandles |
206 |
performMulti mh hmap (fromIntegral $ length ehandles) |
207 |
-- dummy code to keep the handles alive until here |
208 |
mapM_ (\h -> curlPrim h (\_ _ -> return ())) ehandles |
209 |
-- cleanup the multi handle |
210 |
mh_cleanup <- toMCode `fmap` curl_multi_cleanup mh |
211 |
when (mh_cleanup /= CurlmOK) . |
212 |
logError $ "Non-OK return from multi_cleanup: " ++ show mh_cleanup |
213 |
-- and now extract the data from the IORefs |
214 |
mapM (\(e, b, h) -> do |
215 |
s <- peekCString e |
216 |
free e |
217 |
cref <- curlPrim h (\_ hnd -> return $ (Map.!) hmap hnd) |
218 |
ccode <- readIORef cref |
219 |
result <- if ccode == CurlOK |
220 |
then (concat . reverse) `fmap` readIORef b |
221 |
else return s |
222 |
return (ccode, result) |
223 |
) $ zip3 errorbufs outbufs ehandles |