View Javadoc

1   /**
2    * HBaseWriterProcessor
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * This file is part of the Heritrix web crawler (crawler.archive.org).
9    *
10   * Heritrix is free software; you can redistribute it and/or modify
11   * it under the terms of the GNU Lesser Public License as published by
12   * the Free Software Foundation; either version 2.1 of the License, or
13   * any later version.
14   *
15   * Heritrix is distributed in the hope that it will be useful,
16   * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   * GNU Lesser Public License for more details.
19   *
20   * You should have received a copy of the GNU Lesser Public License
21   * along with Heritrix; if not, write to the Free Software
22   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23   */
24  package com.powerset.heritrix.writer;
25  
26  import java.io.Closeable;
27  import java.io.IOException;
28  import java.io.InputStream;
29  
30  import org.apache.hadoop.hbase.client.Get;
31  import org.apache.hadoop.hbase.client.HTable;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.hbase.util.Keying;
34  import org.apache.log4j.Logger;
35  import org.archive.io.ReplayInputStream;
36  import org.archive.io.WriterPool;
37  import org.archive.io.WriterPoolMember;
38  import org.archive.modules.ModuleAttributeConstants;
39  import org.archive.modules.ProcessResult;
40  import org.archive.modules.Processor;
41  import org.archive.modules.ProcessorURI;
42  import org.archive.modules.fetcher.FetchStatusCodes;
43  import org.archive.modules.net.CrawlHost;
44  import org.archive.modules.net.ServerCache;
45  import org.archive.modules.net.ServerCacheUtil;
46  import org.archive.state.Expert;
47  import org.archive.state.Immutable;
48  import org.archive.state.Initializable;
49  import org.archive.state.Key;
50  import org.archive.state.KeyManager;
51  import org.archive.state.StateProvider;
52  import org.archive.util.IoUtils;
53  
54  /**
55   * An <a href="http://crawler.archive.org">heritrix2</a> processor that writes
56   * to <a href="http://hbase.org">Hadoop HBase</a>.
57   */
58  public class HBaseWriterProcessor extends Processor implements Initializable, Closeable {
59  	
60  	/** The Constant serialVersionUID. */
61  	private static final long serialVersionUID = 7166781798179114353L;
62  
63  	/** The LOG. */
64  	private static final Logger LOG = Logger.getLogger("HBaseWriterProcessor");
65  
66  	/** Comma-separated list of hostnames in the zookeeper quorum. */
67  	@Immutable
68  	public static final Key<String> ZKQUORUM = Key.make("localhost");
69  
70  	/** The port that clients should connect on to contact their zk quorum hosts. */
71  	@Immutable
72  	public static final Key<Integer> ZKCLIENTPORT = Key.make(2181);
73  	
74  	/** HBase tableName to crawl into. */
75  	@Immutable
76  	public static final Key<String> TABLE = Key.make("crawl");
77  
78  	/** If set to true, then only write urls that are new rowkey records. 
79  	 *  Default is false, which will write all urls to the HBase table. 
80  	 * Heritrix is good about not hitting the same url twice, so this feature 
81  	 * is to ensure that you can run multiple sessions of the same crawl 
82  	 * configuration and not write the same url more than once to the same 
83  	 * hbase table. You may just want to crawl a site to see what new urls have 
84  	 * been added over time, or continue where you left off on a terminated 
85  	 * crawl.  Heritrix itself does support this functionality by supporting 
86  	 * "Checkpoints" during a crawl session, so this may not be a necessary 
87  	 * option. 
88  	 */
89  	@Immutable
90  	public static final Key<Boolean> WRITE_ONLY_NEW_RECORDS = Key.make(false);
91  	
92  	/** If set to true, then only process urls that are new rowkey records. 
93  	 * Default is false, which will process all urls to the HBase table. 
94  	 * In this mode, Heritrix wont even fetch and parse the content served at 
95  	 * the url if it already exists as a rowkey in the HBase table. 
96  	 */
97  	@Immutable
98  	public static final Key<Boolean> PROCESS_ONLY_NEW_RECORDS = Key.make(false);
99  
100 	/** Maximum active files in pool. This setting cannot be varied over the life of a crawl. */
101 	@Immutable
102 	final public static Key<Integer> POOL_MAX_ACTIVE = Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
103 
104 	/** Maximum time to wait on pool element (milliseconds). This setting cannot be varied over the life of a crawl. */
105 	@Immutable
106 	final public static Key<Integer> POOL_MAX_WAIT = Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
107 
108 	/** The Constant SERVER_CACHE. */
109 	@Immutable
110 	final public static Key<ServerCache> SERVER_CACHE = Key.makeAuto(ServerCache.class);
111 
112 	/** Maximum allowable content size. */
113 	@Immutable
114 	final public static Key<Integer> CONTENT_MAX_SIZE = Key.make(20 * 1024 * 1024);
115 
116 	/** Total file bytes to write to disk. Once the size of all files on disk has exceeded this limit, this processor will stop the crawler. A value of zero means no upper limit. */
117 	@Immutable
118 	@Expert
119 	final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
120 
121 	/** Reference to pool. */
122 	private transient WriterPool pool = null;
123 	
124 	/** The server cache. */
125 	private transient ServerCache serverCache;
126 	
127 	/** The max active. */
128 	private int maxActive;
129 	
130 	/** The max wait. */
131 	private int maxWait;
132 	
133 	/** The max content size. */
134 	private int maxContentSize;
135 	
136 	/** The comma-separated string of hostnames that make up the zookeeper quorum. */
137 	private String zkQuorum;
138 	
139 	/** The port that zk clients should connect to for information. */
140 	private int zkClientPort;
141 	
142 	/** The hbase table name. */
143 	private String tableName;
144 	
145 	/** When true, only urls that are new rowkey records are written (set from WRITE_ONLY_NEW_RECORDS). */
146 	private boolean onlyWriteNewRecords;
147 	
148 	/** When true, only urls that are new rowkey records are processed (set from PROCESS_ONLY_NEW_RECORDS). */
149 	private boolean onlyProcessNewRecords;
150 
151 	/** The total bytes written to disk. */
152 	private long totalBytesWritten = 0;
153 
154 	static {
155 		KeyManager.addKeys(HBaseWriterProcessor.class);
156 	}
157 
158 	/**
159 	 * Instantiates a new HBaseWriterProcessor.
160 	 */
161 	public HBaseWriterProcessor() {
162 		super();
163 	}
164 
165 	/* (non-Javadoc)
166 	 * @see org.archive.state.Initializable#initialTasks(org.archive.state.StateProvider)
167 	 */
168 	public synchronized void initialTasks(StateProvider context) {
169 		this.serverCache = context.get(this, SERVER_CACHE);
170 		this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
171 		this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
172 		this.zkQuorum = context.get(this, ZKQUORUM);
173 		this.zkClientPort = context.get(this, ZKCLIENTPORT);
174 		this.tableName = context.get(this, TABLE);
175 		this.onlyWriteNewRecords = context.get(this, WRITE_ONLY_NEW_RECORDS).booleanValue();
176 		this.onlyProcessNewRecords = context.get(this, PROCESS_ONLY_NEW_RECORDS).booleanValue();
177 		this.maxContentSize = context.get(this, CONTENT_MAX_SIZE).intValue();
178 		setupPool();
179 	}
180 
181 	/* (non-Javadoc)
182 	 * @see org.archive.modules.Processor#innerProcessResult(org.archive.modules.ProcessorURI)
183 	 */
184 	protected ProcessResult innerProcessResult(final ProcessorURI puri) {
185 		ProcessorURI curi = puri;
186 		long recordLength = getRecordedSize(curi);
187 		ReplayInputStream ris = null;
188 		try {
189 			if (shouldWrite(curi)) {
190 				ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
191 				return write(curi, recordLength, ris, getHostAddress(curi));
192 			}
193 			LOG.info("does not write " + curi.toString());
194 		} catch (IOException e) {
195 			curi.getNonFatalFailures().add(e);
196 			LOG.error("Failed write of Record: " + curi.toString(), e);
197 		} finally {
198 			IoUtils.close(ris);
199 		}
200 		return ProcessResult.PROCEED;
201 	}
202 
203 	/**
204 	 * Return IP address of given URI suitable for recording (as in a classic
205 	 * ARC 5-field header line).
206 	 * 
207 	 * @param curi ProcessorURI
208 	 * 
209 	 * @return String of IP address
210 	 */
211 	protected String getHostAddress(ProcessorURI curi) {
212 		// special handling for DNS URIs: want address of DNS server
213 		if (curi.getUURI().getScheme().equalsIgnoreCase("dns")) {
214 			return (String) curi.getData().get(
215 					ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
216 		}
217 		// otherwise, host referenced in URI
218 		CrawlHost crawlHost = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
219 		if (crawlHost == null) {
220 			throw new NullPointerException("Crawlhost is null for " + curi + " " + curi.getVia());
221 		}
222 		// check if the ip address was looked up.
223 		if (crawlHost.getIP() == null) {
224 			throw new NullPointerException(
225 					"Address is null for "
226 							+ curi
227 							+ " "
228 							+ curi.getVia()
229 							+ ". Address "
230 							+ ((crawlHost.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP) ? "was never looked up."
231 									: (System.currentTimeMillis() - crawlHost.getIpFetched()) 
232 							+ " ms ago."));
233 		}
234 		return crawlHost.getIP().getHostAddress();
235 	}
236 
237 	/* (non-Javadoc)
238 	 * @see org.archive.modules.Processor#shouldProcess(org.archive.modules.ProcessorURI)
239 	 */
240 	@Override
241 	protected boolean shouldProcess(ProcessorURI uri) {
242 		ProcessorURI curi = uri;
243 		// If failure, or we haven't fetched the resource yet, return
244 		if (curi.getFetchStatus() <= 0) {
245 			return false;
246 		}
247 
248 		// If no recorded content at all, don't write record.
249 		long recordLength = curi.getContentSize();
250 		if (recordLength <= 0) {
251 			// getContentSize() should be > 0 if any material (even just
252 			// HTTP headers with zero-length body is available.
253 			return false;
254 		}
255 		
256 		// If onlyProcessNewRecords is enabled and the given rowkey has cell data,
257 		// don't write the record.
258 		if (this.onlyProcessNewRecords) {
259 			return this.isRecordNew(curi);
260 		}
261 		// If we make it here, then we passed all our checks and we can assume
262 		// we should write the record.
263 		return true;
264 	}
265 	
266 	/**
267 	 * Whether the given ProcessorURI should be written to archive files.
268 	 * Annotates ProcessorURI with a reason for any negative answer.
269 	 * 
270 	 * @param curi ProcessorURI
271 	 * 
272 	 * @return true if URI should be written; false otherwise
273 	 */
274 	protected boolean shouldWrite(ProcessorURI curi) {
275 		boolean retVal;
276 		String scheme = curi.getUURI().getScheme();
277 		// determine the return value of the uri request
278 		if (scheme.equalsIgnoreCase("dns")) {
279 			retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
280 		} else if (scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https")) {
281 			retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
282 		} else if (scheme.equalsIgnoreCase("ftp")) {
283 			retVal = curi.getFetchStatus() == 200;
284 		} else {
285 			curi.getAnnotations().add("unwritten:scheme");
286 			return false;
287 		}
288 
289 		if (retVal == false) {
290 			// status not deserving writing
291 			curi.getAnnotations().add("unwritten:status");
292 			return false;
293 		}
294 
295 		// If the content exceeds the maxContentSize, then dont write.
296 		if (curi.getContentSize() > this.maxContentSize) {
297 			// content size is too large
298 			curi.getAnnotations().add("unwritten:size");
299 			LOG.warn("content size for " + curi.getUURI() + " is too large ("
300 					+ curi.getContentSize() + ") - maximum content size is: "
301 					+ this.maxContentSize);
302 			return false;
303 		}
304 
305 		// If onlyWriteNewRecords is enabled and the given rowkey has cell data,
306 		// don't write the record.
307 		if (this.onlyWriteNewRecords) {
308 			return this.isRecordNew(curi);
309 		}
310 		// all tests pass, return true to write the content locally.
311 		return true;
312 	}
313 	
314 	/**
315 	 * Determine if the given uri exists as a rowkey in the configured hbase table.
316 	 * 
317 	 * @param curi the curi
318 	 * 
319 	 * @return true, if checks if is record new
320 	 */
321 	private boolean isRecordNew(ProcessorURI curi) {
322 		WriterPoolMember writerPoolMember;
323 		try {
324 			writerPoolMember = getPool().borrowFile();
325 		} catch (IOException e1) {
326 			LOG.error("No writer could be borrowed from the pool: " + getPool().toString() 
327 							+ " - exception is: \n" + e1.getMessage());
328 			return false;
329 		}
330 		HTable hbaseTable = ((HBaseWriter) writerPoolMember).getClient();
331 		// Here we can generate the rowkey for this uri ...
332 		String url = curi.toString();
333 		String row = Keying.createKey(url);
334 		try {
335 			// and look it up to see if it already exists...
336 			Get rowToGet = new Get(Bytes.toBytes(row));
337 			if (hbaseTable.get(rowToGet) != null && !hbaseTable.get(rowToGet).isEmpty()) {
338 				if (LOG.isDebugEnabled()) {
339 					LOG.debug("Not A NEW Record - Url: "
340 								+ url
341 								+ " has the existing rowkey: "
342 								+ row
343 								+ " and has cell data.");
344 				}
345 				return false;
346 			}
347 		} catch (IOException e) {
348 			LOG.error("Failed to determine if record: "
349 							+ row
350 							+ " is a new record due to IOExecption.  Deciding the record is already existing for now. \n"
351 							+ e.getMessage());
352 			return false;
353 		} finally {
354 			try {
355 				getPool().returnFile(writerPoolMember);
356 			} catch (IOException e) {
357 				LOG.error("Failed to add back writer to the pool after checking if a rowkey is new or existing: "
358 								+ row + "\n" + e.getMessage());
359 				return false;
360 			}
361 		}
362 		return true;
363 	}
364 
365 	/**
366 	 * Write.
367 	 * 
368 	 * @param curi the curi
369 	 * @param recordLength the record length
370 	 * @param in the in
371 	 * @param ip the ip
372 	 * 
373 	 * @return the process result
374 	 * 
375 	 * @throws IOException Signals that an I/O exception has occurred.
376 	 */
377 	protected ProcessResult write(final ProcessorURI curi, long recordLength, InputStream in, String ip) throws IOException {
378 		WriterPoolMember writerPoolMember = getPool().borrowFile();
379 		long writerPoolMemberPosition = writerPoolMember.getPosition();
380 		HBaseWriter hbaseWriter = (HBaseWriter) writerPoolMember;
381 		try {
382 			hbaseWriter.write(curi, getHostAddress(curi), curi.getRecorder().getRecordedOutput(), curi.getRecorder().getRecordedInput());
383 		} finally {
384 			setTotalBytesWritten(getTotalBytesWritten() + (writerPoolMember.getPosition() - writerPoolMemberPosition));
385 			getPool().returnFile(writerPoolMember);
386 		}
387 		return checkBytesWritten(curi);
388 	}
389 
390 	/**
391 	 * Check bytes written.
392 	 * 
393 	 * @param context the context
394 	 * 
395 	 * @return the process result
396 	 */
397 	protected ProcessResult checkBytesWritten(StateProvider context) {
398 		long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
399 		if (max <= 0) {
400 			return ProcessResult.PROCEED;
401 		}
402 		if (max <= this.totalBytesWritten) {
403 			return ProcessResult.FINISH; // FIXME: Specify reason
404 			// controller.requestCrawlStop(CrawlStatus.FINISHED_WRITE_LIMIT);
405 		}
406 		return ProcessResult.PROCEED;
407 	}
408 
409 	/**
410 	 * Setup pool.
411 	 */
412 	protected void setupPool() {
413 		setPool(new HBaseWriterPool(getZKQuorum(), getZKClientPort(), getTable(), getMaxActive(), getMaxWait()));
414 	}
415 
416 	/**
417 	 * Gets the zookeeper quorum.
418 	 * 
419 	 * @return the zkQuorum
420 	 */
421 	protected String getZKQuorum() {
422 		return this.zkQuorum;
423 	}
424 
425 	/**
426 	 * Gets the zookeeper client port.
427 	 * 
428 	 * @return the zlClientPort
429 	 */
430 	protected int getZKClientPort() {
431 		return this.zkClientPort;
432 	}
433 
434 	
435 	/**
436 	 * Gets the table.
437 	 * 
438 	 * @return the table
439 	 */
440 	protected String getTable() {
441 		return this.tableName;
442 	}
443 
444 	
445 	/**
446 	 * Gets the max active.
447 	 * 
448 	 * @return the max active
449 	 */
450 	protected int getMaxActive() {
451 		return maxActive;
452 	}
453 
454 	/**
455 	 * Gets the max wait.
456 	 * 
457 	 * @return the max wait
458 	 */
459 	protected int getMaxWait() {
460 		return maxWait;
461 	}
462 
463 	/**
464 	 * Sets the pool.
465 	 * 
466 	 * @param pool the new pool
467 	 */
468 	protected void setPool(WriterPool pool) {
469 		this.pool = pool;
470 	}
471 
472 	/**
473 	 * Gets the pool.
474 	 * 
475 	 * @return the pool
476 	 */
477 	protected WriterPool getPool() {
478 		return this.pool;
479 	}
480 
481 	/**
482 	 * Gets the total bytes written.
483 	 * 
484 	 * @return the total bytes written
485 	 */
486 	protected long getTotalBytesWritten() {
487 		return this.totalBytesWritten;
488 	}
489 
490 	/**
491 	 * Sets the total bytes written.
492 	 * 
493 	 * @param b the new total bytes written
494 	 */
495 	protected void setTotalBytesWritten(final long b) {
496 		this.totalBytesWritten = b;
497 	}
498 	
499 	/* (non-Javadoc)
500 	 * @see org.archive.modules.Processor#innerProcess(org.archive.modules.ProcessorURI)
501 	 */
502 	protected void innerProcess(ProcessorURI puri) {
503 		throw new AssertionError();
504 	}
505 
506 	/* (non-Javadoc)
507 	 * @see java.io.Closeable#close()
508 	 */
509 	public void close() {
510 		this.pool.close();
511 	}
512 
513 }