Fixed bug: When copying a versioned file, destination versioned flag wasn't set,...
[pithos] / src / gr / ebs / gss / server / ejb / indexer / IndexerMDBean.java
1 /*
2  * Copyright 2007, 2008, 2009 Electronic Business Systems Ltd.
3  *
4  * This file is part of GSS.
5  *
6  * GSS is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * GSS is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with GSS.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 package gr.ebs.gss.server.ejb.indexer;
20
21 import static gr.ebs.gss.server.configuration.GSSConfigurationFactory.getConfiguration;
22 import gr.ebs.gss.client.exceptions.ObjectNotFoundException;
23 import gr.ebs.gss.server.domain.FileBody;
24 import gr.ebs.gss.server.domain.FileHeader;
25 import gr.ebs.gss.server.domain.FileTag;
26 import gr.ebs.gss.server.ejb.GSSDAO;
27
28 import java.io.File;
29 import java.io.IOException;
30 import java.io.StringWriter;
31 import java.io.UnsupportedEncodingException;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.StringTokenizer;
35
36 import javax.ejb.ActivationConfigProperty;
37 import javax.ejb.EJB;
38 import javax.ejb.EJBException;
39 import javax.ejb.MessageDriven;
40 import javax.ejb.TransactionAttribute;
41 import javax.ejb.TransactionAttributeType;
42 import javax.jms.JMSException;
43 import javax.jms.MapMessage;
44 import javax.jms.Message;
45 import javax.jms.MessageListener;
46 import javax.xml.parsers.DocumentBuilder;
47 import javax.xml.parsers.DocumentBuilderFactory;
48 import javax.xml.parsers.ParserConfigurationException;
49 import javax.xml.transform.OutputKeys;
50 import javax.xml.transform.Transformer;
51 import javax.xml.transform.TransformerConfigurationException;
52 import javax.xml.transform.TransformerException;
53 import javax.xml.transform.TransformerFactory;
54 import javax.xml.transform.dom.DOMSource;
55 import javax.xml.transform.stream.StreamResult;
56
57 import org.apache.commons.httpclient.HttpClient;
58 import org.apache.commons.httpclient.HttpException;
59 import org.apache.commons.httpclient.methods.PostMethod;
60 import org.apache.commons.httpclient.methods.StringRequestEntity;
61 import org.apache.commons.httpclient.methods.multipart.FilePart;
62 import org.apache.commons.httpclient.methods.multipart.MultipartRequestEntity;
63 import org.apache.commons.httpclient.methods.multipart.Part;
64 import org.apache.commons.httpclient.methods.multipart.StringPart;
65 import org.apache.commons.logging.Log;
66 import org.apache.commons.logging.LogFactory;
67 import org.w3c.dom.Document;
68 import org.w3c.dom.Element;
69 import org.w3c.dom.Node;
70
71 /**
72  * Message driven bean that accepts messages whenever a document is created,
73  * modified or deleted and adds/removes the item from the search index.
74  */
75 @MessageDriven(activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
76                                                                         @ActivationConfigProperty(propertyName="destination", propertyValue="queue/gss-indexingQueue")})
77 public class IndexerMDBean implements MessageListener {
78         /**
79          * The logger
80          */
81         private static final Log logger = LogFactory.getLog(IndexerMDBean.class);
82
83         /**
84          * EJB offering access to the JPA entity manager
85          */
86         @EJB GSSDAO dao;
87
88         /**
89          * Decides to add or drop an item from the index depending on the message
90          * received
91          *
92          * It currently uses the patched solr API for rich documents. This API does not
93          * allow indexing time field boosting. For this reason we have to use the dismax search API (instead of the
94          * standard) that allows for search time field boosting
95          *
96          * @param msg
97          * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
98          */
99         @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
100         public void onMessage(Message msg) {
101                 PostMethod method = null;
102                 try {
103                         MapMessage map = (MapMessage) msg;
104                         Long id = (Long) map.getObject("id");
105                         boolean delete = map.getBoolean("delete");
106                         HttpClient httpClient = new HttpClient();
107                         if (delete) {
108                                 method = new PostMethod(getConfiguration().getString("solrUpdateUrl"));
109                                 String deleteXMLMsg = "<delete><id>" + id.toString() + "</id></delete>";
110                                 if (logger.isDebugEnabled())
111                                         logger.debug(deleteXMLMsg);
112                                 method.setRequestEntity(new StringRequestEntity(deleteXMLMsg, "text/xml", "iso8859-1"));
113                                 int statusCode = httpClient.executeMethod(method);
114                                 if (logger.isDebugEnabled())
115                                         logger.debug("HTTP status: " + statusCode);
116                                 String response = method.getResponseBodyAsString();
117                                 if (logger.isDebugEnabled())
118                                         logger.debug(response);
119
120                                 method.releaseConnection();
121                                 if (statusCode != 200)
122                                         throw new EJBException("Response from Solr for deleting id " + id.toString() + " had status: " + statusCode);
123                                 sendCommit(httpClient, 0);
124                         } else {
125                                 FileHeader file = dao.getFileForIndexing(id);
126                                 FileBody body = file.getCurrentBody();
127                                 String type = null;
128                                 String mime = body.getMimeType();
129                                 boolean nofile = false;
130                                 if (body.getFileSize() > getConfiguration().getLong("solrDocumentUploadLimitInKB") * 1024)
131                                         nofile = true;
132                                 else if (mime.equals("application/pdf"))
133                                         type = "pdf";
134                                 else if (mime.equals("text/plain"))
135                                         type = "text";
136                                 else if (mime.equals("text/html"))
137                                         type = "html";
138                                 else if (mime.equals("application/msword"))
139                                         type = "doc";
140                                 else if (mime.equals("application/vnd.ms-excel"))
141                                         type = "xls";
142                                 else if (mime.equals("application/vnd.ms-powerpoint"))
143                                         type = "ppt";
144                                 else
145                                         nofile = true;
146                                 if (!nofile) {
147                                         method = new PostMethod(getConfiguration().getString("solrUpdateRichUrl"));
148                                         List<Part> parts = new ArrayList<Part>();
149                                         parts.add(new StringPart("stream.type", type));
150                                         StringBuffer fieldnames = new StringBuffer("id,name");
151                                         if (!file.getFileTags().isEmpty())
152                                                 fieldnames.append(",tag");
153                                         parts.add(new StringPart("fieldnames", fieldnames.toString()));
154                                         parts.add(new StringPart("id", id.toString()));
155                                         parts.add(new StringPart("name", tokenizeFilename(file.getName()), "UTF-8"));
156                                         for (FileTag tag : file.getFileTags())
157                                                 parts.add(new StringPart("tag", tag.getTag()));
158                                         parts.add(new StringPart("stream.fieldname", "body"));
159                                         parts.add(new StringPart("commit", "true"));
160                                         parts.add(new FilePart(file.getName(), new File(body.getStoredFilePath())));
161                                         method.setRequestEntity(new MultipartRequestEntity(parts.toArray(new Part[1]), method.getParams()));
162                                         httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(5000);
163                                         if (logger.isDebugEnabled())
164                                                 logger.debug("Sending rich document " + id.toString());
165                                         int statusCode = httpClient.executeMethod(method);
166                                         if (logger.isDebugEnabled())
167                                                 logger.debug("HTTP status: " + statusCode);
168                                         String response = method.getResponseBodyAsString();
169                                         if (logger.isDebugEnabled())
170                                                 logger.debug(response);
171                                         if (statusCode != 200)
172                                                 throw new EJBException("Response from Solr for updatind id " + id.toString() + " had status: " + statusCode);
173                                 } else {
174                                         DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
175                                         DocumentBuilder db = dbf.newDocumentBuilder();
176                                         Document doc = db.newDocument();
177                                         Node root = doc.createElement("add");
178                                         doc.appendChild(root);
179                                         Node docNode = doc.createElement("doc");
180                                         root.appendChild(docNode);
181                                         Element field = doc.createElement("field");
182                                         field.setAttribute("name", "id");
183                                         docNode.appendChild(field);
184                                         field.appendChild(doc.createTextNode(id.toString()));
185
186                                         field = doc.createElement("field");
187                                         field.setAttribute("name", "name");
188                                         docNode.appendChild(field);
189                                         field.appendChild(doc.createTextNode(tokenizeFilename(file.getName())));
190
191                                         for (FileTag tag : file.getFileTags()) {
192                                                 field = doc.createElement("field");
193                                                 field.setAttribute("name", "tag");
194                                                 docNode.appendChild(field);
195                                                 field.appendChild(doc.createTextNode(tag.getTag()));
196                                         }
197
198                                         TransformerFactory fact = TransformerFactory.newInstance();
199                                         Transformer trans = fact.newTransformer();
200                                         trans.setOutputProperty(OutputKeys.INDENT, "yes");
201                                         StringWriter sw = new StringWriter();
202                                         StreamResult sr = new StreamResult(sw);
203                                         DOMSource source = new DOMSource(doc);
204                                         trans.transform(source, sr);
205                                         if (logger.isDebugEnabled())
206                                                 logger.debug(sw.toString());
207
208                                         method = new PostMethod(getConfiguration().getString("solrUpdateUrl"));
209                                         method.setRequestEntity(new StringRequestEntity(sw.toString(),"text/xml", "UTF-8"));
210                                         int statusCode = httpClient.executeMethod(method);
211                                         if (logger.isDebugEnabled())
212                                                 logger.debug("HTTP status: " + statusCode);
213                                         String response = method.getResponseBodyAsString();
214                                         if (logger.isDebugEnabled())
215                                                 logger.debug(response);
216
217                                         method.releaseConnection();
218                                         if (statusCode != 200)
219                                                 throw new EJBException("Response from Solr for updating id " + id.toString() + " had status: " + statusCode);
220
221                                         sendCommit(httpClient, 0);
222                                 }
223                         }
224                 }
225                 catch (JMSException e) {
226                         throw new EJBException(e);
227                 } catch (UnsupportedEncodingException e) {
228                         throw new EJBException(e);
229                 } catch (HttpException e) {
230                         throw new EJBException(e);
231                 } catch (IOException e) {
232                         throw new EJBException(e);
233                 } catch (ObjectNotFoundException e) {
234                         logger.warn("File not found. Indexing aborted: ", e);
235                 } catch (ParserConfigurationException e) {
236                         throw new EJBException(e);
237                 } catch (TransformerConfigurationException e) {
238                         throw new EJBException(e);
239                 } catch (TransformerException e) {
240                         throw new EJBException(e);
241                 }
242                 finally {
243                         if (method != null)
244                                 method.releaseConnection();
245                 }
246         }
247
248         /**
249          * Sends a commit message to the solr server
250          *
251          * @param httpClient
252          * @param retryCount If the commit fails, it is retried three times. This parameter is passed in the recursive
253          *                                      calls to stop the recursion
254          * @throws UnsupportedEncodingException
255          * @throws IOException
256          * @throws HttpException
257          */
258         private void sendCommit(HttpClient httpClient, int retryCount) throws UnsupportedEncodingException, IOException, HttpException {
259                 PostMethod method = null;
260                 try {
261                         if (logger.isDebugEnabled())
262                                 logger.debug("Commit retry: " + retryCount);
263                         method = new PostMethod(getConfiguration().getString("solrUpdateUrl"));
264                         method.setRequestEntity(new StringRequestEntity("<commit/>", "text/xml", "iso8859-1"));
265                         int statusCode = httpClient.executeMethod(method);
266                         if (logger.isDebugEnabled())
267                                 logger.debug("HTTP status: " + statusCode);
268                         String response = method.getResponseBodyAsString();
269                         if (logger.isDebugEnabled())
270                                 logger.debug(response);
271                         if (statusCode != 200 && retryCount < 2) {
272                                 try {
273                                         Thread.sleep(10000); // Give Solr a little time to be available.
274                                 } catch (InterruptedException e) {
275                                 }
276                                 sendCommit(httpClient, retryCount + 1);
277                         }
278                 }
279                 finally {
280                         if (method != null)
281                                 method.releaseConnection();
282                 }
283         }
284
285         private String tokenizeFilename(String filename){
286                 StringBuffer result = new StringBuffer();
287                 StringTokenizer tokenizer = new StringTokenizer(filename,"._");
288                 while(tokenizer.hasMoreTokens()){
289                         result.append(tokenizer.nextToken());
290                         result.append(" ");
291                 }
292                 result.append(filename);
293                 return result.toString();
294         }
295 }