View Javadoc

1   /**
2    * HBaseWriter
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * Copyright (C) 2007 stack
9    *
10   * This file is part of the Heritrix web crawler (crawler.archive.org).
11   *
12   * Heritrix is free software; you can redistribute it and/or modify
13   * it under the terms of the GNU Lesser Public License as published by
14   * the Free Software Foundation; either version 2.1 of the License, or
15   * any later version.
16   *
17   * Heritrix is distributed in the hope that it will be useful,
18   * but WITHOUT ANY WARRANTY; without even the implied warranty of
19   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20   * GNU Lesser Public License for more details.
21   *
22   * You should have received a copy of the GNU Lesser Public License
23   * along with Heritrix; if not, write to the Free Software
24   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25   */
26  package com.powerset.heritrix.writer;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.IOException;
30  import java.util.logging.Level;
31  import java.util.logging.Logger;
32  
33  import org.apache.hadoop.hbase.HBaseConfiguration;
34  import org.apache.hadoop.hbase.HColumnDescriptor;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.client.HBaseAdmin;
38  import org.apache.hadoop.hbase.client.HTable;
39  import org.apache.hadoop.hbase.io.BatchUpdate;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.util.Keying;
42  import org.archive.io.ArchiveFileConstants;
43  import org.archive.io.RecordingInputStream;
44  import org.archive.io.RecordingOutputStream;
45  import org.archive.io.ReplayInputStream;
46  import org.archive.io.WriterPoolMember;
47  import org.archive.modules.ProcessorURI;
48  
49  /**
50   * Write to HBase.
51   * Puts content into the 'content:' column and all else into the 'curi:'
52   * column family.  Makes a row key of an url transformation.  Creates table
53   * if it does not exist.
54   * 
55   * <p>Limitations: Hard-coded table schema.
56   * @author stack
57   */
58  public class HBaseWriter extends WriterPoolMember implements ArchiveFileConstants {
59    private final Logger LOG = Logger.getLogger(this.getClass().getName());
60    private final HTable client;
61    // TODO: make this variable configurable in the heritrix sheet: CONTENT_COLUMN_FAMILY
62    public static final String CONTENT_COLUMN_FAMILY = "content:";
63    // TODO: make this variable configurable in the heritrix sheet: CONTENT_COLUMN
64    public static final String CONTENT_COLUMN = CONTENT_COLUMN_FAMILY + "raw_data";
65    // TODO: make this variable configurable in the heritrix sheet: CURI_COLUMN_FAMILY
66    public static final String CURI_COLUMN_FAMILY = "curi:";
67    
68    private static final String IP_COLUMN = CURI_COLUMN_FAMILY + "ip";
69    private static final String PATH_FROM_SEED_COLUMN =
70      CURI_COLUMN_FAMILY + "path-from-seed";
71    private static final String IS_SEED_COLUMN = CURI_COLUMN_FAMILY + "is-seed";
72    private static final String VIA_COLUMN = CURI_COLUMN_FAMILY + "via";
73    private static final String URL_COLUMN = CURI_COLUMN_FAMILY + "url";
74    private static final String REQUEST_COLUMN = CURI_COLUMN_FAMILY + "request";
75  
76    public HBaseWriter(final String master, final String table)
77    throws IOException {
78      super(null, null, null, false, null);
79      if (table == null || table.length() <= 0) {
80        throw new IllegalArgumentException("Must specify a table name");
81      }
82      HBaseConfiguration c = new HBaseConfiguration();
83      if (master != null && master.length() > 0) {
84        c.set(HConstants.MASTER_ADDRESS, master);
85      }
86      createCrawlTable(c, table);
87      this.client = new HTable(c, table);
88    }
89  
90    protected void createCrawlTable(final HBaseConfiguration c,
91        final String table)
92    throws IOException {
93      HBaseAdmin admin = new HBaseAdmin(c);
94      if (admin.tableExists(table)) {
95        return;
96      }
97      HTableDescriptor htd = new HTableDescriptor(table);
98      htd.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
99      htd.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
100     admin.createTable(htd);
101     LOG.info("Created table " + htd.toString());
102   }
103 
104   /**
105    * @param curi URI of crawled document
106    * @param ip IP of remote machine.
107    * @param ros recording input stream that captured the response
108    * @param ris recording output stream that captured the GET request
109    */
110   public void write(final ProcessorURI curi, final String ip,
111       final RecordingOutputStream ros, final RecordingInputStream ris)
112   throws IOException {
113     String url = curi.toString();
114     String row = Keying.createKey(url);
115     if (LOG.isLoggable(Level.FINE)) {
116       LOG.fine("Writing " + url + " as " + row.toString());
117     }
118     BatchUpdate bu = new BatchUpdate(row);
119     bu.put(URL_COLUMN, Bytes.toBytes(url));
120     bu.put(IP_COLUMN, Bytes.toBytes(ip));
121     if (curi.isSeed()) {
122       // TODO: Make Bytes.toBytes that takes a boolean.
123       bu.put(IS_SEED_COLUMN, Bytes.toBytes(Boolean.TRUE.toString()));
124       if (curi.getPathFromSeed() != null
125           && curi.getPathFromSeed().trim().length() > 0) {
126         bu.put(PATH_FROM_SEED_COLUMN, Bytes.toBytes(curi.getPathFromSeed().trim()));
127       }
128     }
129     String viaStr = (curi.getVia() != null)?
130       curi.getVia().toString().trim(): null;
131       if (viaStr != null && viaStr.length() > 0) {
132         bu.put(VIA_COLUMN, Bytes.toBytes(viaStr));
133       }
134       // Request
135       if (ros.getSize() > 0) {
136         add(bu, REQUEST_COLUMN, ros.getReplayInputStream(), (int)ros.getSize());
137       }
138       // Response
139       add(bu, CONTENT_COLUMN, ris.getReplayInputStream(),
140         (int)ris.getSize());
141       // process the content (optional)
142       processContent(bu);
143       // Set crawl time.
144       bu.setTimestamp(curi.getFetchBeginTime());
145       this.client.commit(bu);
146   }
147   
148   /**
149    *  This is a stub method and is here to allow extension/overriding for 
150    *  custom content parsing, data manipulation and to populate new columns.
151    *  
152    *  For Example : html parsing, text extraction, analysis and transformation and 
153    *  storing the results in new column families/columns using the batch update object.
154    *  
155    * @param bu
156    */
157   protected void processContent(BatchUpdate bu) {
158 	  //byte[] content = bu.get(CONTENT_COLUMN);
159 	  // process content.....
160 	  //bu.put("some:new_column", someParsedByteArray);
161   }
162   
163   /*
164    * Add ReplayInputStream to the passed BatchUpdate.
165    * @param bu
166    * @param key
167    * @param ris
168    * @param baos
169    * @throws IOException
170    */
171   private void add(final BatchUpdate bu, final String key,
172     final ReplayInputStream ris, final int size)
173   throws IOException {
174     ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
175     try {
176       ris.readFullyTo(baos);
177     } finally {
178       ris.close();
179     }
180     baos.close();
181     bu.put(key, baos.toByteArray());
182   }
183 }