View Javadoc

1   /**
2    * HBaseWriterProcessor
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * This file is part of the Heritrix web crawler (crawler.archive.org).
9    *
10   * Heritrix is free software; you can redistribute it and/or modify
11   * it under the terms of the GNU Lesser Public License as published by
12   * the Free Software Foundation; either version 2.1 of the License, or
13   * any later version.
14   *
15   * Heritrix is distributed in the hope that it will be useful,
16   * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   * GNU Lesser Public License for more details.
19   *
20   * You should have received a copy of the GNU Lesser Public License
21   * along with Heritrix; if not, write to the Free Software
22   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23   */
24  package com.powerset.heritrix.writer;
25  
26  import java.io.Closeable;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.net.InetAddress;
30  
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.client.HTable;
33  import org.apache.hadoop.hbase.util.Keying;
34  import org.apache.log4j.Logger;
35  import org.archive.io.ReplayInputStream;
36  import org.archive.io.WriterPool;
37  import org.archive.io.WriterPoolMember;
38  import org.archive.modules.ModuleAttributeConstants;
39  import org.archive.modules.ProcessResult;
40  import org.archive.modules.Processor;
41  import org.archive.modules.ProcessorURI;
42  import org.archive.modules.fetcher.FetchStatusCodes;
43  import org.archive.modules.net.CrawlHost;
44  import org.archive.modules.net.ServerCache;
45  import org.archive.modules.net.ServerCacheUtil;
46  import org.archive.state.Expert;
47  import org.archive.state.Immutable;
48  import org.archive.state.Initializable;
49  import org.archive.state.Key;
50  import org.archive.state.KeyManager;
51  import org.archive.state.StateProvider;
52  import org.archive.util.IoUtils;
53  
54  /**
55   * An <a href="http://crawler.archive.org">heritrix2</a> processor that writes
56   * to <a href="http://hbase.org">Hadoop HBase</a>.
57   */
58  public class HBaseWriterProcessor extends Processor implements Initializable,
59  		Closeable {
60  	private static final long serialVersionUID = 7166781798179114353L;
61  
62  	private final Logger LOG = Logger.getLogger(this.getClass().getName());
63  
64  	/**
65  	 * Location of hbase master.
66  	 */
67  	@Immutable
68  	public static final Key<String> MASTER = Key.make(HConstants.DEFAULT_MASTER_ADDRESS);
69  
70  	/**
71  	 * HBase tableName to crawl into.
72  	 */
73  	@Immutable
74  	public static final Key<String> TABLE = Key.make("crawl");
75  
76  	/**
77  	 * If set to true, then only process urls that are new rowkey records.
78  	 * Default is false, to collect all urls.
79  	 * 
80  	 * Heritrix is good about not hitting the same url twice, so this feature is
81  	 * to ensure that you can run multiple sessions of the same crawl
82  	 * configuration and not fetch the same url more than once. You may just
83  	 * want to crawl a site to see what new urls have been added over time, or
84  	 * continue where you left off on a terminated crawl.
85  	 */
86  	@Immutable
87  	public static final Key<Boolean> ONLY_NEW_RECORDS = Key.make(false);
88  
89  	/**
90  	 * Maximum active files in pool. This setting cannot be varied over the life
91  	 * of a crawl.
92  	 */
93  	@Immutable
94  	final public static Key<Integer> POOL_MAX_ACTIVE = Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
95  
96  	/**
97  	 * Maximum time to wait on pool element (milliseconds). This setting cannot
98  	 * be varied over the life of a crawl.
99  	 */
100 	@Immutable
101 	final public static Key<Integer> POOL_MAX_WAIT = Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
102 
103 	@Immutable
104 	final public static Key<ServerCache> SERVER_CACHE = Key.makeAuto(ServerCache.class);
105 
106 	/**
107 	 * Maximum allowable content size.
108 	 */
109 	@Immutable
110 	final public static Key<Integer> CONTENT_MAX_SIZE = Key.make(20 * 1024 * 1024);
111 
112 	/**
113 	 * Total file bytes to write to disk. Once the size of all files on disk has
114 	 * exceeded this limit, this processor will stop the crawler. A value of
115 	 * zero means no upper limit.
116 	 */
117 	@Immutable
118 	@Expert
119 	final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
120 
121 	/**
122 	 * Reference to pool.
123 	 */
124 	private transient WriterPool pool = null;
125 	private ServerCache serverCache;
126 	private int maxActive;
127 	private int maxWait;
128 	private int maxContentSize;
129 	private String master;
130 	private String tableName;
131 	private boolean onlyWriteNewRecords;
132 
133 	/*
134 	 * Total number of bytes written to disc.
135 	 */
136 	private long totalBytesWritten = 0;
137 
138 	static {
139 		KeyManager.addKeys(HBaseWriterProcessor.class);
140 	}
141 
142 	public HBaseWriterProcessor() {
143 		super();
144 	}
145 
146 	public synchronized void initialTasks(StateProvider context) {
147 		this.serverCache = context.get(this, SERVER_CACHE);
148 		this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
149 		this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
150 		this.master = context.get(this, MASTER);
151 		this.tableName = context.get(this, TABLE);
152 		this.onlyWriteNewRecords = context.get(this, ONLY_NEW_RECORDS).booleanValue();
153 		this.maxContentSize = context.get(this, CONTENT_MAX_SIZE).intValue();
154 		setupPool();
155 	}
156 
157 	protected String getMaster() {
158 		return this.master;
159 	}
160 
161 	protected String getTable() {
162 		return this.tableName;
163 	}
164 
165 	protected void setupPool() {
166 		setPool(new HBaseWriterPool(getMaster(), getTable(), getMaxActive(), getMaxWait()));
167 	}
168 
169 	protected int getMaxActive() {
170 		return maxActive;
171 	}
172 
173 	protected int getMaxWait() {
174 		return maxWait;
175 	}
176 
177 	protected void setPool(WriterPool pool) {
178 		this.pool = pool;
179 	}
180 
181 	protected WriterPool getPool() {
182 		return this.pool;
183 	}
184 
185 	protected long getTotalBytesWritten() {
186 		return this.totalBytesWritten;
187 	}
188 
189 	protected void setTotalBytesWritten(final long b) {
190 		this.totalBytesWritten = b;
191 	}
192 
193 	protected ProcessResult innerProcessResult(final ProcessorURI puri) {
194 		ProcessorURI curi = puri;
195 		long recordLength = getRecordedSize(curi);
196 		ReplayInputStream ris = null;
197 		try {
198 			if (shouldWrite(curi)) {
199 				ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
200 				return write(curi, recordLength, ris, getHostAddress(curi));
201 			}
202 			LOG.info("does not write " + curi.toString());
203 		} catch (IOException e) {
204 			curi.getNonFatalFailures().add(e);
205 			LOG.error("Failed write of Record: " + curi.toString(), e);
206 		} finally {
207 			IoUtils.close(ris);
208 		}
209 		return ProcessResult.PROCEED;
210 	}
211 
212 	/**
213 	 * Return IP address of given URI suitable for recording (as in a classic
214 	 * ARC 5-field header line).
215 	 * 
216 	 * @param curi
217 	 *            ProcessorURI
218 	 * @return String of IP address
219 	 */
220 	protected String getHostAddress(ProcessorURI curi) {
221 		// special handling for DNS URIs: want address of DNS server
222 		if (curi.getUURI().getScheme().toLowerCase().equals("dns")) {
223 			return (String) curi.getData().get(
224 					ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
225 		}
226 		// otherwise, host referenced in URI
227 		CrawlHost h = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
228 		if (h == null) {
229 			throw new NullPointerException("Crawlhost is null for " + curi + " " + curi.getVia());
230 		}
231 		InetAddress a = h.getIP();
232 		if (a == null) {
233 			throw new NullPointerException(
234 					"Address is null for "
235 							+ curi
236 							+ " "
237 							+ curi.getVia()
238 							+ ". Address "
239 							+ ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP) ? "was never looked up."
240 									: (System.currentTimeMillis() - h
241 											.getIpFetched())
242 											+ " ms ago."));
243 		}
244 		return h.getIP().getHostAddress();
245 	}
246 
247 	/**
248 	 * Whether the given ProcessorURI should be written to archive files.
249 	 * Annotates ProcessorURI with a reason for any negative answer.
250 	 * 
251 	 * @param curi
252 	 *            ProcessorURI
253 	 * @return true if URI should be written; false otherwise
254 	 */
255 	protected boolean shouldWrite(ProcessorURI curi) {
256 		boolean retVal;
257 		String scheme = curi.getUURI().getScheme().toLowerCase();
258 		// determine the return value of the uri request
259 		if (scheme.equals("dns")) {
260 			retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
261 		} else if (scheme.equals("http") || scheme.equals("https")) {
262 			retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
263 		} else if (scheme.equals("ftp")) {
264 			retVal = curi.getFetchStatus() == 200;
265 		} else {
266 			curi.getAnnotations().add("unwritten:scheme");
267 			return false;
268 		}
269 
270 		if (retVal == false) {
271 			// status not deserving writing
272 			curi.getAnnotations().add("unwritten:status");
273 			return false;
274 		}
275 
276 		// If the content exceeds the maxContentSize, then dont write.
277 		if (curi.getContentSize() > this.maxContentSize) {
278 			// content size is too large
279 			curi.getAnnotations().add("unwritten:size");
280 			LOG.warn("content size for " + curi.getUURI() + " is too large ("
281 					+ curi.getContentSize() + ") - maximum content size is: "
282 					+ this.maxContentSize);
283 			return false;
284 		}
285 
286 		// If onlyWriteNewRecords is enabled and the given rowkey has cell data,
287 		// don't write the record.
288 		if (this.onlyWriteNewRecords) {
289 			WriterPoolMember writer;
290 			try {
291 				writer = getPool().borrowFile();
292 			} catch (IOException e1) {
293 				LOG.error("No writer could be gotten from the pool: " + getPool().toString() 
294 								+ " - exception is: \n" + e1.getMessage());
295 				return false;
296 			}
297 			HTable ht = ((HBaseWriter) writer).getClient();
298 			// Here we can generate the rowkey for this uri ...
299 			String url = curi.toString();
300 			String row = Keying.createKey(url);
301 			try {
302 				// and look it up to see if it already exists...
303 				if (ht.getRow(row) != null && !ht.getRow(row).isEmpty()) {
304 					if (LOG.isTraceEnabled()) {
305 						LOG.trace("Not Writing "
306 									+ url
307 									+ " since rowkey: "
308 									+ row.toString()
309 									+ " already exists and onlyWriteNewRecords is enabled.");
310 					}
311 					return false;
312 				}
313 			} catch (IOException e) {
314 				LOG.error("Failed to determine if record: "
315 								+ row.toString()
316 								+ " should be written or not, deciding not to write the record: \n"
317 								+ e.getMessage());
318 				return false;
319 			} finally {
320 				try {
321 					getPool().returnFile(writer);
322 				} catch (IOException e) {
323 					LOG.error("Failed to add back writer to the pool after checking for existing rowkey: "
324 									+ row.toString() + "\n" + e.getMessage());
325 					return false;
326 				}
327 			}
328 		}
329 		// all tests pass, return true to write the content locally.
330 		return true;
331 	}
332 
333 	protected ProcessResult write(final ProcessorURI curi, long recordLength, InputStream in, String ip) throws IOException {
334 		WriterPoolMember writer = getPool().borrowFile();
335 		long position = writer.getPosition();
336 		HBaseWriter w = (HBaseWriter) writer;
337 		try {
338 			w.write(curi, getHostAddress(curi), curi.getRecorder().getRecordedOutput(), curi.getRecorder().getRecordedInput());
339 		} finally {
340 			setTotalBytesWritten(getTotalBytesWritten() + (writer.getPosition() - position));
341 			getPool().returnFile(writer);
342 		}
343 		return checkBytesWritten(curi);
344 	}
345 
346 	protected ProcessResult checkBytesWritten(StateProvider context) {
347 		long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
348 		if (max <= 0) {
349 			return ProcessResult.PROCEED;
350 		}
351 		if (max <= this.totalBytesWritten) {
352 			return ProcessResult.FINISH; // FIXME: Specify reason
353 			// controller.requestCrawlStop(CrawlStatus.FINISHED_WRITE_LIMIT);
354 		}
355 		return ProcessResult.PROCEED;
356 	}
357 
358 	protected void innerProcess(ProcessorURI puri) {
359 		throw new AssertionError();
360 	}
361 
362 	// good to keep at end of source: must run after all per-Key
363 	// initialization values are set.
364 	static {
365 		KeyManager.addKeys(HBaseWriterProcessor.class);
366 	}
367 
368 	public void close() {
369 		this.pool.close();
370 	}
371 
372 	@Override
373 	protected boolean shouldProcess(ProcessorURI uri) {
374 		ProcessorURI curi = uri;
375 		// If failure, or we haven't fetched the resource yet, return
376 		if (curi.getFetchStatus() <= 0) {
377 			return false;
378 		}
379 
380 		// If no recorded content at all, don't write record.
381 		long recordLength = curi.getContentSize();
382 		if (recordLength <= 0) {
383 			// getContentSize() should be > 0 if any material (even just
384 			// HTTP headers with zero-length body is available.
385 			return false;
386 		}
387 
388 		// If we make it here, then we passed all our checks and we can assume
389 		// we should write the record.
390 		return true;
391 	}
392 }