View Javadoc

1   /**
2    * HBaseWriter
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * This file is part of the Heritrix web crawler (crawler.archive.org).
9    *
10   * Heritrix is free software; you can redistribute it and/or modify
11   * it under the terms of the GNU Lesser Public License as published by
12   * the Free Software Foundation; either version 2.1 of the License, or
13   * any later version.
14   *
15   * Heritrix is distributed in the hope that it will be useful,
16   * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   * GNU Lesser Public License for more details.
19   *
20   * You should have received a copy of the GNU Lesser Public License
21   * along with Heritrix; if not, write to the Free Software
22   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23   */
24  package com.powerset.heritrix.writer;
25  
26  import java.io.ByteArrayOutputStream;
27  import java.io.IOException;
28  
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HColumnDescriptor;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HTableDescriptor;
33  import org.apache.hadoop.hbase.client.HBaseAdmin;
34  import org.apache.hadoop.hbase.client.HTable;
35  import org.apache.hadoop.hbase.client.Put;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.Keying;
38  import org.apache.hadoop.io.IOUtils;
39  import org.apache.log4j.Logger;
40  import org.archive.io.RecordingInputStream;
41  import org.archive.io.RecordingOutputStream;
42  import org.archive.io.ReplayInputStream;
43  import org.archive.io.WriterPoolMember;
44  import org.archive.modules.ProcessorURI;
45  
46  // TODO: Auto-generated Javadoc
47  /**
48   * Write crawled content as records to an HBase table.
49   * Puts content into the 'content:raw_data' column and all else into the
50   * 'curi:' column family. Makes a row key of an url transformation. Creates
51   * table if it does not exist.
52   * 
53   * The following is a complete list of columns that get written to by default:
54   * 
55   * content:raw_data
56   * 
57   * curi:ip
58   * curi:path-from-seed
59   * curi:is-seed
60   * curi:via
61   * curi:url
62   * curi:request
63   * 
64   * <p>
65   * Limitations: Hard-coded table schema.
66   */
67  public class HBaseWriter extends WriterPoolMember {
68  	
69  	/** The LOG. */
70  	private final Logger LOG = Logger.getLogger(this.getClass().getName());
71  	
72  	/** The client. */
73  	private final HTable client;
74  	
75  	// TODO: make this variable configurable in the heritrix sheet:
76  	// CONTENT_COLUMN_FAMILY
77  	/** The Constant CONTENT_COLUMN_FAMILY. */
78  	public static final String CONTENT_COLUMN_FAMILY = "content";
79  	// TODO: make this variable configurable in the heritrix sheet:
80  	/** The Constant CONTENT_COLUMN. */
81  	public static final String CONTENT_COLUMN_NAME = "raw_data";
82  	// TODO: make this variable configurable in the heritrix sheet:
83  	// CURI_COLUMN_FAMILY
84  	/** The Constant CURI_COLUMN_FAMILY. */
85  	public static final String CURI_COLUMN_FAMILY = "curi";
86  	// TODO: make this variable configurable in the heritrix sheet:
87  	/** The Constant IP_COLUMN. */
88  	private static final String IP_COLUMN_NAME = "ip";
89  	// TODO: make this variable configurable in the heritrix sheet:
90  	/** The Constant PATH_FROM_SEED_COLUMN. */
91  	private static final String PATH_FROM_SEED_COLUMN_NAME = "path-from-seed";
92  	// TODO: make this variable configurable in the heritrix sheet:
93  	/** The Constant IS_SEED_COLUMN. */
94  	private static final String IS_SEED_COLUMN_NAME = "is-seed";
95  	// TODO: make this variable configurable in the heritrix sheet:
96  	/** The Constant VIA_COLUMN. */
97  	private static final String VIA_COLUMN_NAME = "via";
98  	// TODO: make this variable configurable in the heritrix sheet:
99  	/** The Constant URL_COLUMN. */
100 	private static final String URL_COLUMN_NAME = "url";
101 	// TODO: make this variable configurable in the heritrix sheet:
102 	/** The Constant REQUEST_COLUMN. */
103 	private static final String REQUEST_COLUMN_NAME = "request";
104 	// TODO: Add this string to HConstants
105 	/** the zk client port name, this has to match what is in hbase-site.xml for the clientPort config attribute. */
106 	private static String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
107 	
108 	/**
109 	 * Gets the HTable client.
110 	 * 
111 	 * @return the client
112 	 */
113 	public HTable getClient() {
114 		return client;
115 	}
116 	
117 	/**
118 	 * Instantiates a new HBaseWriter for the WriterPool to use in heritrix2.
119 	 * 
120 	 * @param zkQuorum the zookeeper quorum. The list of hosts that make up you zookeeper quorum.
121 	 * i.e.:  zkHost1,zkHost2,zkHost3
122 	 * @param zkClientPort the zookeeper client port that clients should try to connect on for
123 	 * servers in the zk quorum.  This value is analgous to the hase-site.xml config parameter:
124 	 * hbase.zookeeper.property.clientPort
125 	 * @param tableName the table in hbase to write to.  i.e. : webtable
126 	 * 
127 	 * @throws IOException Signals that an I/O exception has occurred.
128 	 */
129 	public HBaseWriter(final String zkQuorum, final int zkClientPort, final String tableName) throws IOException {
130 		super(null, null, null, false, null);
131 		if (tableName == null || tableName.length() <= 0) {
132 			throw new IllegalArgumentException("Must specify a table name");
133 		}
134 		HBaseConfiguration hbaseConfiguration = new HBaseConfiguration();
135 		// set the zk quorum list
136 		if (zkQuorum != null && zkQuorum.length() > 0) {
137 			LOG.info("setting zookeeper quorum to : " + zkQuorum);
138 			hbaseConfiguration.setStrings(HConstants.ZOOKEEPER_QUORUM, zkQuorum.split(","));
139 		}
140 		// set the client port
141 		if (zkClientPort > 0) {
142 			LOG.info("setting zookeeper client Port to : " + zkClientPort);
143 			hbaseConfiguration.setInt(ZOOKEEPER_CLIENT_PORT, zkClientPort);
144 		}
145 		// create a crawl table
146 		createCrawlTable(hbaseConfiguration, tableName);
147 		this.client = new HTable(hbaseConfiguration, tableName);
148 	}
149 
150 	/**
151 	 * Creates the crawl table in HBase.
152 	 * 
153 	 * @param hbaseConfiguration the c
154 	 * @param hbaseTableName the table
155 	 * 
156 	 * @throws IOException Signals that an I/O exception has occurred.
157 	 */
158 	protected void createCrawlTable(final HBaseConfiguration hbaseConfiguration, final String hbaseTableName) throws IOException {
159 		// an HBase admin object to manage hbase tables.
160 		HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseConfiguration);
161 		if (hbaseAdmin.tableExists(hbaseTableName)) {
162 			boolean foundContentColumnFamily = false;
163 			boolean foundCURIColumnFamily = false;
164 			LOG.debug("Checking table: " + hbaseTableName + " for structure...");
165 			// Check the existing table and manipulate it if necessary
166 			// to conform to the pre-existing table schema.
167 			HTableDescriptor existingHBaseTable = hbaseAdmin.getTableDescriptor(Bytes.toBytes(hbaseTableName));
168 			for (HColumnDescriptor hColumnDescriptor: existingHBaseTable.getFamilies()) {
169 				if (hColumnDescriptor.getNameAsString().equalsIgnoreCase(CONTENT_COLUMN_FAMILY)) {
170 					foundContentColumnFamily = true;
171 				} else if (hColumnDescriptor.getNameAsString().equalsIgnoreCase(CURI_COLUMN_FAMILY)) {
172 					foundCURIColumnFamily = true;
173 				}
174 			}
175 			// modify the table if it's missing any of the column families.
176 			if (!foundContentColumnFamily || !foundCURIColumnFamily) {
177 				LOG.info("Disabling table: " + hbaseTableName);
178 				hbaseAdmin.disableTable(hbaseTableName);
179 				if (!foundContentColumnFamily) {
180 					LOG.info("Adding column to table: " + hbaseTableName + " column: " + CONTENT_COLUMN_FAMILY);
181 					existingHBaseTable.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));		
182 				}
183 				if (!foundCURIColumnFamily) {
184 					LOG.info("Adding column to table: " + hbaseTableName + " column: " + CURI_COLUMN_FAMILY);
185 					existingHBaseTable.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));	
186 				}
187 				LOG.info("Enabling table: " + hbaseTableName);
188 				hbaseAdmin.enableTable(hbaseTableName);	
189 			}
190 			LOG.debug("Done checking table: " + hbaseTableName);
191 		} else {
192 			// create a new hbase table
193 			LOG.info("Creating table " + hbaseTableName);
194 			HTableDescriptor newHBaseTable = new HTableDescriptor(hbaseTableName);
195 			newHBaseTable.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
196 			newHBaseTable.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
197 			// create the table
198 			hbaseAdmin.createTable(newHBaseTable);
199 			LOG.info("Created table " + newHBaseTable.toString());
200 		}
201 	}
202 
203 	/**
204 	 * Write the crawled output to the configured HBase table.
205 	 * Write each row key as the url with reverse domain and optionally process any content.
206 	 * 
207 	 * @param curi URI of crawled document
208 	 * @param ip IP of remote machine.
209 	 * @param recordingOutputStream recording input stream that captured the response
210 	 * @param recordingInputStream recording output stream that captured the GET request
211 	 * 
212 	 * @throws IOException Signals that an I/O exception has occurred.
213 	 */
214 	public void write(final ProcessorURI curi, final String ip, final RecordingOutputStream recordingOutputStream, 
215 			final RecordingInputStream recordingInputStream) throws IOException {
216 		// generate the target url of the crawled document
217 		String url = curi.toString();
218 		// create the hbase friendly rowkey
219 		String rowKey = Keying.createKey(url);
220 		if (LOG.isTraceEnabled()) {
221 			LOG.trace("Writing " + url + " as " + rowKey);
222 		}
223 		// create an hbase updateable object (the put object)
224 		// Constructor takes the rowkey as the only argument
225 		Put batchPut = new Put(Bytes.toBytes(rowKey));
226 		// write the target url to the url column
227 		batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(URL_COLUMN_NAME), curi.getFetchBeginTime(), Bytes.toBytes(url));
228 		// write the target ip to the ip column
229 		batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(IP_COLUMN_NAME), curi.getFetchBeginTime(), Bytes.toBytes(ip));
230 		// is the url part of the seed url (the initial url(s) used to start the crawl)
231 		if (curi.isSeed()) {
232 			batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(IS_SEED_COLUMN_NAME), Bytes.toBytes(Boolean.TRUE));
233 			if (curi.getPathFromSeed() != null && curi.getPathFromSeed().trim().length() > 0) {
234 				batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(PATH_FROM_SEED_COLUMN_NAME), Bytes.toBytes(curi.getPathFromSeed().trim()));
235 			}
236 		}
237 		// write the Via string
238 		String viaStr = (curi.getVia() != null) ? curi.getVia().toString().trim() : null;
239 		if (viaStr != null && viaStr.length() > 0) {
240 			batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(VIA_COLUMN_NAME), Bytes.toBytes(viaStr));
241 		}
242 		// Write the Crawl Request to the Put object
243 		if (recordingOutputStream.getSize() > 0) {
244 			batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(REQUEST_COLUMN_NAME), 
245 					getByteArrayFromInputStream(recordingOutputStream.getReplayInputStream(), (int) recordingOutputStream.getSize()));
246 		}
247 		// Write the Crawl Response to the Put object
248 		ReplayInputStream replayInputStream = recordingInputStream.getReplayInputStream();
249         try {
250         	// add the raw content to the table record.
251             batchPut.add(Bytes.toBytes(CONTENT_COLUMN_FAMILY), Bytes.toBytes(CONTENT_COLUMN_NAME),
252             		getByteArrayFromInputStream(replayInputStream, (int) recordingInputStream.getSize()));
253             // reset the input steam for the content processor.
254             replayInputStream = recordingInputStream.getReplayInputStream();
255             replayInputStream.setToResponseBodyStart();
256             // process the content (optional)
257             processContent(batchPut, replayInputStream, (int) recordingInputStream.getSize());
258             // Set crawl time as the timestamp to the Put object.
259             batchPut.setTimeStamp(curi.getFetchBeginTime());
260             // write the Put object to the HBase table
261             this.client.put(batchPut);
262         } finally {
263             IOUtils.closeStream(replayInputStream);
264         }
265 	}
266 
267 	/**
268 	 * Read the ReplayInputStream and write it to the given BatchUpdate with the given column.
269 	 * 
270 	 * @param replayInputStream the ris the cell data as a replay input stream
271 	 * @param streamSize the size
272 	 * 
273 	 * @return the byte array from input stream
274 	 * 
275 	 * @throws IOException Signals that an I/O exception has occurred.
276 	 */
277 	protected byte[] getByteArrayFromInputStream(final ReplayInputStream replayInputStream, final int streamSize) throws IOException {
278 		ByteArrayOutputStream baos = new ByteArrayOutputStream(streamSize);
279 		try {
280 			// read the InputStream to the ByteArrayOutputStream
281 			replayInputStream.readFullyTo(baos);
282 		} finally {
283 			replayInputStream.close();
284 		}
285 		baos.close();
286 		return baos.toByteArray();
287 	}
288 
289 	/**
290 	 * This is a stub method and is here to allow extension/overriding for
291 	 * custom content parsing, data manipulation and to populate new columns.
292 	 * 
293 	 * For Example : html parsing, text extraction, analysis and transformation
294 	 * and storing the results in new column families/columns using the batch
295 	 * update object. Or even saving the values in other custom hbase tables or data sources.
296 	 * 
297 	 * @param put the stateful put object containg all the row data to be written.
298 	 * @param replayInputStream the replay input stream containing the raw content gotten by heritrix crawler.
299 	 * @param streamSize the stream size
300 	 * 
301 	 * @throws IOException Signals that an I/O exception has occurred.
302 	 */
303 	protected void processContent(Put put, ReplayInputStream replayInputStream, int streamSize) throws IOException {
304 		// Below is just an example of a typical use case of overriding this method.
305 		// I.E.: The goal below is to process the raw content array and parse it to a new byte array.....
306 		// byte[] rowKey = put.getRow();
307 		// byte[] rawContent = this.getByteArrayFromInputStream(replayInputStream, streamSize)
308 		// // process rawContent and create output to store in new columns. 
309 		// byte[] someParsedByteArray = userDefinedMethondToProcessRawContent(rawContent);
310 		// put.add(Bytes.toBytes("some_column_family"), Bytes.toBytes("a_new_column_name"), someParsedByteArray);
311 	}
312 }