View Javadoc

1   /**
2    * HBaseWriter
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * This file is part of the Heritrix web crawler (crawler.archive.org).
9    *
10   * Heritrix is free software; you can redistribute it and/or modify
11   * it under the terms of the GNU Lesser Public License as published by
12   * the Free Software Foundation; either version 2.1 of the License, or
13   * any later version.
14   *
15   * Heritrix is distributed in the hope that it will be useful,
16   * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   * GNU Lesser Public License for more details.
19   *
20   * You should have received a copy of the GNU Lesser Public License
21   * along with Heritrix; if not, write to the Free Software
22   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23   */
24  package com.powerset.heritrix.writer;
25  
26  import java.io.ByteArrayOutputStream;
27  import java.io.IOException;
28  
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HColumnDescriptor;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HTableDescriptor;
33  import org.apache.hadoop.hbase.client.HBaseAdmin;
34  import org.apache.hadoop.hbase.client.HTable;
35  import org.apache.hadoop.hbase.client.Put;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.Keying;
38  import org.apache.log4j.Logger;
39  import org.archive.io.ArchiveFileConstants;
40  import org.archive.io.RecordingInputStream;
41  import org.archive.io.RecordingOutputStream;
42  import org.archive.io.ReplayInputStream;
43  import org.archive.io.WriterPoolMember;
44  import org.archive.modules.ProcessorURI;
45  
46  /**
47   * Write crawled content as records to an HBase table. 
48   * Puts content into the 'content:raw_data' column and all else into the
49   * 'curi:' column family. Makes a row key of an url transformation. Creates
50   * table if it does not exist.
51   * 
52  	The following is a complete list of columns that get written to by default:
53  	
54  	content:raw_data
55  	
56  	curi:ip
57  	curi:path-from-seed
58  	curi:is-seed
59  	curi:via
60  	curi:url
61  	curi:request
62  	
63   * 
64   * <p>
65   * Limitations: Hard-coded table schema.  
66   */
67  public class HBaseWriter extends WriterPoolMember implements ArchiveFileConstants {
68  	
69  	/** The LOG. */
70  	private final Logger LOG = Logger.getLogger(this.getClass().getName());
71  	
72  	/** The client. */
73  	private final HTable client;
74  	// TODO: make this variable configurable in the heritrix sheet:
75  	// CONTENT_COLUMN_FAMILY
76  	/** The Constant CONTENT_COLUMN_FAMILY. */
77  	public static final String CONTENT_COLUMN_FAMILY = "content";
78  	// TODO: make this variable configurable in the heritrix sheet:
79  	/** The Constant CONTENT_COLUMN. */
80  	public static final String CONTENT_COLUMN_NAME = "raw_data";
81  	// TODO: make this variable configurable in the heritrix sheet:
82  	// CURI_COLUMN_FAMILY
83  	/** The Constant CURI_COLUMN_FAMILY. */
84  	public static final String CURI_COLUMN_FAMILY = "curi";
85  	// TODO: make this variable configurable in the heritrix sheet:
86  	/** The Constant IP_COLUMN. */
87  	private static final String IP_COLUMN_NAME = "ip";
88  	// TODO: make this variable configurable in the heritrix sheet:
89  	/** The Constant PATH_FROM_SEED_COLUMN. */
90  	private static final String PATH_FROM_SEED_COLUMN_NAME = "path-from-seed";
91  	// TODO: make this variable configurable in the heritrix sheet:
92  	/** The Constant IS_SEED_COLUMN. */
93  	private static final String IS_SEED_COLUMN_NAME = "is-seed";
94  	// TODO: make this variable configurable in the heritrix sheet:
95  	/** The Constant VIA_COLUMN. */
96  	private static final String VIA_COLUMN_NAME = "via";
97  	// TODO: make this variable configurable in the heritrix sheet:
98  	/** The Constant URL_COLUMN. */
99  	private static final String URL_COLUMN_NAME = "url";
100 	// TODO: make this variable configurable in the heritrix sheet:
101 	/** The Constant REQUEST_COLUMN. */
102 	private static final String REQUEST_COLUMN_NAME = "request";
103 
104 	/**
105 	 * Gets the HTable client.
106 	 * 
107 	 * @return the client
108 	 */
109 	public HTable getClient() {
110 		return client;
111 	}
112 	
113 	/**
114 	 * Instantiates a new HBaseWriter for the WriterPool to use in heritrix2.
115 	 * 
116 	 * @param zkQuorum 
117 	 * 		the zookeeper quorum. The list of hosts that make up you zookeeper quorum.  
118 	 * 		i.e.:  zkHost1,zkHost2,zkHost3  
119 	 * @param tableName 
120 	 * 		the table in hbase to write to.  i.e. : webtable
121 	 * 
122 	 * @throws IOException Signals that an I/O exception has occurred.
123 	 */
124 	public HBaseWriter(final String zkQuorum, final int zkClientPort, final String tableName) throws IOException {
125 		super(null, null, null, false, null);
126 		if (tableName == null || tableName.length() <= 0) {
127 			throw new IllegalArgumentException("Must specify a table name");
128 		}
129 		HBaseConfiguration hbaseConfiguration = new HBaseConfiguration();
130 		if (zkQuorum != null && zkQuorum.length() > 0) {
131 			LOG.info("setting zookeeper quorum to : " + zkQuorum);
132 			hbaseConfiguration.setStrings(HConstants.ZOOKEEPER_QUORUM, zkQuorum.split(","));
133 		}
134 		LOG.debug("zookeeper quorum value: " + hbaseConfiguration.get(HConstants.ZOOKEEPER_QUORUM));
135 		if (zkClientPort > 0) {
136 			LOG.info("setting zookeeper client Port to : " + zkClientPort);
137 			// TODO: Add this string to HConstants
138 			hbaseConfiguration.setInt("hbase.zookeeper.property.clientPort", zkClientPort);
139 		}
140 		LOG.debug("zookeeper quorum value: " + hbaseConfiguration.get(HConstants.ZOOKEEPER_QUORUM));
141 		createCrawlTable(hbaseConfiguration, tableName);
142 		this.client = new HTable(hbaseConfiguration, tableName);
143 	}
144 
145 	/**
146 	 * Creates the crawl table in HBase.
147 	 * 
148 	 * @param hbaseConfiguration the c
149 	 * @param hbaseTableName the table
150 	 * 
151 	 * @throws IOException Signals that an I/O exception has occurred.
152 	 */
153 	protected void createCrawlTable(final HBaseConfiguration hbaseConfiguration, final String hbaseTableName) throws IOException {
154 		// an HBase admin object to manage hbase tables.
155 		HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseConfiguration);
156 		if (hbaseAdmin.tableExists(hbaseTableName)) {
157 			boolean foundContentColumnFamily = false;
158 			boolean foundCURIColumnFamily = false;
159 			LOG.debug("Checking table: " + hbaseTableName + " for structure...");
160 			// Check the existing table and manipulate it if necessary
161 			// to conform to the pre-existing table schema.
162 			HTableDescriptor existingHBaseTable = hbaseAdmin.getTableDescriptor(Bytes.toBytes(hbaseTableName));
163 			for (HColumnDescriptor hColumnDescriptor: existingHBaseTable.getFamilies()) {
164 				if (hColumnDescriptor.getNameAsString().equalsIgnoreCase(CONTENT_COLUMN_FAMILY)) {
165 					foundContentColumnFamily = true;
166 				} else if (hColumnDescriptor.getNameAsString().equalsIgnoreCase(CURI_COLUMN_FAMILY)) {
167 					foundCURIColumnFamily = true;
168 				}
169 			}
170 			// modify the table if it's missing any of the column families.
171 			if (!foundContentColumnFamily || !foundCURIColumnFamily) {
172 				LOG.info("Disabling table: " + hbaseTableName);
173 				hbaseAdmin.disableTable(hbaseTableName);
174 				if (!foundContentColumnFamily) {
175 					LOG.info("Adding column to table: " + hbaseTableName + " column: " + CONTENT_COLUMN_FAMILY);
176 					existingHBaseTable.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));		
177 				}
178 				if (!foundCURIColumnFamily) {
179 					LOG.info("Adding column to table: " + hbaseTableName + " column: " + CURI_COLUMN_FAMILY);
180 					existingHBaseTable.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));	
181 				}
182 				LOG.info("Enabling table: " + hbaseTableName);
183 				hbaseAdmin.enableTable(hbaseTableName);	
184 			}
185 			LOG.debug("Done checking table: " + hbaseTableName);
186 		} else {
187 			// create a new hbase table
188 			LOG.info("Creating table " + hbaseTableName);
189 			HTableDescriptor newHBaseTable = new HTableDescriptor(hbaseTableName);
190 			newHBaseTable.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
191 			newHBaseTable.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
192 			// create the table
193 			hbaseAdmin.createTable(newHBaseTable);
194 			LOG.info("Created table " + newHBaseTable.toString());
195 		}
196 	}
197 
198 	/**
199 	 * Write the crawled output to the configured HBase table.
200 	 * Write each row key as the url with reverse domain and optionally process any content.
201 	 * 
202 	 * @param curi URI of crawled document
203 	 * @param ip IP of remote machine.
204 	 * @param ros recording input stream that captured the response
205 	 * @param ris recording output stream that captured the GET request
206 	 * 
207 	 * @throws IOException Signals that an I/O exception has occurred.
208 	 */
209 	public void write(final ProcessorURI curi, final String ip, final RecordingOutputStream ros, final RecordingInputStream ris)
210 			throws IOException {
211 		// generate the target url of the crawled document
212 		String url = curi.toString();
213 		// create the hbase friendly rowkey
214 		String rowKey = Keying.createKey(url);
215 		if (LOG.isTraceEnabled()) {
216 			LOG.trace("Writing " + url + " as " + rowKey.toString());
217 		}
218 		// create an hbase updateable object (the put object)
219 		// Constructor takes the rowkey as the only argument
220 		Put batchPut = new Put(Bytes.toBytes(rowKey));
221 		// write the target url to the url column
222 		batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(URL_COLUMN_NAME), curi.getFetchBeginTime(), Bytes.toBytes(url));
223 		// write the target ip to the ip column
224 		batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(IP_COLUMN_NAME), curi.getFetchBeginTime(), Bytes.toBytes(ip));
225 		// is the url part of the seed url (the initial url(s) used to start the crawl)
226 		if (curi.isSeed()) {
227 			batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(IS_SEED_COLUMN_NAME), Bytes.toBytes(Boolean.TRUE));
228 			if (curi.getPathFromSeed() != null && curi.getPathFromSeed().trim().length() > 0) {
229 				batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(PATH_FROM_SEED_COLUMN_NAME), Bytes.toBytes(curi.getPathFromSeed().trim()));
230 			}
231 		}
232 		String viaStr = (curi.getVia() != null) ? curi.getVia().toString().trim() : null;
233 		if (viaStr != null && viaStr.length() > 0) {
234 			batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(VIA_COLUMN_NAME), Bytes.toBytes(viaStr));
235 		}
236 		// Write the Crawl Request to the Put object
237 		if (ros.getSize() > 0) {
238 			batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(REQUEST_COLUMN_NAME), 
239 					getByteArrayFromInputStream(ros.getReplayInputStream(), (int) ros.getSize()));
240 		}
241 		// Write the Crawl Response to the Put object
242 		batchPut.add(Bytes.toBytes(CONTENT_COLUMN_FAMILY), Bytes.toBytes(CONTENT_COLUMN_NAME), 
243 				getByteArrayFromInputStream(ris.getReplayInputStream(), (int) ris.getSize()));
244 		
245 		// reset the input steam for the content processor.
246 		ris.getReplayInputStream().setToResponseBodyStart();
247 		// process the content (optional)
248 		processContent(batchPut, ris.getReplayInputStream(), (int) ris.getSize());
249 		// Set crawl time as the timestamp to the Put object.
250 		batchPut.setTimeStamp(curi.getFetchBeginTime());
251 		// write the Put object to the HBase table
252 		this.client.put(batchPut);
253 	}
254 
255 	/**
256 	 * Read the ReplayInputStream and write it to the given BatchUpdate with the given column.
257 	 * 
258 	 * @param column the column for the given data.
259 	 * @param replayInputStream the ris the cell data as a replay input stream
260 	 * @param streamSize the size
261 	 * 
262 	 * @throws IOException Signals that an I/O exception has occurred.
263 	 */
264 	protected byte[] getByteArrayFromInputStream(final ReplayInputStream replayInputStream, final int streamSize) throws IOException {
265 		ByteArrayOutputStream baos = new ByteArrayOutputStream(streamSize);
266 		try {
267 			// read the InputStream to the ByteArrayOutputStream
268 			replayInputStream.readFullyTo(baos);
269 		} finally {
270 			replayInputStream.close();
271 		}
272 		baos.close();
273 		return baos.toByteArray();
274 	}
275 
276 	/**
277 	 * This is a stub method and is here to allow extension/overriding for
278 	 * custom content parsing, data manipulation and to populate new columns.
279 	 * 
280 	 * For Example : html parsing, text extraction, analysis and transformation
281 	 * and storing the results in new column families/columns using the batch
282 	 * update object.
283 	 * 
284 	 * @param batchUpdate the batchUpdate - the hbase row object whose state can be manipulated
285 	 * before the object is written.
286 	 */
287 	protected void processContent(Put put, ReplayInputStream replayInputStream, int streamSize) throws IOException {
288 		// process content array and parse it to a new byte array.....
289 		// byte[] rowKey = put.getRow();
290 		// byte[] rawContent = this.getByteArrayFromInputStream(replayInputStream, streamSize)
291 		// byte[] someParsedByteArray = ....
292 		// put.add(Bytes.toBytes("some_column_family"), Bytes.toBytes("a_new_column_name"), someParsedByteArray);
293 	}
294 }