View Javadoc

1   /**
2    * HBaseWriterProcessor
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * Copyright (C) 2007 stack
9    *
10   * This file is part of the Heritrix web crawler (crawler.archive.org).
11   *
12   * Heritrix is free software; you can redistribute it and/or modify
13   * it under the terms of the GNU Lesser Public License as published by
14   * the Free Software Foundation; either version 2.1 of the License, or
15   * any later version.
16   *
17   * Heritrix is distributed in the hope that it will be useful,
18   * but WITHOUT ANY WARRANTY; without even the implied warranty of
19   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20   * GNU Lesser Public License for more details.
21   *
22   * You should have received a copy of the GNU Lesser Public License
23   * along with Heritrix; if not, write to the Free Software
24   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25   */
26  package com.powerset.heritrix.writer;
27  
28  import java.io.Closeable;
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.net.InetAddress;
32  import java.util.logging.Level;
33  import java.util.logging.Logger;
34  
35  import org.apache.hadoop.hbase.HConstants;
36  import org.archive.io.ReplayInputStream;
37  import org.archive.io.WriterPool;
38  import org.archive.io.WriterPoolMember;
39  import org.archive.modules.ModuleAttributeConstants;
40  import org.archive.modules.ProcessResult;
41  import org.archive.modules.Processor;
42  import org.archive.modules.ProcessorURI;
43  import org.archive.modules.fetcher.FetchStatusCodes;
44  import org.archive.modules.net.CrawlHost;
45  import org.archive.modules.net.ServerCache;
46  import org.archive.modules.net.ServerCacheUtil;
47  import org.archive.state.Expert;
48  import org.archive.state.Immutable;
49  import org.archive.state.Initializable;
50  import org.archive.state.Key;
51  import org.archive.state.KeyManager;
52  import org.archive.state.StateProvider;
53  import org.archive.util.IoUtils;
54  
55  /**
56   * An <a href="http://crawler.archive.org">heritrix2</a> processor that writes to <a href="http://hbase.org">Hadoop HBase</a>.
57   * @author stack
58   */
59  public class HBaseWriterProcessor extends Processor implements
60  Initializable, Closeable {
61    private static final long serialVersionUID = 7166781798179114353L;
62  
63    private final Logger logger = Logger.getLogger(this.getClass().getName());
64  
65    /**
66     * Location of hbase master.
67     */
68    @Immutable
69    public static final Key<String> MASTER =
70      Key.make(HConstants.DEFAULT_MASTER_ADDRESS);
71  
72    /**
73     * HBase table to crawl into.
74     */
75    @Immutable
76    public static final Key<String> TABLE = Key.make("crawl");
77  
78    /**
79     * Maximum active files in pool. This setting cannot be varied over the life
80     * of a crawl.
81     */
82    @Immutable
83    final public static Key<Integer> POOL_MAX_ACTIVE =
84      Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
85  
86    /**
87     * Maximum time to wait on pool element (milliseconds). This setting cannot
88     * be varied over the life of a crawl.
89     */
90    @Immutable
91    final public static Key<Integer> POOL_MAX_WAIT =
92      Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
93  
94    @Immutable
95    final public static Key<ServerCache> SERVER_CACHE = 
96      Key.makeAuto(ServerCache.class);
97  
98    /**
99     * Maximum allowable content size.
100    */
101   @Immutable
102   final public static Key<Integer> CONTENT_MAX_SIZE =
103     Key.make(20 * 1024 * 1024);
104 
105   /**
106    * Total file bytes to write to disk. Once the size of all files on disk has
107    * exceeded this limit, this processor will stop the crawler. A value of
108    * zero means no upper limit.
109    */
110   @Immutable @Expert
111   final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
112 
113 
114   /**
115    * Reference to pool.
116    */
117   private transient WriterPool pool = null;
118   private ServerCache serverCache;
119   private int maxActive;
120   private int maxWait;
121   private int maxContentSize;
122   private String master;
123   private String table;
124 
125   /*
126    * Total number of bytes written to disc.
127    */
128   private long totalBytesWritten = 0;
129 
130 
131   static {
132     KeyManager.addKeys(HBaseWriterProcessor.class);
133   }
134 
135   public HBaseWriterProcessor() {
136     super();
137   }
138 
139   public synchronized void initialTasks(StateProvider context) {
140     this.serverCache = context.get(this, SERVER_CACHE);
141     this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
142     this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
143     this.master = context.get(this, MASTER);
144     this.table = context.get(this, TABLE);
145     this.maxContentSize = context.get(this, CONTENT_MAX_SIZE).intValue();
146     setupPool();
147   }
148 
149   protected String getMaster() {
150     return this.master;
151   }
152 
153   protected String getTable() {
154     return this.table;
155   }
156 
157   protected void setupPool() {
158     setPool(new HBaseWriterPool(getMaster(), getTable(), getMaxActive(), getMaxWait()));
159   }
160 
161   protected int getMaxActive() {
162     return maxActive;
163   }
164 
165   protected int getMaxWait() {
166     return maxWait;
167   }
168 
169   protected void setPool(WriterPool pool) {
170     this.pool = pool;
171   }
172 
173   protected WriterPool getPool() {
174     return this.pool;
175   }
176 
177   protected long getTotalBytesWritten() {
178     return this.totalBytesWritten;
179   }
180 
181   protected void setTotalBytesWritten(final long b) {
182     this.totalBytesWritten = b;
183   }
184 
185   protected ProcessResult innerProcessResult(final ProcessorURI puri) {
186     ProcessorURI curi = puri;
187     long recordLength = getRecordedSize(curi);
188     ReplayInputStream ris = null;
189     try {
190       if (shouldWrite(curi)) {
191         ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
192         return write(curi, recordLength, ris, getHostAddress(curi));
193       }
194       logger.info("does not write " + curi.toString());
195     } catch (IOException e) {
196       curi.getNonFatalFailures().add(e);
197       logger.log(Level.SEVERE, "Failed write of Record: " +
198           curi.toString(), e);
199     } finally {
200       IoUtils.close(ris);
201     }
202     return ProcessResult.PROCEED;
203   }
204 
205   /**
206    * Return IP address of given URI suitable for recording (as in a
207    * classic ARC 5-field header line).
208    * 
209    * @param curi ProcessorURI
210    * @return String of IP address
211    */
212   protected String getHostAddress(ProcessorURI curi) {
213     // special handling for DNS URIs: want address of DNS server
214     if (curi.getUURI().getScheme().toLowerCase().equals("dns")) {
215       return (String)curi.getData().get(ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
216     }
217     // otherwise, host referenced in URI
218     CrawlHost h = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
219     if (h == null) {
220       throw new NullPointerException("Crawlhost is null for " +
221           curi + " " + curi.getVia());
222     }
223     InetAddress a = h.getIP();
224     if (a == null) {
225       throw new NullPointerException("Address is null for " +
226           curi + " " + curi.getVia() + ". Address " +
227           ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP)?
228               "was never looked up.":
229                 (System.currentTimeMillis() - h.getIpFetched()) +
230                 " ms ago."));
231     }
232     return h.getIP().getHostAddress();
233   }
234 
235   /**
236    * Whether the given ProcessorURI should be written to archive files.
237    * Annotates ProcessorURI with a reason for any negative answer.
238    * 
239    * @param curi ProcessorURI
240    * @return true if URI should be written; false otherwise
241    */
242   protected boolean shouldWrite(ProcessorURI curi) {
243     boolean retVal;
244     String scheme = curi.getUURI().getScheme().toLowerCase();
245     if (scheme.equals("dns")) {
246       retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
247     } else if (scheme.equals("http") || scheme.equals("https")) {
248       retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
249     } else if (scheme.equals("ftp")) {
250       retVal = curi.getFetchStatus() == 200;
251     } else {
252       curi.getAnnotations().add("unwritten:scheme");
253       return false;
254     }
255 
256     if (retVal == false) {
257       // status not deserving writing
258       curi.getAnnotations().add("unwritten:status");
259       return false;
260     }
261     
262     if (curi.getContentSize() > this.maxContentSize) {
263        // content size is too large
264        curi.getAnnotations().add("unwritten:size");
265        logger.warning("content size for " + curi.getUURI() + " is too large (" +
266          curi.getContentSize() + ") - maximum content size is: " + this.maxContentSize );
267        return false;
268      }
269     
270      return true;
271   }
272 
273   protected ProcessResult write(final ProcessorURI curi,
274       @SuppressWarnings("unused") long recordLength, 
275       @SuppressWarnings("unused") InputStream in,
276       @SuppressWarnings("unused") String ip)
277   throws IOException {
278     WriterPoolMember writer = getPool().borrowFile();
279     long position = writer.getPosition();
280     HBaseWriter w = (HBaseWriter)writer;
281     try {
282       w.write(curi, getHostAddress(curi),
283           curi.getRecorder().getRecordedOutput(),
284           curi.getRecorder().getRecordedInput());
285     } finally {
286       setTotalBytesWritten(getTotalBytesWritten() +
287           (writer.getPosition() - position));
288       getPool().returnFile(writer);
289     }
290     return checkBytesWritten(curi);
291   }
292 
293   protected ProcessResult checkBytesWritten(StateProvider context) {
294     long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
295     if (max <= 0) {
296       return ProcessResult.PROCEED;
297     }
298     if (max <= this.totalBytesWritten) {
299       return ProcessResult.FINISH; // FIXME: Specify reason
300       //          controller.requestCrawlStop(CrawlStatus.FINISHED_WRITE_LIMIT);
301     }   
302     return ProcessResult.PROCEED;
303   }
304 
305 
306   protected void innerProcess(@SuppressWarnings("unused") ProcessorURI puri) {
307     throw new AssertionError();
308   }  
309 
310   // good to keep at end of source: must run after all per-Key 
311   // initialization values are set.
312   static {
313     KeyManager.addKeys(HBaseWriterProcessor.class);
314   }
315 
316   public void close() {
317     this.pool.close();
318   }
319 
320   @Override
321   protected boolean shouldProcess(ProcessorURI uri) {
322     ProcessorURI curi = uri;
323     // If failure, or we haven't fetched the resource yet, return
324     if (curi.getFetchStatus() <= 0) {
325       return false;
326     }   
327 
328     // If no recorded content at all, don't write record.
329     long recordLength = curi.getContentSize();
330     if (recordLength <= 0) {
331       // getContentSize() should be > 0 if any material (even just
332       // HTTP headers with zero-length body is available.
333       return false;
334     }
335 
336     return true;
337   }
338 }