Statistics
| Branch: | Tag: | Revision:

root / src / gr / ebs / gss / server / ejb / indexer / IndexerMDBean.java @ 139f2dd5

History | View | Annotate | Download (11.2 kB)

1
/*
2
 * Copyright 2007, 2008, 2009 Electronic Business Systems Ltd.
3
 *
4
 * This file is part of GSS.
5
 *
6
 * GSS is free software: you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation, either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * GSS is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with GSS.  If not, see <http://www.gnu.org/licenses/>.
18
 */
19
package gr.ebs.gss.server.ejb.indexer;
20

    
21
import static gr.ebs.gss.server.configuration.GSSConfigurationFactory.getConfiguration;
22
import gr.ebs.gss.client.exceptions.ObjectNotFoundException;
23
import gr.ebs.gss.server.domain.FileBody;
24
import gr.ebs.gss.server.domain.FileHeader;
25
import gr.ebs.gss.server.domain.FileTag;
26
import gr.ebs.gss.server.ejb.GSSDAO;
27

    
28
import java.io.File;
29
import java.io.IOException;
30
import java.io.StringWriter;
31
import java.io.UnsupportedEncodingException;
32
import java.util.ArrayList;
33
import java.util.List;
34
import java.util.StringTokenizer;
35

    
36
import javax.ejb.ActivationConfigProperty;
37
import javax.ejb.EJB;
38
import javax.ejb.EJBException;
39
import javax.ejb.MessageDriven;
40
import javax.ejb.TransactionAttribute;
41
import javax.ejb.TransactionAttributeType;
42
import javax.jms.JMSException;
43
import javax.jms.MapMessage;
44
import javax.jms.Message;
45
import javax.jms.MessageListener;
46
import javax.xml.parsers.DocumentBuilder;
47
import javax.xml.parsers.DocumentBuilderFactory;
48
import javax.xml.parsers.ParserConfigurationException;
49
import javax.xml.transform.OutputKeys;
50
import javax.xml.transform.Transformer;
51
import javax.xml.transform.TransformerConfigurationException;
52
import javax.xml.transform.TransformerException;
53
import javax.xml.transform.TransformerFactory;
54
import javax.xml.transform.dom.DOMSource;
55
import javax.xml.transform.stream.StreamResult;
56

    
57
import org.apache.commons.httpclient.HttpClient;
58
import org.apache.commons.httpclient.HttpException;
59
import org.apache.commons.httpclient.methods.PostMethod;
60
import org.apache.commons.httpclient.methods.StringRequestEntity;
61
import org.apache.commons.httpclient.methods.multipart.FilePart;
62
import org.apache.commons.httpclient.methods.multipart.MultipartRequestEntity;
63
import org.apache.commons.httpclient.methods.multipart.Part;
64
import org.apache.commons.httpclient.methods.multipart.StringPart;
65
import org.apache.commons.logging.Log;
66
import org.apache.commons.logging.LogFactory;
67
import org.jboss.ejb3.annotation.ResourceAdapter;
68
import org.w3c.dom.Document;
69
import org.w3c.dom.Element;
70
import org.w3c.dom.Node;
71

    
72
/**
73
 * Message driven bean that accepts messages whenever a document is created,
74
 * modified or deleted and adds/removes the item from the search index.
75
 */
76
@MessageDriven(activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
77
                                                                        @ActivationConfigProperty(propertyName="destination", propertyValue="queue/gss-indexingQueue")})
78
@ResourceAdapter("hornetq-ra.rar")
79
public class IndexerMDBean implements MessageListener {
80
        /**
81
         * The logger
82
         */
83
        private static final Log logger = LogFactory.getLog(IndexerMDBean.class);
84

    
85
        /**
86
         * EJB offering access to the JPA entity manager
87
         */
88
        @EJB GSSDAO dao;
89

    
90
        /**
91
         * Decides to add or drop an item from the index depending on the message
92
         * received
93
         *
94
         * It currently uses the patched solr API for rich documents. This API does not
95
         * allow indexing time field boosting. For this reason we have to use the dismax search API (instead of the
96
         * standard) that allows for search time field boosting
97
         *
98
         * @param msg
99
         * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
100
         */
101
        @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
102
        public void onMessage(Message msg) {
103
                PostMethod method = null;
104
                try {
105
                        MapMessage map = (MapMessage) msg;
106
                        Long id = (Long) map.getObject("id");
107
                        boolean delete = map.getBoolean("delete");
108
                        HttpClient httpClient = new HttpClient();
109
                        if (delete) {
110
                                method = new PostMethod(getConfiguration().getString("solrUpdateUrl"));
111
                                String deleteXMLMsg = "<delete><id>" + id.toString() + "</id></delete>";
112
                                if (logger.isDebugEnabled())
113
                                        logger.debug(deleteXMLMsg);
114
                                method.setRequestEntity(new StringRequestEntity(deleteXMLMsg, "text/xml", "iso8859-1"));
115
                                int statusCode = httpClient.executeMethod(method);
116
                                if (logger.isDebugEnabled())
117
                                        logger.debug("HTTP status: " + statusCode);
118
                                String response = method.getResponseBodyAsString();
119
                                if (logger.isDebugEnabled())
120
                                        logger.debug(response);
121

    
122
                                method.releaseConnection();
123
                                if (statusCode != 200)
124
                                        throw new EJBException("Response from Solr for deleting id " + id.toString() + " had status: " + statusCode);
125
                                sendCommit(httpClient, 0);
126
                        } else {
127
                                FileHeader file = dao.getFileForIndexing(id);
128
                                FileBody body = file.getCurrentBody();
129
                                String type = null;
130
                                String mime = body.getMimeType();
131
                                boolean nofile = false;
132
                                if (body.getFileSize() > getConfiguration().getLong("solrDocumentUploadLimitInKB") * 1024)
133
                                        nofile = true;
134
                                else if (mime.equals("application/pdf"))
135
                                        type = "pdf";
136
                                else if (mime.equals("text/plain"))
137
                                        type = "text";
138
                                else if (mime.equals("text/html"))
139
                                        type = "html";
140
                                else if (mime.endsWith("msword"))
141
                                        type = "doc";
142
                                else if (mime.endsWith("ms-excel"))
143
                                        type = "xls";
144
                                else if (mime.endsWith("powerpoint"))
145
                                        type = "ppt";
146
                                else
147
                                        nofile = true;
148
                                if (!nofile) {
149
                                        method = new PostMethod(getConfiguration().getString("solrUpdateRichUrl"));
150
                                        List<Part> parts = new ArrayList<Part>();
151
                                        parts.add(new StringPart("stream.type", type));
152
                                        StringBuffer fieldnames = new StringBuffer("id,name");
153
                                        if (!file.getFileTags().isEmpty())
154
                                                fieldnames.append(",tag");
155
                                        parts.add(new StringPart("fieldnames", fieldnames.toString()));
156
                                        parts.add(new StringPart("id", id.toString()));
157
                                        parts.add(new StringPart("name", tokenizeFilename(file.getName()), "UTF-8"));
158
                                        for (FileTag tag : file.getFileTags())
159
                                                parts.add(new StringPart("tag", tag.getTag(), "UTF-8"));
160
                                        parts.add(new StringPart("stream.fieldname", "body"));
161
                                        parts.add(new StringPart("commit", "true"));
162
                                        parts.add(new FilePart(file.getName(), new File(body.getStoredFilePath())));
163
                                        method.setRequestEntity(new MultipartRequestEntity(parts.toArray(new Part[1]), method.getParams()));
164
                                        httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(5000);
165
                                        if (logger.isDebugEnabled())
166
                                                logger.debug("Sending rich document " + id.toString());
167
                                        int statusCode = httpClient.executeMethod(method);
168
                                        if (logger.isDebugEnabled())
169
                                                logger.debug("HTTP status: " + statusCode);
170
                                        String response = method.getResponseBodyAsString();
171
                                        if (logger.isDebugEnabled())
172
                                                logger.debug(response);
173
                                        if (statusCode != 200)
174
                                                throw new EJBException("Response from Solr for updating id " + id.toString() + " had status: " + statusCode);
175
                                } else {
176
                                        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
177
                                        DocumentBuilder db = dbf.newDocumentBuilder();
178
                                        Document doc = db.newDocument();
179
                                        Node root = doc.createElement("add");
180
                                        doc.appendChild(root);
181
                                        Node docNode = doc.createElement("doc");
182
                                        root.appendChild(docNode);
183
                                        Element field = doc.createElement("field");
184
                                        field.setAttribute("name", "id");
185
                                        docNode.appendChild(field);
186
                                        field.appendChild(doc.createTextNode(id.toString()));
187

    
188
                                        field = doc.createElement("field");
189
                                        field.setAttribute("name", "name");
190
                                        docNode.appendChild(field);
191
                                        field.appendChild(doc.createTextNode(tokenizeFilename(file.getName())));
192

    
193
                                        for (FileTag tag : file.getFileTags()) {
194
                                                field = doc.createElement("field");
195
                                                field.setAttribute("name", "tag");
196
                                                docNode.appendChild(field);
197
                                                field.appendChild(doc.createTextNode(tag.getTag()));
198
                                        }
199

    
200
                                        TransformerFactory fact = TransformerFactory.newInstance();
201
                                        Transformer trans = fact.newTransformer();
202
                                        trans.setOutputProperty(OutputKeys.INDENT, "yes");
203
                                        StringWriter sw = new StringWriter();
204
                                        StreamResult sr = new StreamResult(sw);
205
                                        DOMSource source = new DOMSource(doc);
206
                                        trans.transform(source, sr);
207
                                        if (logger.isDebugEnabled())
208
                                                logger.debug(sw.toString());
209

    
210
                                        method = new PostMethod(getConfiguration().getString("solrUpdateUrl"));
211
                                        method.setRequestEntity(new StringRequestEntity(sw.toString(),"text/xml", "UTF-8"));
212
                                        int statusCode = httpClient.executeMethod(method);
213
                                        if (logger.isDebugEnabled())
214
                                                logger.debug("HTTP status: " + statusCode);
215
                                        String response = method.getResponseBodyAsString();
216
                                        if (logger.isDebugEnabled())
217
                                                logger.debug(response);
218

    
219
                                        method.releaseConnection();
220
                                        if (statusCode != 200)
221
                                                throw new EJBException("Response from Solr for updating id " + id.toString() + " had status: " + statusCode);
222

    
223
                                        sendCommit(httpClient, 0);
224
                                }
225
                        }
226
                }
227
                catch (JMSException e) {
228
                        throw new EJBException(e);
229
                } catch (UnsupportedEncodingException e) {
230
                        throw new EJBException(e);
231
                } catch (HttpException e) {
232
                        throw new EJBException(e);
233
                } catch (IOException e) {
234
                        throw new EJBException(e);
235
                } catch (ObjectNotFoundException e) {
236
                        logger.warn("File not found. Indexing aborted: ", e);
237
                } catch (ParserConfigurationException e) {
238
                        throw new EJBException(e);
239
                } catch (TransformerConfigurationException e) {
240
                        throw new EJBException(e);
241
                } catch (TransformerException e) {
242
                        throw new EJBException(e);
243
                }
244
                finally {
245
                        if (method != null)
246
                                method.releaseConnection();
247
                }
248
        }
249

    
250
        /**
251
         * Sends a commit message to the solr server
252
         *
253
         * @param httpClient
254
         * @param retryCount If the commit fails, it is retried three times. This parameter is passed in the recursive
255
         *                                         calls to stop the recursion
256
         * @throws UnsupportedEncodingException
257
         * @throws IOException
258
         * @throws HttpException
259
         */
260
        private void sendCommit(HttpClient httpClient, int retryCount) throws UnsupportedEncodingException, IOException, HttpException {
261
                PostMethod method = null;
262
                try {
263
                        if (logger.isDebugEnabled())
264
                                logger.debug("Commit retry: " + retryCount);
265
                        method = new PostMethod(getConfiguration().getString("solrUpdateUrl"));
266
                        method.setRequestEntity(new StringRequestEntity("<commit/>", "text/xml", "iso8859-1"));
267
                        int statusCode = httpClient.executeMethod(method);
268
                        if (logger.isDebugEnabled())
269
                                logger.debug("HTTP status: " + statusCode);
270
                        String response = method.getResponseBodyAsString();
271
                        if (logger.isDebugEnabled())
272
                                logger.debug(response);
273
                        if (statusCode != 200 && retryCount < 2) {
274
                                try {
275
                                        Thread.sleep(10000); // Give Solr a little time to be available.
276
                                } catch (InterruptedException e) {
277
                                }
278
                                sendCommit(httpClient, retryCount + 1);
279
                        }
280
                }
281
                finally {
282
                        if (method != null)
283
                                method.releaseConnection();
284
                }
285
        }
286

    
287
        private String tokenizeFilename(String filename){
288
                StringBuffer result = new StringBuffer();
289
                StringTokenizer tokenizer = new StringTokenizer(filename,"._");
290
                while(tokenizer.hasMoreTokens()){
291
                        result.append(tokenizer.nextToken());
292
                        result.append(" ");
293
                }
294
                result.append(filename);
295
                return result.toString();
296
        }
297
}