Merge after backout of changeset 017ec8d2e32f
[pithos] / src / gr / ebs / gss / server / ejb / indexer / IndexerMDBean.java
index 8c54245..d525f38 100644 (file)
@@ -20,17 +20,17 @@ package gr.ebs.gss.server.ejb.indexer;
 
 import static gr.ebs.gss.server.configuration.GSSConfigurationFactory.getConfiguration;
 import gr.ebs.gss.client.exceptions.ObjectNotFoundException;
+import gr.ebs.gss.server.configuration.GSSConfigurationFactory;
 import gr.ebs.gss.server.domain.FileBody;
 import gr.ebs.gss.server.domain.FileHeader;
 import gr.ebs.gss.server.domain.FileTag;
+import gr.ebs.gss.server.ejb.ExternalAPI;
 import gr.ebs.gss.server.ejb.GSSDAO;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
+import java.net.MalformedURLException;
 import java.util.StringTokenizer;
 
 import javax.ejb.ActivationConfigProperty;
@@ -43,30 +43,22 @@ import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageListener;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerConfigurationException;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpException;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.StringRequestEntity;
-import org.apache.commons.httpclient.methods.multipart.FilePart;
-import org.apache.commons.httpclient.methods.multipart.MultipartRequestEntity;
-import org.apache.commons.httpclient.methods.multipart.Part;
-import org.apache.commons.httpclient.methods.multipart.StringPart;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.jboss.ejb3.annotation.ResourceAdapter;
 
 /**
  * Message driven bean that accepts messages whenever a document is created,
@@ -74,6 +66,7 @@ import org.w3c.dom.Node;
  */
 @MessageDriven(activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
                                                                        @ActivationConfigProperty(propertyName="destination", propertyValue="queue/gss-indexingQueue")})
+@ResourceAdapter("hornetq-ra.rar")
 public class IndexerMDBean implements MessageListener {
        /**
         * The logger
@@ -83,7 +76,7 @@ public class IndexerMDBean implements MessageListener {
        /**
         * EJB offering access to the JPA entity manager
         */
-       @EJB GSSDAO dao;
+       @EJB ExternalAPI service;
 
        /**
         * Decides to add or drop an item from the index depending on the message
@@ -96,200 +89,44 @@ public class IndexerMDBean implements MessageListener {
         * @param msg
         * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
         */
-       @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
+       @Override
        public void onMessage(Message msg) {
-               PostMethod method = null;
+               Long id = null; // declared outside the try so every catch block can report it
                try {
                        MapMessage map = (MapMessage) msg;
-                       Long id = (Long) map.getObject("id");
+                       id = (Long) map.getObject("id");
                        boolean delete = map.getBoolean("delete");
-                       HttpClient httpClient = new HttpClient();
-                       if (delete) {
-                               method = new PostMethod(getConfiguration().getString("solrUpdateUrl"));
-                               String deleteXMLMsg = "<delete><id>" + id.toString() + "</id></delete>";
-                               if (logger.isDebugEnabled())
-                                       logger.debug(deleteXMLMsg);
-                               method.setRequestEntity(new StringRequestEntity(deleteXMLMsg, "text/xml", "iso8859-1"));
-                               int statusCode = httpClient.executeMethod(method);
-                               if (logger.isDebugEnabled())
-                                       logger.debug("HTTP status: " + statusCode);
-                               String response = method.getResponseBodyAsString();
-                               if (logger.isDebugEnabled())
-                                       logger.debug(response);
-
-                               method.releaseConnection();
-                               if (statusCode != 200)
-                                       throw new EJBException("Response from Solr for deleting id " + id.toString() + " had status: " + statusCode);
-                               sendCommit(httpClient, 0);
+                       Configuration config = GSSConfigurationFactory.getConfiguration();
+                       CommonsHttpSolrServer solr = new CommonsHttpSolrServer(config.getString("solr.url"));
+                       if (delete) {
+                               sendDelete(solr, id);
+                               solr.commit(); // commit right after the delete request
                        } else {
-                               FileHeader file = dao.getFileForIndexing(id);
-                               FileBody body = file.getCurrentBody();
-                               String type = null;
-                               String mime = body.getMimeType();
-                               boolean nofile = false;
-                               if (body.getFileSize() > getConfiguration().getLong("solrDocumentUploadLimitInKB") * 1024)
-                                       nofile = true;
-                               else if (mime.equals("application/pdf"))
-                                       type = "pdf";
-                               else if (mime.equals("text/plain"))
-                                       type = "text";
-                               else if (mime.equals("text/html"))
-                                       type = "html";
-                               else if (mime.equals("application/msword"))
-                                       type = "doc";
-                               else if (mime.equals("application/vnd.ms-excel"))
-                                       type = "xls";
-                               else if (mime.equals("application/vnd.ms-powerpoint"))
-                                       type = "ppt";
-                               else
-                                       nofile = true;
-                               if (!nofile) {
-                                       method = new PostMethod(getConfiguration().getString("solrUpdateRichUrl"));
-                                       List<Part> parts = new ArrayList<Part>();
-                                       parts.add(new StringPart("stream.type", type));
-                                       StringBuffer fieldnames = new StringBuffer("id,name");
-                                       if (!file.getFileTags().isEmpty())
-                                               fieldnames.append(",tag");
-                                       parts.add(new StringPart("fieldnames", fieldnames.toString()));
-                                       parts.add(new StringPart("id", id.toString()));
-                                       parts.add(new StringPart("name", tokenizeFilename(file.getName()), "UTF-8"));
-                                       for (FileTag tag : file.getFileTags())
-                                               parts.add(new StringPart("tag", tag.getTag()));
-                                       parts.add(new StringPart("stream.fieldname", "body"));
-                                       parts.add(new StringPart("commit", "true"));
-                                       parts.add(new FilePart(file.getName(), new File(body.getStoredFilePath())));
-                                       method.setRequestEntity(new MultipartRequestEntity(parts.toArray(new Part[1]), method.getParams()));
-                                       httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(5000);
-                                       if (logger.isDebugEnabled())
-                                               logger.debug("Sending rich document " + id.toString());
-                                       int statusCode = httpClient.executeMethod(method);
-                                       if (logger.isDebugEnabled())
-                                               logger.debug("HTTP status: " + statusCode);
-                                       String response = method.getResponseBodyAsString();
-                                       if (logger.isDebugEnabled())
-                                               logger.debug(response);
-                                       if (statusCode != 200)
-                                               throw new EJBException("Response from Solr for updatind id " + id.toString() + " had status: " + statusCode);
-                               } else {
-                                       DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
-                                       DocumentBuilder db = dbf.newDocumentBuilder();
-                                       Document doc = db.newDocument();
-                                       Node root = doc.createElement("add");
-                                       doc.appendChild(root);
-                                       Node docNode = doc.createElement("doc");
-                                       root.appendChild(docNode);
-                                       Element field = doc.createElement("field");
-                                       field.setAttribute("name", "id");
-                                       docNode.appendChild(field);
-                                       field.appendChild(doc.createTextNode(id.toString()));
-
-                                       field = doc.createElement("field");
-                                       field.setAttribute("name", "name");
-                                       docNode.appendChild(field);
-                                       field.appendChild(doc.createTextNode(tokenizeFilename(file.getName())));
-
-                                       for (FileTag tag : file.getFileTags()) {
-                                               field = doc.createElement("field");
-                                               field.setAttribute("name", "tag");
-                                               docNode.appendChild(field);
-                                               field.appendChild(doc.createTextNode(tag.getTag()));
-                                       }
-
-                                       TransformerFactory fact = TransformerFactory.newInstance();
-                                       Transformer trans = fact.newTransformer();
-                                       trans.setOutputProperty(OutputKeys.INDENT, "yes");
-                                       StringWriter sw = new StringWriter();
-                                       StreamResult sr = new StreamResult(sw);
-                                       DOMSource source = new DOMSource(doc);
-                                       trans.transform(source, sr);
-                                       if (logger.isDebugEnabled())
-                                               logger.debug(sw.toString());
-
-                                       method = new PostMethod(getConfiguration().getString("solrUpdateUrl"));
-                                       method.setRequestEntity(new StringRequestEntity(sw.toString(),"text/xml", "UTF-8"));
-                                       int statusCode = httpClient.executeMethod(method);
-                                       if (logger.isDebugEnabled())
-                                               logger.debug("HTTP status: " + statusCode);
-                                       String response = method.getResponseBodyAsString();
-                                       if (logger.isDebugEnabled())
-                                               logger.debug(response);
-
-                                       method.releaseConnection();
-                                       if (statusCode != 200)
-                                               throw new EJBException("Response from Solr for updating id " + id.toString() + " had status: " + statusCode);
-
-                                       sendCommit(httpClient, 0);
-                               }
-                       }
+                               service.postFileToSolr(solr, id);
+                               solr.commit(); // commit right after the file has been posted
+                       }
                }
                catch (JMSException e) {
-                       throw new EJBException(e);
-               } catch (UnsupportedEncodingException e) {
-                       throw new EJBException(e);
-               } catch (HttpException e) {
-                       throw new EJBException(e);
-               } catch (IOException e) {
-                       throw new EJBException(e);
-               } catch (ObjectNotFoundException e) {
-                       logger.warn("File not found. Indexing aborted: ", e);
-               } catch (ParserConfigurationException e) {
-                       throw new EJBException(e);
-               } catch (TransformerConfigurationException e) {
-                       throw new EJBException(e);
-               } catch (TransformerException e) {
-                       throw new EJBException(e);
+                       throw new EJBException("Error processing file ID " + id, e);
+               }
+               catch (IOException e) {
+                       throw new EJBException("Error processing file ID " + id, e);
                }
-               finally {
-                       if (method != null)
-                               method.releaseConnection();
+               catch (SolrServerException e) {
-                       throw new EJBException(e);
+                       throw new EJBException("Error processing file ID " + id, e);
                }
        }
 
+
        /**
-        * Sends a commit message to the solr server
-        *
-        * @param httpClient
-        * @param retryCount If the commit fails, it is retried three times. This parameter is passed in the recursive
-        *                                      calls to stop the recursion
-        * @throws UnsupportedEncodingException
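+        * Sends a delete-by-id command to Solr for the document indexed under the given id.
+        *
+        * @param solr the Solr server client that receives the delete request
+        * @param id the id of the indexed document to delete; must not be null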
+        * @throws SolrServerException
         * @throws IOException
-        * @throws HttpException
         */
-       private void sendCommit(HttpClient httpClient, int retryCount) throws UnsupportedEncodingException, IOException, HttpException {
-               PostMethod method = null;
-               try {
-                       if (logger.isDebugEnabled())
-                               logger.debug("Commit retry: " + retryCount);
-                       method = new PostMethod(getConfiguration().getString("solrUpdateUrl"));
-                       method.setRequestEntity(new StringRequestEntity("<commit/>", "text/xml", "iso8859-1"));
-                       int statusCode = httpClient.executeMethod(method);
-                       if (logger.isDebugEnabled())
-                               logger.debug("HTTP status: " + statusCode);
-                       String response = method.getResponseBodyAsString();
-                       if (logger.isDebugEnabled())
-                               logger.debug(response);
-                       if (statusCode != 200 && retryCount < 2) {
-                               try {
-                                       Thread.sleep(10000); // Give Solr a little time to be available.
-                               } catch (InterruptedException e) {
-                               }
-                               sendCommit(httpClient, retryCount + 1);
-                       }
-               }
-               finally {
-                       if (method != null)
-                               method.releaseConnection();
-               }
-       }
-
-       private String tokenizeFilename(String filename){
-               StringBuffer result = new StringBuffer();
-               StringTokenizer tokenizer = new StringTokenizer(filename,"._");
-               while(tokenizer.hasMoreTokens()){
-                       result.append(tokenizer.nextToken());
-                       result.append(" ");
-               }
-               result.append(filename);
-               return result.toString();
+       private void sendDelete(CommonsHttpSolrServer solr, Long id) throws SolrServerException, IOException {
+               solr.deleteById(Long.toString(id)); // deleteById is given the id in its decimal string form
        }
 }