1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package com.powerset.heritrix.writer;
25
26 import java.io.Closeable;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.net.InetAddress;
30
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.client.HTable;
33 import org.apache.hadoop.hbase.util.Keying;
34 import org.apache.log4j.Logger;
35 import org.archive.io.ReplayInputStream;
36 import org.archive.io.WriterPool;
37 import org.archive.io.WriterPoolMember;
38 import org.archive.modules.ModuleAttributeConstants;
39 import org.archive.modules.ProcessResult;
40 import org.archive.modules.Processor;
41 import org.archive.modules.ProcessorURI;
42 import org.archive.modules.fetcher.FetchStatusCodes;
43 import org.archive.modules.net.CrawlHost;
44 import org.archive.modules.net.ServerCache;
45 import org.archive.modules.net.ServerCacheUtil;
46 import org.archive.state.Expert;
47 import org.archive.state.Immutable;
48 import org.archive.state.Initializable;
49 import org.archive.state.Key;
50 import org.archive.state.KeyManager;
51 import org.archive.state.StateProvider;
52 import org.archive.util.IoUtils;
53
54
55
56
57
58 public class HBaseWriterProcessor extends Processor implements Initializable,
59 Closeable {
60 private static final long serialVersionUID = 7166781798179114353L;
61
62 private final Logger LOG = Logger.getLogger(this.getClass().getName());
63
64
65
66
67 @Immutable
68 public static final Key<String> MASTER = Key.make(HConstants.DEFAULT_MASTER_ADDRESS);
69
70
71
72
73 @Immutable
74 public static final Key<String> TABLE = Key.make("crawl");
75
76
77
78
79
80
81
82
83
84
85
86 @Immutable
87 public static final Key<Boolean> ONLY_NEW_RECORDS = Key.make(false);
88
89
90
91
92
93 @Immutable
94 final public static Key<Integer> POOL_MAX_ACTIVE = Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
95
96
97
98
99
100 @Immutable
101 final public static Key<Integer> POOL_MAX_WAIT = Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
102
103 @Immutable
104 final public static Key<ServerCache> SERVER_CACHE = Key.makeAuto(ServerCache.class);
105
106
107
108
109 @Immutable
110 final public static Key<Integer> CONTENT_MAX_SIZE = Key.make(20 * 1024 * 1024);
111
112
113
114
115
116
117 @Immutable
118 @Expert
119 final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
120
121
122
123
124 private transient WriterPool pool = null;
125 private ServerCache serverCache;
126 private int maxActive;
127 private int maxWait;
128 private int maxContentSize;
129 private String master;
130 private String tableName;
131 private boolean onlyWriteNewRecords;
132
133
134
135
136 private long totalBytesWritten = 0;
137
138 static {
139 KeyManager.addKeys(HBaseWriterProcessor.class);
140 }
141
142 public HBaseWriterProcessor() {
143 super();
144 }
145
146 public synchronized void initialTasks(StateProvider context) {
147 this.serverCache = context.get(this, SERVER_CACHE);
148 this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
149 this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
150 this.master = context.get(this, MASTER);
151 this.tableName = context.get(this, TABLE);
152 this.onlyWriteNewRecords = context.get(this, ONLY_NEW_RECORDS).booleanValue();
153 this.maxContentSize = context.get(this, CONTENT_MAX_SIZE).intValue();
154 setupPool();
155 }
156
157 protected String getMaster() {
158 return this.master;
159 }
160
161 protected String getTable() {
162 return this.tableName;
163 }
164
165 protected void setupPool() {
166 setPool(new HBaseWriterPool(getMaster(), getTable(), getMaxActive(), getMaxWait()));
167 }
168
169 protected int getMaxActive() {
170 return maxActive;
171 }
172
173 protected int getMaxWait() {
174 return maxWait;
175 }
176
177 protected void setPool(WriterPool pool) {
178 this.pool = pool;
179 }
180
181 protected WriterPool getPool() {
182 return this.pool;
183 }
184
185 protected long getTotalBytesWritten() {
186 return this.totalBytesWritten;
187 }
188
189 protected void setTotalBytesWritten(final long b) {
190 this.totalBytesWritten = b;
191 }
192
193 protected ProcessResult innerProcessResult(final ProcessorURI puri) {
194 ProcessorURI curi = puri;
195 long recordLength = getRecordedSize(curi);
196 ReplayInputStream ris = null;
197 try {
198 if (shouldWrite(curi)) {
199 ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
200 return write(curi, recordLength, ris, getHostAddress(curi));
201 }
202 LOG.info("does not write " + curi.toString());
203 } catch (IOException e) {
204 curi.getNonFatalFailures().add(e);
205 LOG.error("Failed write of Record: " + curi.toString(), e);
206 } finally {
207 IoUtils.close(ris);
208 }
209 return ProcessResult.PROCEED;
210 }
211
212
213
214
215
216
217
218
219
220 protected String getHostAddress(ProcessorURI curi) {
221
222 if (curi.getUURI().getScheme().toLowerCase().equals("dns")) {
223 return (String) curi.getData().get(
224 ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
225 }
226
227 CrawlHost h = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
228 if (h == null) {
229 throw new NullPointerException("Crawlhost is null for " + curi + " " + curi.getVia());
230 }
231 InetAddress a = h.getIP();
232 if (a == null) {
233 throw new NullPointerException(
234 "Address is null for "
235 + curi
236 + " "
237 + curi.getVia()
238 + ". Address "
239 + ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP) ? "was never looked up."
240 : (System.currentTimeMillis() - h
241 .getIpFetched())
242 + " ms ago."));
243 }
244 return h.getIP().getHostAddress();
245 }
246
247
248
249
250
251
252
253
254
255 protected boolean shouldWrite(ProcessorURI curi) {
256 boolean retVal;
257 String scheme = curi.getUURI().getScheme().toLowerCase();
258
259 if (scheme.equals("dns")) {
260 retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
261 } else if (scheme.equals("http") || scheme.equals("https")) {
262 retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
263 } else if (scheme.equals("ftp")) {
264 retVal = curi.getFetchStatus() == 200;
265 } else {
266 curi.getAnnotations().add("unwritten:scheme");
267 return false;
268 }
269
270 if (retVal == false) {
271
272 curi.getAnnotations().add("unwritten:status");
273 return false;
274 }
275
276
277 if (curi.getContentSize() > this.maxContentSize) {
278
279 curi.getAnnotations().add("unwritten:size");
280 LOG.warn("content size for " + curi.getUURI() + " is too large ("
281 + curi.getContentSize() + ") - maximum content size is: "
282 + this.maxContentSize);
283 return false;
284 }
285
286
287
288 if (this.onlyWriteNewRecords) {
289 WriterPoolMember writer;
290 try {
291 writer = getPool().borrowFile();
292 } catch (IOException e1) {
293 LOG.error("No writer could be gotten from the pool: " + getPool().toString()
294 + " - exception is: \n" + e1.getMessage());
295 return false;
296 }
297 HTable ht = ((HBaseWriter) writer).getClient();
298
299 String url = curi.toString();
300 String row = Keying.createKey(url);
301 try {
302
303 if (ht.getRow(row) != null && !ht.getRow(row).isEmpty()) {
304 if (LOG.isTraceEnabled()) {
305 LOG.trace("Not Writing "
306 + url
307 + " since rowkey: "
308 + row.toString()
309 + " already exists and onlyWriteNewRecords is enabled.");
310 }
311 return false;
312 }
313 } catch (IOException e) {
314 LOG.error("Failed to determine if record: "
315 + row.toString()
316 + " should be written or not, deciding not to write the record: \n"
317 + e.getMessage());
318 return false;
319 } finally {
320 try {
321 getPool().returnFile(writer);
322 } catch (IOException e) {
323 LOG.error("Failed to add back writer to the pool after checking for existing rowkey: "
324 + row.toString() + "\n" + e.getMessage());
325 return false;
326 }
327 }
328 }
329
330 return true;
331 }
332
333 protected ProcessResult write(final ProcessorURI curi, long recordLength, InputStream in, String ip) throws IOException {
334 WriterPoolMember writer = getPool().borrowFile();
335 long position = writer.getPosition();
336 HBaseWriter w = (HBaseWriter) writer;
337 try {
338 w.write(curi, getHostAddress(curi), curi.getRecorder().getRecordedOutput(), curi.getRecorder().getRecordedInput());
339 } finally {
340 setTotalBytesWritten(getTotalBytesWritten() + (writer.getPosition() - position));
341 getPool().returnFile(writer);
342 }
343 return checkBytesWritten(curi);
344 }
345
346 protected ProcessResult checkBytesWritten(StateProvider context) {
347 long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
348 if (max <= 0) {
349 return ProcessResult.PROCEED;
350 }
351 if (max <= this.totalBytesWritten) {
352 return ProcessResult.FINISH;
353
354 }
355 return ProcessResult.PROCEED;
356 }
357
358 protected void innerProcess(ProcessorURI puri) {
359 throw new AssertionError();
360 }
361
362
363
364 static {
365 KeyManager.addKeys(HBaseWriterProcessor.class);
366 }
367
368 public void close() {
369 this.pool.close();
370 }
371
372 @Override
373 protected boolean shouldProcess(ProcessorURI uri) {
374 ProcessorURI curi = uri;
375
376 if (curi.getFetchStatus() <= 0) {
377 return false;
378 }
379
380
381 long recordLength = curi.getContentSize();
382 if (recordLength <= 0) {
383
384
385 return false;
386 }
387
388
389
390 return true;
391 }
392 }