Statistics
| Branch: | Revision:

root / extensions / httpnio / src / main / java / org / jclouds / http / pool / ConnectionPoolTransformingHttpCommandExecutorService.java @ 35e7942d

History | View | Annotate | Download (8.1 kB)

1
/**
2
 *
3
 * Copyright (C) 2009 Cloud Conscious, LLC. <info@cloudconscious.com>
4
 *
5
 * ====================================================================
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 * ====================================================================
18
 */
19
package org.jclouds.http.pool;
20

    
21
import static com.google.common.base.Preconditions.checkArgument;
22
import static org.jclouds.concurrent.ConcurrentUtils.makeListenable;
23

    
24
import java.net.URI;
25
import java.util.concurrent.BlockingQueue;
26
import java.util.concurrent.Callable;
27
import java.util.concurrent.CancellationException;
28
import java.util.concurrent.ConcurrentMap;
29
import java.util.concurrent.ExecutorService;
30
import java.util.concurrent.SynchronousQueue;
31
import java.util.concurrent.TimeUnit;
32
import java.util.concurrent.TimeoutException;
33

    
34
import javax.inject.Inject;
35

    
36
import org.jclouds.http.HttpCommand;
37
import org.jclouds.http.HttpCommandRendezvous;
38
import org.jclouds.http.HttpResponse;
39
import org.jclouds.http.TransformingHttpCommandExecutorService;
40
import org.jclouds.lifecycle.BaseLifeCycle;
41

    
42
import com.google.common.base.Function;
43
import com.google.common.base.Throwables;
44
import com.google.common.collect.MapMaker;
45
import com.google.common.util.concurrent.ListenableFuture;
46

    
47
/**
48
 * 
49
 * @author Adrian Cole
50
 */
51
public class ConnectionPoolTransformingHttpCommandExecutorService<C> extends BaseLifeCycle
52
         implements TransformingHttpCommandExecutorService {
53

    
54
   private final ConcurrentMap<URI, HttpCommandConnectionPool<C>> poolMap;
55
   private final BlockingQueue<HttpCommandRendezvous<?>> commandQueue;
56
   private final HttpCommandConnectionPool.Factory<C> poolFactory;
57

    
58
   @Inject
59
   public ConnectionPoolTransformingHttpCommandExecutorService(ExecutorService executor,
60
            HttpCommandConnectionPool.Factory<C> pf,
61
            BlockingQueue<HttpCommandRendezvous<?>> commandQueue) {
62
      super(executor);
63
      this.poolFactory = pf;
64
      // TODO inject this.
65
      poolMap = new MapMaker().makeComputingMap(new Function<URI, HttpCommandConnectionPool<C>>() {
66
         public HttpCommandConnectionPool<C> apply(URI endPoint) {
67
            checkArgument(endPoint.getHost() != null, String.format(
68
                     "endPoint.getHost() is null for %s", endPoint));
69
            try {
70
               HttpCommandConnectionPool<C> pool = poolFactory.create(endPoint);
71
               addDependency(pool);
72
               return pool;
73
            } catch (RuntimeException e) {
74
               logger.error(e, "error creating entry for %s", endPoint);
75
               throw e;
76
            }
77
         }
78
      });
79
      this.commandQueue = commandQueue;
80
   }
81

    
82
   /**
83
    * {@inheritDoc}
84
    * 
85
    * If the reason we are shutting down is due an exception, we set that exception on all pending
86
    * commands. Otherwise, we cancel the pending commands.
87
    */
88
   @Override
89
   protected void doShutdown() {
90
      exception.compareAndSet(null, getExceptionFromDependenciesOrNull());
91
      while (!commandQueue.isEmpty()) {
92
         HttpCommandRendezvous<?> rendezvous = (HttpCommandRendezvous<?>) commandQueue.remove();
93
         if (rendezvous != null) {
94
            try {
95
               if (exception.get() != null)
96
                  rendezvous.setException(exception.get());
97
               else
98
                  rendezvous.setException(new CancellationException("shutdown"));
99
            } catch (InterruptedException e) {
100
               logger.error(e, "Error cancelling command %s", rendezvous.getCommand());
101
            }
102
         }
103
      }
104
   }
105

    
106
   @Override
107
   protected void doWork() throws InterruptedException {
108
      takeACommandOffTheQueueAndInvokeIt();
109
   }
110

    
111
   private void takeACommandOffTheQueueAndInvokeIt() throws InterruptedException {
112
      HttpCommandRendezvous<?> rendezvous = commandQueue.poll(1, TimeUnit.SECONDS);
113
      if (rendezvous != null) {
114
         try {
115
            invoke(rendezvous);
116
         } catch (Exception e) {
117
            Throwables.propagateIfPossible(e, InterruptedException.class);
118
            logger.error(e, "Error processing command %s", rendezvous.getCommand());
119
         }
120
      }
121
   }
122

    
123
   /**
124
    * This is an asynchronous operation that puts the <code>command</code> onto a queue. Later, it
125
    * will be processed via the {@link #invoke(HttpCommandRendezvous) invoke}
126
    * method.
127
    */
128
   public <T> ListenableFuture<T> submit(HttpCommand command,
129
            final Function<HttpResponse, T> responseTransformer) {
130
      exceptionIfNotActive();
131
      final SynchronousQueue<?> channel = new SynchronousQueue<Object>();
132
      // should block and immediately parse the response on exit.
133
      ListenableFuture<T> future = makeListenable(executorService.submit(new Callable<T>() {
134
         public T call() throws Exception {
135
            Object o = channel.take();
136
            if (o instanceof Exception) {
137
               throw (Exception) o;
138
            }
139
            return responseTransformer.apply((HttpResponse) o);
140
         }
141
      }), executorService);
142

    
143
      HttpCommandRendezvous<T> rendezvous = new HttpCommandRendezvous<T>(command, channel, future);
144
      commandQueue.add(rendezvous);
145
      return rendezvous.getListenableFuture();
146
   }
147

    
148
   /**
149
    * Invoke binds a command with a connection from the pool. This binding is called a
150
    * {@link HttpCommandConnectionHandle handle}. The handle will keep this binding until the
151
    * command's response is parsed or an exception is set on the Command object.
152
    * 
153
    * @param command
154
    */
155
   protected void invoke(HttpCommandRendezvous<?> command) {
156
      exceptionIfNotActive();
157

    
158
      URI endpoint = createBaseEndpointFor(command);
159

    
160
      HttpCommandConnectionPool<C> pool = poolMap.get(endpoint);
161
      if (pool == null) {
162
         // TODO limit;
163
         logger.warn("pool not available for command %s; retrying", command.getCommand());
164
         commandQueue.add(command);
165
         return;
166
      }
167

    
168
      HttpCommandConnectionHandle<C> connectionHandle = null;
169
      try {
170
         connectionHandle = pool.getHandle(command);
171
      } catch (InterruptedException e) {
172
         logger.warn(e, "Interrupted getting a connection for command %s; retrying", command
173
                  .getCommand());
174
         commandQueue.add(command);
175
         return;
176
      } catch (TimeoutException e) {
177
         logger.warn(e, "Timeout getting a connection for command %s on pool %s; retrying", command
178
                  .getCommand(), pool);
179
         commandQueue.add(command);
180
         return;
181
      } catch (RuntimeException e) {
182
         logger.warn(e, "Error getting a connection for command %s on pool %s; retrying", command
183
                  .getCommand(), pool);
184
         discardPool(endpoint, pool);
185
         commandQueue.add(command);
186
         return;
187
      }
188

    
189
      if (connectionHandle == null) {
190
         logger.error("Failed to obtain connection for command %s; retrying", command.getCommand());
191
         commandQueue.add(command);
192
         return;
193
      }
194
      connectionHandle.startConnection();
195
   }
196

    
197
   private void discardPool(URI endpoint, HttpCommandConnectionPool<C> pool) {
198
      poolMap.remove(endpoint, pool);
199
      pool.shutdown();
200
      this.dependencies.remove(pool);
201
   }
202

    
203
   /**
204
    * keys to the map are only used for socket information, not path. In this case, you should
205
    * remove any path or query details from the URI.
206
    */
207
   private URI createBaseEndpointFor(HttpCommandRendezvous<?> command) {
208
      URI endpoint = command.getCommand().getRequest().getEndpoint();
209
      if (endpoint.getPort() == -1) {
210
         return URI.create(String.format("%s://%s", endpoint.getScheme(), endpoint.getHost()));
211
      } else {
212
         return URI.create(String.format("%s://%s:%d", endpoint.getScheme(), endpoint.getHost(),
213
                  endpoint.getPort()));
214
      }
215
   }
216

    
217
}