View Javadoc

1   /**
2    * HBaseWriterProcessor
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * This file is part of the Heritrix web crawler (crawler.archive.org).
9    *
10   * Heritrix is free software; you can redistribute it and/or modify
11   * it under the terms of the GNU Lesser Public License as published by
12   * the Free Software Foundation; either version 2.1 of the License, or
13   * any later version.
14   *
15   * Heritrix is distributed in the hope that it will be useful,
16   * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   * GNU Lesser Public License for more details.
19   *
20   * You should have received a copy of the GNU Lesser Public License
21   * along with Heritrix; if not, write to the Free Software
22   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23   */
24  package com.powerset.heritrix.writer;
25  
26  import java.io.Closeable;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.net.InetAddress;
30  
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.client.HTable;
33  import org.apache.hadoop.hbase.util.Keying;
34  import org.apache.log4j.Logger;
35  import org.archive.io.ReplayInputStream;
36  import org.archive.io.WriterPool;
37  import org.archive.io.WriterPoolMember;
38  import org.archive.modules.ModuleAttributeConstants;
39  import org.archive.modules.ProcessResult;
40  import org.archive.modules.Processor;
41  import org.archive.modules.ProcessorURI;
42  import org.archive.modules.fetcher.FetchStatusCodes;
43  import org.archive.modules.net.CrawlHost;
44  import org.archive.modules.net.ServerCache;
45  import org.archive.modules.net.ServerCacheUtil;
46  import org.archive.state.Expert;
47  import org.archive.state.Immutable;
48  import org.archive.state.Initializable;
49  import org.archive.state.Key;
50  import org.archive.state.KeyManager;
51  import org.archive.state.StateProvider;
52  import org.archive.util.IoUtils;
53  
54  // TODO: Auto-generated Javadoc
55  /**
56   * An <a href="http://crawler.archive.org">heritrix2</a> processor that writes
57   * to <a href="http://hbase.org">Hadoop HBase</a>.
58   */
59  public class HBaseWriterProcessor extends Processor implements Initializable,
60  		Closeable {
61  	
62  	/** The Constant serialVersionUID. */
63  	private static final long serialVersionUID = 7166781798179114353L;
64  
65  	/** The LOG. */
66  	private final Logger LOG = Logger.getLogger(this.getClass().getName());
67  
68  	/** Location of hbase master. */
69  	@Immutable
70  	public static final Key<String> MASTER = Key.make(HConstants.DEFAULT_MASTER_ADDRESS);
71  
72  	/** HBase tableName to crawl into. */
73  	@Immutable
74  	public static final Key<String> TABLE = Key.make("crawl");
75  
76  	/** If set to true, then only write urls that are new rowkey records. Default is false, which will write all urls to the HBase table.  
77  	 * Heritrix is good about not hitting the same url twice during the same session, so this feature is to ensure that you can run multiple sessions 
78  	 * of the same crawl configuration and not write the same url more than once to the given HBase table. */
79  	@Immutable
80  	public static final Key<Boolean> WRITE_ONLY_NEW_RECORDS = Key.make(false);
81  	
82  	/** If set to true, then only fetch & process urls that are new rowkey records. Default is false, which will process all urls to the HBase table.  
83  	 * In this mode, Heritrix wont even download and traverse the url if it exists in the HBase table. */
84  	@Immutable
85  	public static final Key<Boolean> PROCESS_ONLY_NEW_RECORDS = Key.make(false);
86  
87  	/** Maximum active files in pool. This setting cannot be varied over the life of a crawl. */
88  	@Immutable
89  	final public static Key<Integer> POOL_MAX_ACTIVE = Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
90  
91  	/** Maximum time to wait on pool element (milliseconds). This setting cannot be varied over the life of a crawl. */
92  	@Immutable
93  	final public static Key<Integer> POOL_MAX_WAIT = Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
94  
95  	/** The Constant SERVER_CACHE. */
96  	@Immutable
97  	final public static Key<ServerCache> SERVER_CACHE = Key.makeAuto(ServerCache.class);
98  
99  	/** Maximum allowable content size. */
100 	@Immutable
101 	final public static Key<Integer> CONTENT_MAX_SIZE = Key.make(20 * 1024 * 1024);
102 
103 	/** Total file bytes to write to disk. Once the size of all files on disk has exceeded this limit, this processor will stop the crawler. 
104 	 * A value of zero means no upper limit. */
105 	@Immutable
106 	@Expert
107 	final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
108 
109 	/** Reference to pool. */
110 	private transient WriterPool pool = null;
111 	
112 	/** The server cache. */
113 	private ServerCache serverCache;
114 	
115 	/** The max active. */
116 	private int maxActive;
117 	
118 	/** The max wait. */
119 	private int maxWait;
120 	
121 	/** The max content size. */
122 	private int maxContentSize;
123 	
124 	/** The master. */
125 	private String master;
126 	
127 	/** The table name. */
128 	private String tableName;
129 	
130 	/** The only write new records. */
131 	private boolean onlyWriteNewRecords;
132 	
133 	/** The only process new records. */
134 	private boolean onlyProcessNewRecords;
135 
136 	/*
137 	 * Total number of bytes written to disc.
138 	 */
139 	/** The total bytes written. */
140 	private long totalBytesWritten = 0;
141 
142 	static {
143 		KeyManager.addKeys(HBaseWriterProcessor.class);
144 	}
145 
146 	/**
147 	 * Instantiates a new h base writer processor.
148 	 */
149 	public HBaseWriterProcessor() {
150 		super();
151 	}
152 
153 	/* (non-Javadoc)
154 	 * @see org.archive.state.Initializable#initialTasks(org.archive.state.StateProvider)
155 	 */
156 	public synchronized void initialTasks(StateProvider context) {
157 		this.serverCache = context.get(this, SERVER_CACHE);
158 		this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
159 		this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
160 		this.master = context.get(this, MASTER);
161 		this.tableName = context.get(this, TABLE);
162 		this.onlyWriteNewRecords = context.get(this, WRITE_ONLY_NEW_RECORDS).booleanValue();
163 		this.onlyProcessNewRecords = context.get(this, PROCESS_ONLY_NEW_RECORDS).booleanValue();
164 		this.maxContentSize = context.get(this, CONTENT_MAX_SIZE).intValue();
165 		setupPool();
166 	}
167 
168 	/**
169 	 * Gets the master.
170 	 * 
171 	 * @return the master
172 	 */
173 	protected String getMaster() {
174 		return this.master;
175 	}
176 
177 	/**
178 	 * Gets the table.
179 	 * 
180 	 * @return the table
181 	 */
182 	protected String getTable() {
183 		return this.tableName;
184 	}
185 
186 	/**
187 	 * Setup pool.
188 	 */
189 	protected void setupPool() {
190 		setPool(new HBaseWriterPool(getMaster(), getTable(), getMaxActive(), getMaxWait()));
191 	}
192 
193 	/**
194 	 * Gets the max active.
195 	 * 
196 	 * @return the max active
197 	 */
198 	protected int getMaxActive() {
199 		return maxActive;
200 	}
201 
202 	/**
203 	 * Gets the max wait.
204 	 * 
205 	 * @return the max wait
206 	 */
207 	protected int getMaxWait() {
208 		return maxWait;
209 	}
210 
211 	/**
212 	 * Sets the pool.
213 	 * 
214 	 * @param pool the new pool
215 	 */
216 	protected void setPool(WriterPool pool) {
217 		this.pool = pool;
218 	}
219 
220 	/**
221 	 * Gets the pool.
222 	 * 
223 	 * @return the pool
224 	 */
225 	protected WriterPool getPool() {
226 		return this.pool;
227 	}
228 
229 	/**
230 	 * Gets the total bytes written.
231 	 * 
232 	 * @return the total bytes written
233 	 */
234 	protected long getTotalBytesWritten() {
235 		return this.totalBytesWritten;
236 	}
237 
238 	/**
239 	 * Sets the total bytes written.
240 	 * 
241 	 * @param b the new total bytes written
242 	 */
243 	protected void setTotalBytesWritten(final long b) {
244 		this.totalBytesWritten = b;
245 	}
246 
247 	/* (non-Javadoc)
248 	 * @see org.archive.modules.Processor#innerProcessResult(org.archive.modules.ProcessorURI)
249 	 */
250 	protected ProcessResult innerProcessResult(final ProcessorURI puri) {
251 		ProcessorURI curi = puri;
252 		long recordLength = getRecordedSize(curi);
253 		ReplayInputStream ris = null;
254 		try {
255 			if (shouldWrite(curi)) {
256 				ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
257 				return write(curi, recordLength, ris, getHostAddress(curi));
258 			}
259 			LOG.info("does not write " + curi.toString());
260 		} catch (IOException e) {
261 			curi.getNonFatalFailures().add(e);
262 			LOG.error("Failed write of Record: " + curi.toString(), e);
263 		} finally {
264 			IoUtils.close(ris);
265 		}
266 		return ProcessResult.PROCEED;
267 	}
268 
269 	/**
270 	 * Return IP address of given URI suitable for recording (as in a classic
271 	 * ARC 5-field header line).
272 	 * 
273 	 * @param curi ProcessorURI
274 	 * 
275 	 * @return String of IP address
276 	 */
277 	protected String getHostAddress(ProcessorURI curi) {
278 		// special handling for DNS URIs: want address of DNS server
279 		if (curi.getUURI().getScheme().toLowerCase().equals("dns")) {
280 			return (String) curi.getData().get(
281 					ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
282 		}
283 		// otherwise, host referenced in URI
284 		CrawlHost h = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
285 		if (h == null) {
286 			throw new NullPointerException("Crawlhost is null for " + curi + " " + curi.getVia());
287 		}
288 		InetAddress a = h.getIP();
289 		if (a == null) {
290 			throw new NullPointerException(
291 					"Address is null for "
292 							+ curi
293 							+ " "
294 							+ curi.getVia()
295 							+ ". Address "
296 							+ ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP) ? "was never looked up."
297 									: (System.currentTimeMillis() - h
298 											.getIpFetched())
299 											+ " ms ago."));
300 		}
301 		return h.getIP().getHostAddress();
302 	}
303 
304 	/**
305 	 * Whether the given ProcessorURI should be written to archive files.
306 	 * Annotates ProcessorURI with a reason for any negative answer.
307 	 * 
308 	 * @param curi ProcessorURI
309 	 * 
310 	 * @return true if URI should be written; false otherwise
311 	 */
312 	protected boolean shouldWrite(ProcessorURI curi) {
313 		boolean retVal;
314 		String scheme = curi.getUURI().getScheme().toLowerCase();
315 		// determine the return value of the uri request
316 		if (scheme.equals("dns")) {
317 			retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
318 		} else if (scheme.equals("http") || scheme.equals("https")) {
319 			retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
320 		} else if (scheme.equals("ftp")) {
321 			retVal = curi.getFetchStatus() == 200;
322 		} else {
323 			curi.getAnnotations().add("unwritten:scheme");
324 			return false;
325 		}
326 
327 		if (retVal == false) {
328 			// status not deserving writing
329 			curi.getAnnotations().add("unwritten:status");
330 			return false;
331 		}
332 
333 		// If the content exceeds the maxContentSize, then dont write.
334 		if (curi.getContentSize() > this.maxContentSize) {
335 			// content size is too large
336 			curi.getAnnotations().add("unwritten:size");
337 			LOG.warn("content size for " + curi.getUURI() + " is too large ("
338 					+ curi.getContentSize() + ") - maximum content size is: "
339 					+ this.maxContentSize);
340 			return false;
341 		}
342 
343 		// If onlyWriteNewRecords is enabled and the given rowkey has cell data,
344 		// don't write the record.
345 		if (this.onlyWriteNewRecords) {
346 			if (!this.isRecordNew(curi)) {
347 				return false;
348 			}
349 		}
350 		// all tests pass, return true to write the content locally.
351 		return true;
352 	}
353 	
354 	/**
355 	 * Determine if the given uri exists as a rowkey in the configured hbase table.
356 	 * 
357 	 * @param curi the curi
358 	 * 
359 	 * @return true if given curi is a new record in the configured HBase table.
360 	 * false if the rowkey has any cells associated with it.
361 	 */
362 	private boolean isRecordNew(ProcessorURI curi) {
363 		WriterPoolMember writer;
364 		try {
365 			writer = getPool().borrowFile();
366 		} catch (IOException e1) {
367 			LOG.error("No writer could be gotten from the pool: " + getPool().toString() 
368 							+ " - exception is: \n" + e1.getMessage());
369 			return false;
370 		}
371 		HTable ht = ((HBaseWriter) writer).getClient();
372 		// Here we can generate the rowkey for this uri ...
373 		String url = curi.toString();
374 		String row = Keying.createKey(url);
375 		try {
376 			// and look it up to see if it already exists...
377 			if (ht.getRow(row) != null && !ht.getRow(row).isEmpty()) {
378 				if (LOG.isDebugEnabled()) {
379 					LOG.debug("Not Writing "
380 								+ url
381 								+ " since rowkey: "
382 								+ row.toString()
383 								+ " already exists and onlyWriteNewRecords is enabled.");
384 				}
385 				return false;
386 			}
387 		} catch (IOException e) {
388 			LOG.error("Failed to determine if record: "
389 							+ row.toString()
390 							+ " should be written or not, deciding not to write the record: \n"
391 							+ e.getMessage());
392 			return false;
393 		} finally {
394 			try {
395 				getPool().returnFile(writer);
396 			} catch (IOException e) {
397 				LOG.error("Failed to add back writer to the pool after checking for existing rowkey: "
398 								+ row.toString() + "\n" + e.getMessage());
399 				return false;
400 			}
401 		}
402 		return true;
403 	}
404 
405 	/**
406 	 * Write.
407 	 * 
408 	 * @param curi the curi
409 	 * @param recordLength the record length
410 	 * @param in the in
411 	 * @param ip the ip
412 	 * 
413 	 * @return the process result
414 	 * 
415 	 * @throws IOException Signals that an I/O exception has occurred.
416 	 */
417 	protected ProcessResult write(final ProcessorURI curi, long recordLength, InputStream in, String ip) throws IOException {
418 		WriterPoolMember writer = getPool().borrowFile();
419 		long position = writer.getPosition();
420 		HBaseWriter w = (HBaseWriter) writer;
421 		try {
422 			w.write(curi, getHostAddress(curi), curi.getRecorder().getRecordedOutput(), curi.getRecorder().getRecordedInput());
423 		} finally {
424 			setTotalBytesWritten(getTotalBytesWritten() + (writer.getPosition() - position));
425 			getPool().returnFile(writer);
426 		}
427 		return checkBytesWritten(curi);
428 	}
429 
430 	/**
431 	 * Check bytes written.
432 	 * 
433 	 * @param context the context
434 	 * 
435 	 * @return the process result
436 	 */
437 	protected ProcessResult checkBytesWritten(StateProvider context) {
438 		long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
439 		if (max <= 0) {
440 			return ProcessResult.PROCEED;
441 		}
442 		if (max <= this.totalBytesWritten) {
443 			return ProcessResult.FINISH; // FIXME: Specify reason
444 			// controller.requestCrawlStop(CrawlStatus.FINISHED_WRITE_LIMIT);
445 		}
446 		return ProcessResult.PROCEED;
447 	}
448 
449 	/* (non-Javadoc)
450 	 * @see org.archive.modules.Processor#innerProcess(org.archive.modules.ProcessorURI)
451 	 */
452 	protected void innerProcess(ProcessorURI puri) {
453 		throw new AssertionError();
454 	}
455 
456 	// good to keep at end of source: must run after all per-Key
457 	// initialization values are set.
458 	static {
459 		KeyManager.addKeys(HBaseWriterProcessor.class);
460 	}
461 
462 	/* (non-Javadoc)
463 	 * @see java.io.Closeable#close()
464 	 */
465 	public void close() {
466 		this.pool.close();
467 	}
468 
469 	/* (non-Javadoc)
470 	 * @see org.archive.modules.Processor#shouldProcess(org.archive.modules.ProcessorURI)
471 	 */
472 	@Override
473 	protected boolean shouldProcess(ProcessorURI uri) {
474 		ProcessorURI curi = uri;
475 		// If failure, or we haven't fetched the resource yet, return
476 		if (curi.getFetchStatus() <= 0) {
477 			return false;
478 		}
479 
480 		// If no recorded content at all, don't write record.
481 		long recordLength = curi.getContentSize();
482 		if (recordLength <= 0) {
483 			// getContentSize() should be > 0 if any material (even just
484 			// HTTP headers with zero-length body is available.
485 			return false;
486 		}
487 		
488 		// If onlyProcessNewRecords is enabled and the given rowkey has cell data,
489 		// don't write the record.
490 		if (this.onlyProcessNewRecords) {
491 			if (!this.isRecordNew(curi)) {
492 				return false;
493 			}
494 		}
495 
496 		// If we make it here, then we passed all our checks and we can assume
497 		// we should write the record.
498 		return true;
499 	}
500 }