View Javadoc

1   /**
2    * HBaseWriterProcessor
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * This file is part of the Heritrix web crawler (crawler.archive.org).
9    *
10   * Heritrix is free software; you can redistribute it and/or modify
11   * it under the terms of the GNU Lesser Public License as published by
12   * the Free Software Foundation; either version 2.1 of the License, or
13   * any later version.
14   *
15   * Heritrix is distributed in the hope that it will be useful,
16   * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   * GNU Lesser Public License for more details.
19   *
20   * You should have received a copy of the GNU Lesser Public License
21   * along with Heritrix; if not, write to the Free Software
22   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23   */
24  package com.powerset.heritrix.writer;
25  
26  import java.io.Closeable;
27  import java.io.IOException;
28  import java.io.InputStream;
29  
30  import org.apache.hadoop.hbase.client.Get;
31  import org.apache.hadoop.hbase.client.HTable;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.apache.hadoop.hbase.util.Keying;
34  import org.apache.log4j.Logger;
35  import org.archive.io.ReplayInputStream;
36  import org.archive.io.WriterPool;
37  import org.archive.io.WriterPoolMember;
38  import org.archive.modules.ModuleAttributeConstants;
39  import org.archive.modules.ProcessResult;
40  import org.archive.modules.Processor;
41  import org.archive.modules.ProcessorURI;
42  import org.archive.modules.fetcher.FetchStatusCodes;
43  import org.archive.modules.net.CrawlHost;
44  import org.archive.modules.net.ServerCache;
45  import org.archive.modules.net.ServerCacheUtil;
46  import org.archive.state.Expert;
47  import org.archive.state.Immutable;
48  import org.archive.state.Initializable;
49  import org.archive.state.Key;
50  import org.archive.state.KeyManager;
51  import org.archive.state.StateProvider;
52  import org.archive.util.IoUtils;
53  
54  /**
55   * An <a href="http://crawler.archive.org">heritrix2</a> processor that writes
56   * to <a href="http://hbase.org">Hadoop HBase</a>.
57   */
58  public class HBaseWriterProcessor extends Processor implements Initializable, Closeable {
59  	
60  	/** The Constant serialVersionUID. */
61  	private static final long serialVersionUID = 7166781798179114353L;
62  
63  	/** The LOG. */
64  	private final Logger LOG = Logger.getLogger(this.getClass().getName());
65  
66  	/** Commas-seperated list of Hostnames in the zookeeper quorum. */
67  	@Immutable
68  	public static final Key<String> ZKQUORUM = Key.make("localhost");
69  
70  	/** The port that clients should connect on to contact their zk quorum hsots. */
71  	@Immutable
72  	public static final Key<Integer> ZKCLIENTPORT = Key.make(2181);
73  	
74  	/** HBase tableName to crawl into. */
75  	@Immutable
76  	public static final Key<String> TABLE = Key.make("crawl");
77  
78  	/** 
79  	 * If set to true, then only write urls that are new rowkey records. Default is false, which will write all urls to the HBase table.  
80  	 * Heritrix is good about not hitting the same url twice, so this feature is to ensure that you can run multiple sessions 
81  	 * of the same crawl configuration and not write the same url more than once to the same hbase table. You may just want to crawl a site to 
82  	 * see what new urls have been added over time, or continue where you left off on a terminated crawl.  Heritrix itself does support this
83  	 * functionalty by supporting "Checkpoints" during a crawl session, so this may not be a necessary option. 
84  	 */
85  	@Immutable
86  	public static final Key<Boolean> WRITE_ONLY_NEW_RECORDS = Key.make(false);
87  	
88  	/** 
89  	 * If set to true, then only process urls that are new rowkey records. Default is false, which will process all urls to the HBase table.  
90  	 * In this mode, Heritrix wont even fetch and parse the content served at the url if it already exists as a rowkey in the HBase table. 
91  	 */
92  	@Immutable
93  	public static final Key<Boolean> PROCESS_ONLY_NEW_RECORDS = Key.make(false);
94  
95  	/** Maximum active files in pool. This setting cannot be varied over the life of a crawl. */
96  	@Immutable
97  	final public static Key<Integer> POOL_MAX_ACTIVE = Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
98  
99  	/** Maximum time to wait on pool element (milliseconds). This setting cannot be varied over the life of a crawl. */
100 	@Immutable
101 	final public static Key<Integer> POOL_MAX_WAIT = Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
102 
103 	/** The Constant SERVER_CACHE. */
104 	@Immutable
105 	final public static Key<ServerCache> SERVER_CACHE = Key.makeAuto(ServerCache.class);
106 
107 	/** Maximum allowable content size. */
108 	@Immutable
109 	final public static Key<Integer> CONTENT_MAX_SIZE = Key.make(20 * 1024 * 1024);
110 
111 	/** Total file bytes to write to disk. Once the size of all files on disk has exceeded this limit, this processor will stop the crawler. A value of zero means no upper limit. */
112 	@Immutable
113 	@Expert
114 	final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
115 
116 	/** Reference to pool. */
117 	private transient WriterPool pool = null;
118 	
119 	/** The server cache. */
120 	private ServerCache serverCache;
121 	
122 	/** The max active. */
123 	private int maxActive;
124 	
125 	/** The max wait. */
126 	private int maxWait;
127 	
128 	/** The max content size. */
129 	private int maxContentSize;
130 	
131 	/** The comma seperated string of hostnames that make up the zookeeper quorum. */
132 	private String zkQuorum;
133 	
134 	/** The port that zk clients should connect to for information. */
135 	private int zkClientPort;
136 	
137 	/** The hbase table name. */
138 	private String tableName;
139 	
140 	/** The only write new records turnable. */
141 	private boolean onlyWriteNewRecords;
142 	
143 	/** The only process new records turnable. */
144 	private boolean onlyProcessNewRecords;
145 
146 	/** The total bytes written to disk. */
147 	private long totalBytesWritten = 0;
148 
149 	static {
150 		KeyManager.addKeys(HBaseWriterProcessor.class);
151 	}
152 
153 	/**
154 	 * Instantiates a new HBaseWriterProcessor.
155 	 */
156 	public HBaseWriterProcessor() {
157 		super();
158 	}
159 
160 	/* (non-Javadoc)
161 	 * @see org.archive.state.Initializable#initialTasks(org.archive.state.StateProvider)
162 	 */
163 	public synchronized void initialTasks(StateProvider context) {
164 		this.serverCache = context.get(this, SERVER_CACHE);
165 		this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
166 		this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
167 		this.zkQuorum = context.get(this, ZKQUORUM);
168 		this.zkClientPort = context.get(this, ZKCLIENTPORT);
169 		this.tableName = context.get(this, TABLE);
170 		this.onlyWriteNewRecords = context.get(this, WRITE_ONLY_NEW_RECORDS).booleanValue();
171 		this.onlyProcessNewRecords = context.get(this, PROCESS_ONLY_NEW_RECORDS).booleanValue();
172 		this.maxContentSize = context.get(this, CONTENT_MAX_SIZE).intValue();
173 		setupPool();
174 	}
175 
176 	/* (non-Javadoc)
177 	 * @see org.archive.modules.Processor#innerProcessResult(org.archive.modules.ProcessorURI)
178 	 */
179 	protected ProcessResult innerProcessResult(final ProcessorURI puri) {
180 		ProcessorURI curi = puri;
181 		long recordLength = getRecordedSize(curi);
182 		ReplayInputStream ris = null;
183 		try {
184 			if (shouldWrite(curi)) {
185 				ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
186 				return write(curi, recordLength, ris, getHostAddress(curi));
187 			}
188 			LOG.info("does not write " + curi.toString());
189 		} catch (IOException e) {
190 			curi.getNonFatalFailures().add(e);
191 			LOG.error("Failed write of Record: " + curi.toString(), e);
192 		} finally {
193 			IoUtils.close(ris);
194 		}
195 		return ProcessResult.PROCEED;
196 	}
197 
198 	/**
199 	 * Return IP address of given URI suitable for recording (as in a classic
200 	 * ARC 5-field header line).
201 	 * 
202 	 * @param curi ProcessorURI
203 	 * 
204 	 * @return String of IP address
205 	 */
206 	protected String getHostAddress(ProcessorURI curi) {
207 		// special handling for DNS URIs: want address of DNS server
208 		if (curi.getUURI().getScheme().toLowerCase().equals("dns")) {
209 			return (String) curi.getData().get(
210 					ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
211 		}
212 		// otherwise, host referenced in URI
213 		CrawlHost crawlHost = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
214 		if (crawlHost == null) {
215 			throw new NullPointerException("Crawlhost is null for " + curi + " " + curi.getVia());
216 		}
217 		// check if the ip address was looked up.
218 		if (crawlHost.getIP() == null) {
219 			throw new NullPointerException(
220 					"Address is null for "
221 							+ curi
222 							+ " "
223 							+ curi.getVia()
224 							+ ". Address "
225 							+ ((crawlHost.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP) ? "was never looked up."
226 									: (System.currentTimeMillis() - crawlHost.getIpFetched()) 
227 							+ " ms ago."));
228 		}
229 		return crawlHost.getIP().getHostAddress();
230 	}
231 
232 	/* (non-Javadoc)
233 	 * @see org.archive.modules.Processor#shouldProcess(org.archive.modules.ProcessorURI)
234 	 */
235 	@Override
236 	protected boolean shouldProcess(ProcessorURI uri) {
237 		ProcessorURI curi = uri;
238 		// If failure, or we haven't fetched the resource yet, return
239 		if (curi.getFetchStatus() <= 0) {
240 			return false;
241 		}
242 
243 		// If no recorded content at all, don't write record.
244 		long recordLength = curi.getContentSize();
245 		if (recordLength <= 0) {
246 			// getContentSize() should be > 0 if any material (even just
247 			// HTTP headers with zero-length body is available.
248 			return false;
249 		}
250 		
251 		// If onlyProcessNewRecords is enabled and the given rowkey has cell data,
252 		// don't write the record.
253 		if (this.onlyProcessNewRecords) {
254 			return this.isRecordNew(curi);
255 		}
256 		// If we make it here, then we passed all our checks and we can assume
257 		// we should write the record.
258 		return true;
259 	}
260 	
261 	/**
262 	 * Whether the given ProcessorURI should be written to archive files.
263 	 * Annotates ProcessorURI with a reason for any negative answer.
264 	 * 
265 	 * @param curi ProcessorURI
266 	 * 
267 	 * @return true if URI should be written; false otherwise
268 	 */
269 	protected boolean shouldWrite(ProcessorURI curi) {
270 		boolean retVal;
271 		String scheme = curi.getUURI().getScheme().toLowerCase();
272 		// determine the return value of the uri request
273 		if (scheme.equals("dns")) {
274 			retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
275 		} else if (scheme.equals("http") || scheme.equals("https")) {
276 			retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
277 		} else if (scheme.equals("ftp")) {
278 			retVal = curi.getFetchStatus() == 200;
279 		} else {
280 			curi.getAnnotations().add("unwritten:scheme");
281 			return false;
282 		}
283 
284 		if (retVal == false) {
285 			// status not deserving writing
286 			curi.getAnnotations().add("unwritten:status");
287 			return false;
288 		}
289 
290 		// If the content exceeds the maxContentSize, then dont write.
291 		if (curi.getContentSize() > this.maxContentSize) {
292 			// content size is too large
293 			curi.getAnnotations().add("unwritten:size");
294 			LOG.warn("content size for " + curi.getUURI() + " is too large ("
295 					+ curi.getContentSize() + ") - maximum content size is: "
296 					+ this.maxContentSize);
297 			return false;
298 		}
299 
300 		// If onlyWriteNewRecords is enabled and the given rowkey has cell data,
301 		// don't write the record.
302 		if (this.onlyWriteNewRecords) {
303 			return this.isRecordNew(curi);
304 		}
305 		// all tests pass, return true to write the content locally.
306 		return true;
307 	}
308 	
309 	/**
310 	 * Determine if the given uri exists as a rowkey in the configured hbase table.
311 	 * 
312 	 * @param curi the curi
313 	 * 
314 	 * @return true, if checks if is record new
315 	 */
316 	private boolean isRecordNew(ProcessorURI curi) {
317 		WriterPoolMember writerPoolMember;
318 		try {
319 			writerPoolMember = getPool().borrowFile();
320 		} catch (IOException e1) {
321 			LOG.error("No writer could be borrowed from the pool: " + getPool().toString() 
322 							+ " - exception is: \n" + e1.getMessage());
323 			return false;
324 		}
325 		HTable hbaseTable = ((HBaseWriter) writerPoolMember).getClient();
326 		// Here we can generate the rowkey for this uri ...
327 		String url = curi.toString();
328 		String row = Keying.createKey(url);
329 		try {
330 			// and look it up to see if it already exists...
331 			Get rowToGet = new Get(Bytes.toBytes(row));
332 			if (hbaseTable.get(rowToGet) != null && !hbaseTable.get(rowToGet).isEmpty()) {
333 				if (LOG.isDebugEnabled()) {
334 					LOG.debug("Not A NEW Record - Url: "
335 								+ url
336 								+ " has the existing rowkey: "
337 								+ row.toString()
338 								+ " and has cell data.");
339 				}
340 				return false;
341 			}
342 		} catch (IOException e) {
343 			LOG.error("Failed to determine if record: "
344 							+ row.toString()
345 							+ " is a new record due to IOExecption.  Deciding the record is already existing for now. \n"
346 							+ e.getMessage());
347 			return false;
348 		} finally {
349 			try {
350 				getPool().returnFile(writerPoolMember);
351 			} catch (IOException e) {
352 				LOG.error("Failed to add back writer to the pool after checking if a rowkey is new or existing: "
353 								+ row.toString() + "\n" + e.getMessage());
354 				return false;
355 			}
356 		}
357 		return true;
358 	}
359 
360 	/**
361 	 * Write.
362 	 * 
363 	 * @param curi the curi
364 	 * @param recordLength the record length
365 	 * @param in the in
366 	 * @param ip the ip
367 	 * 
368 	 * @return the process result
369 	 * 
370 	 * @throws IOException Signals that an I/O exception has occurred.
371 	 */
372 	protected ProcessResult write(final ProcessorURI curi, long recordLength, InputStream in, String ip) throws IOException {
373 		WriterPoolMember writerPoolMember = getPool().borrowFile();
374 		long writerPoolMemberPosition = writerPoolMember.getPosition();
375 		HBaseWriter hbaseWriter = (HBaseWriter) writerPoolMember;
376 		try {
377 			hbaseWriter.write(curi, getHostAddress(curi), curi.getRecorder().getRecordedOutput(), curi.getRecorder().getRecordedInput());
378 		} finally {
379 			setTotalBytesWritten(getTotalBytesWritten() + (writerPoolMember.getPosition() - writerPoolMemberPosition));
380 			getPool().returnFile(writerPoolMember);
381 		}
382 		return checkBytesWritten(curi);
383 	}
384 
385 	/**
386 	 * Check bytes written.
387 	 * 
388 	 * @param context the context
389 	 * 
390 	 * @return the process result
391 	 */
392 	protected ProcessResult checkBytesWritten(StateProvider context) {
393 		long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
394 		if (max <= 0) {
395 			return ProcessResult.PROCEED;
396 		}
397 		if (max <= this.totalBytesWritten) {
398 			return ProcessResult.FINISH; // FIXME: Specify reason
399 			// controller.requestCrawlStop(CrawlStatus.FINISHED_WRITE_LIMIT);
400 		}
401 		return ProcessResult.PROCEED;
402 	}
403 
404 	/**
405 	 * Setup pool.
406 	 */
407 	protected void setupPool() {
408 		setPool(new HBaseWriterPool(getZKQuorum(), getZKClientPort(), getTable(), getMaxActive(), getMaxWait()));
409 	}
410 
411 	/**
412 	 * Gets the zookeeper quorum.
413 	 * 
414 	 * @return the zkQuorum
415 	 */
416 	protected String getZKQuorum() {
417 		return this.zkQuorum;
418 	}
419 
420 	/**
421 	 * Gets the zookeeper client port.
422 	 * 
423 	 * @return the zlClientPort
424 	 */
425 	protected int getZKClientPort() {
426 		return this.zkClientPort;
427 	}
428 
429 	
430 	/**
431 	 * Gets the table.
432 	 * 
433 	 * @return the table
434 	 */
435 	protected String getTable() {
436 		return this.tableName;
437 	}
438 
439 	
440 	/**
441 	 * Gets the max active.
442 	 * 
443 	 * @return the max active
444 	 */
445 	protected int getMaxActive() {
446 		return maxActive;
447 	}
448 
449 	/**
450 	 * Gets the max wait.
451 	 * 
452 	 * @return the max wait
453 	 */
454 	protected int getMaxWait() {
455 		return maxWait;
456 	}
457 
458 	/**
459 	 * Sets the pool.
460 	 * 
461 	 * @param pool the new pool
462 	 */
463 	protected void setPool(WriterPool pool) {
464 		this.pool = pool;
465 	}
466 
467 	/**
468 	 * Gets the pool.
469 	 * 
470 	 * @return the pool
471 	 */
472 	protected WriterPool getPool() {
473 		return this.pool;
474 	}
475 
476 	/**
477 	 * Gets the total bytes written.
478 	 * 
479 	 * @return the total bytes written
480 	 */
481 	protected long getTotalBytesWritten() {
482 		return this.totalBytesWritten;
483 	}
484 
485 	/**
486 	 * Sets the total bytes written.
487 	 * 
488 	 * @param b the new total bytes written
489 	 */
490 	protected void setTotalBytesWritten(final long b) {
491 		this.totalBytesWritten = b;
492 	}
493 	
494 	/* (non-Javadoc)
495 	 * @see org.archive.modules.Processor#innerProcess(org.archive.modules.ProcessorURI)
496 	 */
497 	protected void innerProcess(ProcessorURI puri) {
498 		throw new AssertionError();
499 	}
500 
501 	/* (non-Javadoc)
502 	 * @see java.io.Closeable#close()
503 	 */
504 	public void close() {
505 		this.pool.close();
506 	}
507 
508 }