View Javadoc

1   /**
2    * HBaseWriter
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * Copyright (C) 2007 stack
9    *
10   * This file is part of the Heritrix web crawler (crawler.archive.org).
11   *
12   * Heritrix is free software; you can redistribute it and/or modify
13   * it under the terms of the GNU Lesser Public License as published by
14   * the Free Software Foundation; either version 2.1 of the License, or
15   * any later version.
16   *
17   * Heritrix is distributed in the hope that it will be useful,
18   * but WITHOUT ANY WARRANTY; without even the implied warranty of
19   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20   * GNU Lesser Public License for more details.
21   *
22   * You should have received a copy of the GNU Lesser Public License
23   * along with Heritrix; if not, write to the Free Software
24   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25   */
26  package com.powerset.heritrix.writer;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.IOException;
30  
31  import org.apache.hadoop.hbase.HBaseConfiguration;
32  import org.apache.hadoop.hbase.HColumnDescriptor;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HTableDescriptor;
35  import org.apache.hadoop.hbase.client.HBaseAdmin;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.io.BatchUpdate;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.hbase.util.Keying;
40  import org.apache.log4j.Logger;
41  import org.archive.io.ArchiveFileConstants;
42  import org.archive.io.RecordingInputStream;
43  import org.archive.io.RecordingOutputStream;
44  import org.archive.io.ReplayInputStream;
45  import org.archive.io.WriterPoolMember;
46  import org.archive.modules.ProcessorURI;
47  
48  /**
49   * Write to HBase.
50   * Puts content into the 'content:' column and all else into the 'curi:'
51   * column family.  Makes a row key of an url transformation.  Creates table
52   * if it does not exist.
53   * 
54   * <p>Limitations: Hard-coded table schema.
55   * @author stack
56   */
57  public class HBaseWriter extends WriterPoolMember implements ArchiveFileConstants {
58    private final Logger LOG = Logger.getLogger(this.getClass().getName());
59    private final HTable client;
60    // TODO: make this variable configurable in the heritrix sheet: CONTENT_COLUMN_FAMILY
61    public static final String CONTENT_COLUMN_FAMILY = "content:";
62    // TODO: make this variable configurable in the heritrix sheet: CONTENT_COLUMN
63    public static final String CONTENT_COLUMN = CONTENT_COLUMN_FAMILY + "raw_data";
64    // TODO: make this variable configurable in the heritrix sheet: CURI_COLUMN_FAMILY
65    public static final String CURI_COLUMN_FAMILY = "curi:";
66    
67    private static final String IP_COLUMN = CURI_COLUMN_FAMILY + "ip";
68    private static final String PATH_FROM_SEED_COLUMN =
69      CURI_COLUMN_FAMILY + "path-from-seed";
70    private static final String IS_SEED_COLUMN = CURI_COLUMN_FAMILY + "is-seed";
71    private static final String VIA_COLUMN = CURI_COLUMN_FAMILY + "via";
72    private static final String URL_COLUMN = CURI_COLUMN_FAMILY + "url";
73    private static final String REQUEST_COLUMN = CURI_COLUMN_FAMILY + "request";
74  
75    public HBaseWriter(final String master, final String table)
76    throws IOException {
77      super(null, null, null, false, null);
78      if (table == null || table.length() <= 0) {
79        throw new IllegalArgumentException("Must specify a table name");
80      }
81      HBaseConfiguration c = new HBaseConfiguration();
82      if (master != null && master.length() > 0) {
83        c.set(HConstants.MASTER_ADDRESS, master);
84      }
85      createCrawlTable(c, table);
86      this.client = new HTable(c, table);
87    }
88  
89    protected void createCrawlTable(final HBaseConfiguration c,
90        final String table)
91    throws IOException {
92      HBaseAdmin admin = new HBaseAdmin(c);
93      if (admin.tableExists(table)) {
94        return;
95      }
96      HTableDescriptor htd = new HTableDescriptor(table);
97      htd.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
98      htd.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
99      admin.createTable(htd);
100     LOG.info("Created table " + htd.toString());
101   }
102 
103   /**
104    * @param curi URI of crawled document
105    * @param ip IP of remote machine.
106    * @param ros recording input stream that captured the response
107    * @param ris recording output stream that captured the GET request
108    */
109   public void write(final ProcessorURI curi, final String ip,
110       final RecordingOutputStream ros, final RecordingInputStream ris)
111   throws IOException {
112     String url = curi.toString();
113     String row = Keying.createKey(url);
114     if (LOG.isTraceEnabled()) {
115       LOG.trace("Writing " + url + " as " + row.toString());
116     }
117     BatchUpdate bu = new BatchUpdate(row);
118     bu.put(URL_COLUMN, Bytes.toBytes(url));
119     bu.put(IP_COLUMN, Bytes.toBytes(ip));
120     if (curi.isSeed()) {
121       // TODO: Make Bytes.toBytes that takes a boolean.
122       bu.put(IS_SEED_COLUMN, Bytes.toBytes(Boolean.TRUE.toString()));
123       if (curi.getPathFromSeed() != null
124           && curi.getPathFromSeed().trim().length() > 0) {
125         bu.put(PATH_FROM_SEED_COLUMN, Bytes.toBytes(curi.getPathFromSeed().trim()));
126       }
127     }
128     String viaStr = (curi.getVia() != null)?
129       curi.getVia().toString().trim(): null;
130       if (viaStr != null && viaStr.length() > 0) {
131         bu.put(VIA_COLUMN, Bytes.toBytes(viaStr));
132       }
133       // Request
134       if (ros.getSize() > 0) {
135         add(bu, REQUEST_COLUMN, ros.getReplayInputStream(), (int)ros.getSize());
136       }
137       // Response
138       add(bu, CONTENT_COLUMN, ris.getReplayInputStream(),
139         (int)ris.getSize());
140       // process the content (optional)
141       processContent(bu);
142       // Set crawl time.
143       bu.setTimestamp(curi.getFetchBeginTime());
144       this.client.commit(bu);
145   }
146   
147   /**
148    *  This is a stub method and is here to allow extension/overriding for 
149    *  custom content parsing, data manipulation and to populate new columns.
150    *  
151    *  For Example : html parsing, text extraction, analysis and transformation and 
152    *  storing the results in new column families/columns using the batch update object.
153    *  
154    * @param bu
155    */
156   protected void processContent(BatchUpdate bu) {
157 	  //byte[] content = bu.get(CONTENT_COLUMN);
158 	  // process content.....
159 	  //bu.put("some:new_column", someParsedByteArray);
160   }
161   
162   /*
163    * Add ReplayInputStream to the passed BatchUpdate.
164    * @param bu
165    * @param key
166    * @param ris
167    * @param baos
168    * @throws IOException
169    */
170   private void add(final BatchUpdate bu, final String key,
171     final ReplayInputStream ris, final int size)
172   throws IOException {
173     ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
174     try {
175       ris.readFullyTo(baos);
176     } finally {
177       ris.close();
178     }
179     baos.close();
180     bu.put(key, baos.toByteArray());
181   }
182 }