View Javadoc

1   /*
2    * HBaseWriterProcessor
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * Copyright (C) 2007 stack
9    *
10   * This file is part of the Heritrix web crawler (crawler.archive.org).
11   *
12   * Heritrix is free software; you can redistribute it and/or modify
13   * it under the terms of the GNU Lesser Public License as published by
14   * the Free Software Foundation; either version 2.1 of the License, or
15   * any later version.
16   *
17   * Heritrix is distributed in the hope that it will be useful,
18   * but WITHOUT ANY WARRANTY; without even the implied warranty of
19   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20   * GNU Lesser Public License for more details.
21   *
22   * You should have received a copy of the GNU Lesser Public License
23   * along with Heritrix; if not, write to the Free Software
24   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25   */
26  package com.powerset.heritrix.writer;
27  
28  import java.io.Closeable;
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.net.InetAddress;
32  import java.util.logging.Level;
33  import java.util.logging.Logger;
34  
35  import org.apache.hadoop.hbase.HConstants;
36  import org.archive.io.ReplayInputStream;
37  import org.archive.io.WriterPool;
38  import org.archive.io.WriterPoolMember;
39  import org.archive.modules.ModuleAttributeConstants;
40  import org.archive.modules.ProcessResult;
41  import org.archive.modules.Processor;
42  import org.archive.modules.ProcessorURI;
43  import org.archive.modules.fetcher.FetchStatusCodes;
44  import org.archive.modules.net.CrawlHost;
45  import org.archive.modules.net.ServerCache;
46  import org.archive.modules.net.ServerCacheUtil;
47  import org.archive.state.Expert;
48  import org.archive.state.Immutable;
49  import org.archive.state.Initializable;
50  import org.archive.state.Key;
51  import org.archive.state.KeyManager;
52  import org.archive.state.StateProvider;
53  import org.archive.util.IoUtils;
54  
55  /**
56   * An <a href="http://crawler.archive.org">heritrix2</a> processor that writes to <a href="http://hbase.org">Hadoop HBase</a>.
57   * @author stack
58   */
59  public class HBaseWriterProcessor extends Processor implements
60  Initializable, Closeable {
61    private static final long serialVersionUID = 7166781798179114353L;
62  
63    private final Logger logger = Logger.getLogger(this.getClass().getName());
64  
65    /**
66     * Location of hbase master.
67     */
68    @Immutable
69    public static final Key<String> MASTER =
70      Key.make(HConstants.DEFAULT_MASTER_ADDRESS);
71  
72    /**
73     * HBase table to crawl into.
74     */
75    @Immutable
76    public static final Key<String> TABLE = Key.make("crawl");
77  
78    /**
79     * Maximum active files in pool. This setting cannot be varied over the life
80     * of a crawl.
81     */
82    @Immutable
83    final public static Key<Integer> POOL_MAX_ACTIVE =
84      Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
85  
86  
87    /**
88     * Maximum time to wait on pool element (milliseconds). This setting cannot
89     * be varied over the life of a crawl.
90     */
91    @Immutable
92    final public static Key<Integer> POOL_MAX_WAIT =
93      Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
94  
95    @Immutable
96    final public static Key<ServerCache> SERVER_CACHE = 
97      Key.makeAuto(ServerCache.class);
98  
99    /**
100    * Total file bytes to write to disk. Once the size of all files on disk has
101    * exceeded this limit, this processor will stop the crawler. A value of
102    * zero means no upper limit.
103    */
104   @Immutable @Expert
105   final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
106 
107 
108   /**
109    * Reference to pool.
110    */
111   private transient WriterPool pool = null;
112   private ServerCache serverCache;
113   private int maxActive;
114   private int maxWait;
115   private String master;
116   private String table;
117 
118   /*
119    * Total number of bytes written to disc.
120    */
121   private long totalBytesWritten = 0;
122 
123 
124   static {
125     KeyManager.addKeys(HBaseWriterProcessor.class);
126   }
127 
128   public HBaseWriterProcessor() {
129     super();
130   }
131 
132   public synchronized void initialTasks(StateProvider context) {
133     this.serverCache = context.get(this, SERVER_CACHE);
134     this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
135     this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
136     this.master = context.get(this, MASTER);
137     this.table = context.get(this, TABLE);
138     setupPool();
139   }
140 
141   protected String getMaster() {
142     return this.master;
143   }
144 
145   protected String getTable() {
146     return this.table;
147   }
148 
149   protected void setupPool() {
150     setPool(new HBaseWriterPool(getMaster(), getTable(), getMaxActive(), getMaxWait()));
151   }
152 
153   protected int getMaxActive() {
154     return maxActive;
155   }
156 
157   protected int getMaxWait() {
158     return maxWait;
159   }
160 
161   protected void setPool(WriterPool pool) {
162     this.pool = pool;
163   }
164 
165   protected WriterPool getPool() {
166     return this.pool;
167   }
168 
169   protected long getTotalBytesWritten() {
170     return this.totalBytesWritten;
171   }
172 
173   protected void setTotalBytesWritten(final long b) {
174     this.totalBytesWritten = b;
175   }
176 
177   protected ProcessResult innerProcessResult(final ProcessorURI puri) {
178     ProcessorURI curi = puri;
179     long recordLength = getRecordedSize(curi);
180     ReplayInputStream ris = null;
181     try {
182       if (shouldWrite(curi)) {
183         ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
184         return write(curi, recordLength, ris, getHostAddress(curi));
185       }
186       logger.info("does not write " + curi.toString());
187     } catch (IOException e) {
188       curi.getNonFatalFailures().add(e);
189       logger.log(Level.SEVERE, "Failed write of Record: " +
190           curi.toString(), e);
191     } finally {
192       IoUtils.close(ris);
193     }
194     return ProcessResult.PROCEED;
195   }
196 
197   /**
198    * Return IP address of given URI suitable for recording (as in a
199    * classic ARC 5-field header line).
200    * 
201    * @param curi ProcessorURI
202    * @return String of IP address
203    */
204   protected String getHostAddress(ProcessorURI curi) {
205     // special handling for DNS URIs: want address of DNS server
206     if (curi.getUURI().getScheme().toLowerCase().equals("dns")) {
207       return (String)curi.getData().get(ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
208     }
209     // otherwise, host referenced in URI
210     CrawlHost h = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
211     if (h == null) {
212       throw new NullPointerException("Crawlhost is null for " +
213           curi + " " + curi.getVia());
214     }
215     InetAddress a = h.getIP();
216     if (a == null) {
217       throw new NullPointerException("Address is null for " +
218           curi + " " + curi.getVia() + ". Address " +
219           ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP)?
220               "was never looked up.":
221                 (System.currentTimeMillis() - h.getIpFetched()) +
222                 " ms ago."));
223     }
224     return h.getIP().getHostAddress();
225   }
226 
227   /**
228    * Whether the given ProcessorURI should be written to archive files.
229    * Annotates ProcessorURI with a reason for any negative answer.
230    * 
231    * @param curi ProcessorURI
232    * @return true if URI should be written; false otherwise
233    */
234   protected boolean shouldWrite(ProcessorURI curi) {
235     boolean retVal;
236     String scheme = curi.getUURI().getScheme().toLowerCase();
237     if (scheme.equals("dns")) {
238       retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
239     } else if (scheme.equals("http") || scheme.equals("https")) {
240       retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
241     } else if (scheme.equals("ftp")) {
242       retVal = curi.getFetchStatus() == 200;
243     } else {
244       curi.getAnnotations().add("unwritten:scheme");
245       return false;
246     }
247 
248     if (retVal == false) {
249       // status not deserving writing
250       curi.getAnnotations().add("unwritten:status");
251       return false;
252     }
253     return true;
254   }
255 
256   protected ProcessResult write(final ProcessorURI curi,
257       @SuppressWarnings("unused") long recordLength, 
258       @SuppressWarnings("unused") InputStream in,
259       @SuppressWarnings("unused") String ip)
260   throws IOException {
261     WriterPoolMember writer = getPool().borrowFile();
262     long position = writer.getPosition();
263     HBaseWriter w = (HBaseWriter)writer;
264     try {
265       w.write(curi, getHostAddress(curi),
266           curi.getRecorder().getRecordedOutput(),
267           curi.getRecorder().getRecordedInput());
268     } catch (IOException e) {
269       writer = null;
270       throw e;
271     } finally {
272       if (writer != null) {
273         setTotalBytesWritten(getTotalBytesWritten() +
274             (writer.getPosition() - position));
275         getPool().returnFile(writer);
276       }
277     }
278     return checkBytesWritten(curi);
279   }
280 
281   protected ProcessResult checkBytesWritten(StateProvider context) {
282     long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
283     if (max <= 0) {
284       return ProcessResult.PROCEED;
285     }
286     if (max <= this.totalBytesWritten) {
287       return ProcessResult.FINISH; // FIXME: Specify reason
288       //          controller.requestCrawlStop(CrawlStatus.FINISHED_WRITE_LIMIT);
289     }   
290     return ProcessResult.PROCEED;
291   }
292 
293 
294   protected void innerProcess(@SuppressWarnings("unused") ProcessorURI puri) {
295     throw new AssertionError();
296   }  
297 
298   // good to keep at end of source: must run after all per-Key 
299   // initialization values are set.
300   static {
301     KeyManager.addKeys(HBaseWriterProcessor.class);
302   }
303 
304   public void close() {
305     this.pool.close();
306   }
307 
308   @Override
309   protected boolean shouldProcess(ProcessorURI uri) {
310     ProcessorURI curi = uri;
311     // If failure, or we haven't fetched the resource yet, return
312     if (curi.getFetchStatus() <= 0) {
313       return false;
314     }   
315 
316     // If no recorded content at all, don't write record.
317     long recordLength = curi.getContentSize();
318     if (recordLength <= 0) {
319       // getContentSize() should be > 0 if any material (even just
320       // HTTP headers with zero-length body is available.
321       return false;
322     }
323 
324     return true;
325   }
326 }