1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package com.powerset.heritrix.writer;
25
26 import java.io.Closeable;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.net.InetAddress;
30
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.client.HTable;
33 import org.apache.hadoop.hbase.util.Keying;
34 import org.apache.log4j.Logger;
35 import org.archive.io.ReplayInputStream;
36 import org.archive.io.WriterPool;
37 import org.archive.io.WriterPoolMember;
38 import org.archive.modules.ModuleAttributeConstants;
39 import org.archive.modules.ProcessResult;
40 import org.archive.modules.Processor;
41 import org.archive.modules.ProcessorURI;
42 import org.archive.modules.fetcher.FetchStatusCodes;
43 import org.archive.modules.net.CrawlHost;
44 import org.archive.modules.net.ServerCache;
45 import org.archive.modules.net.ServerCacheUtil;
46 import org.archive.state.Expert;
47 import org.archive.state.Immutable;
48 import org.archive.state.Initializable;
49 import org.archive.state.Key;
50 import org.archive.state.KeyManager;
51 import org.archive.state.StateProvider;
52 import org.archive.util.IoUtils;
53
54
55
56
57
58
59 public class HBaseWriterProcessor extends Processor implements Initializable,
60 Closeable {
61
62
63 private static final long serialVersionUID = 7166781798179114353L;
64
65
66 private final Logger LOG = Logger.getLogger(this.getClass().getName());
67
68
69 @Immutable
70 public static final Key<String> MASTER = Key.make(HConstants.DEFAULT_MASTER_ADDRESS);
71
72
73 @Immutable
74 public static final Key<String> TABLE = Key.make("crawl");
75
76
77
78
79 @Immutable
80 public static final Key<Boolean> WRITE_ONLY_NEW_RECORDS = Key.make(false);
81
82
83
84 @Immutable
85 public static final Key<Boolean> PROCESS_ONLY_NEW_RECORDS = Key.make(false);
86
87
88 @Immutable
89 final public static Key<Integer> POOL_MAX_ACTIVE = Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
90
91
92 @Immutable
93 final public static Key<Integer> POOL_MAX_WAIT = Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
94
95
96 @Immutable
97 final public static Key<ServerCache> SERVER_CACHE = Key.makeAuto(ServerCache.class);
98
99
100 @Immutable
101 final public static Key<Integer> CONTENT_MAX_SIZE = Key.make(20 * 1024 * 1024);
102
103
104
105 @Immutable
106 @Expert
107 final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
108
109
110 private transient WriterPool pool = null;
111
112
113 private ServerCache serverCache;
114
115
116 private int maxActive;
117
118
119 private int maxWait;
120
121
122 private int maxContentSize;
123
124
125 private String master;
126
127
128 private String tableName;
129
130
131 private boolean onlyWriteNewRecords;
132
133
134 private boolean onlyProcessNewRecords;
135
136
137
138
139
140 private long totalBytesWritten = 0;
141
142 static {
143 KeyManager.addKeys(HBaseWriterProcessor.class);
144 }
145
146
147
148
149 public HBaseWriterProcessor() {
150 super();
151 }
152
153
154
155
156 public synchronized void initialTasks(StateProvider context) {
157 this.serverCache = context.get(this, SERVER_CACHE);
158 this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
159 this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
160 this.master = context.get(this, MASTER);
161 this.tableName = context.get(this, TABLE);
162 this.onlyWriteNewRecords = context.get(this, WRITE_ONLY_NEW_RECORDS).booleanValue();
163 this.onlyProcessNewRecords = context.get(this, PROCESS_ONLY_NEW_RECORDS).booleanValue();
164 this.maxContentSize = context.get(this, CONTENT_MAX_SIZE).intValue();
165 setupPool();
166 }
167
168
169
170
171
172
173 protected String getMaster() {
174 return this.master;
175 }
176
177
178
179
180
181
182 protected String getTable() {
183 return this.tableName;
184 }
185
186
187
188
189 protected void setupPool() {
190 setPool(new HBaseWriterPool(getMaster(), getTable(), getMaxActive(), getMaxWait()));
191 }
192
193
194
195
196
197
198 protected int getMaxActive() {
199 return maxActive;
200 }
201
202
203
204
205
206
207 protected int getMaxWait() {
208 return maxWait;
209 }
210
211
212
213
214
215
216 protected void setPool(WriterPool pool) {
217 this.pool = pool;
218 }
219
220
221
222
223
224
225 protected WriterPool getPool() {
226 return this.pool;
227 }
228
229
230
231
232
233
234 protected long getTotalBytesWritten() {
235 return this.totalBytesWritten;
236 }
237
238
239
240
241
242
243 protected void setTotalBytesWritten(final long b) {
244 this.totalBytesWritten = b;
245 }
246
247
248
249
250 protected ProcessResult innerProcessResult(final ProcessorURI puri) {
251 ProcessorURI curi = puri;
252 long recordLength = getRecordedSize(curi);
253 ReplayInputStream ris = null;
254 try {
255 if (shouldWrite(curi)) {
256 ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
257 return write(curi, recordLength, ris, getHostAddress(curi));
258 }
259 LOG.info("does not write " + curi.toString());
260 } catch (IOException e) {
261 curi.getNonFatalFailures().add(e);
262 LOG.error("Failed write of Record: " + curi.toString(), e);
263 } finally {
264 IoUtils.close(ris);
265 }
266 return ProcessResult.PROCEED;
267 }
268
269
270
271
272
273
274
275
276
277 protected String getHostAddress(ProcessorURI curi) {
278
279 if (curi.getUURI().getScheme().toLowerCase().equals("dns")) {
280 return (String) curi.getData().get(
281 ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
282 }
283
284 CrawlHost h = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
285 if (h == null) {
286 throw new NullPointerException("Crawlhost is null for " + curi + " " + curi.getVia());
287 }
288 InetAddress a = h.getIP();
289 if (a == null) {
290 throw new NullPointerException(
291 "Address is null for "
292 + curi
293 + " "
294 + curi.getVia()
295 + ". Address "
296 + ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP) ? "was never looked up."
297 : (System.currentTimeMillis() - h
298 .getIpFetched())
299 + " ms ago."));
300 }
301 return h.getIP().getHostAddress();
302 }
303
304
305
306
307
308
309
310
311
312 protected boolean shouldWrite(ProcessorURI curi) {
313 boolean retVal;
314 String scheme = curi.getUURI().getScheme().toLowerCase();
315
316 if (scheme.equals("dns")) {
317 retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
318 } else if (scheme.equals("http") || scheme.equals("https")) {
319 retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
320 } else if (scheme.equals("ftp")) {
321 retVal = curi.getFetchStatus() == 200;
322 } else {
323 curi.getAnnotations().add("unwritten:scheme");
324 return false;
325 }
326
327 if (retVal == false) {
328
329 curi.getAnnotations().add("unwritten:status");
330 return false;
331 }
332
333
334 if (curi.getContentSize() > this.maxContentSize) {
335
336 curi.getAnnotations().add("unwritten:size");
337 LOG.warn("content size for " + curi.getUURI() + " is too large ("
338 + curi.getContentSize() + ") - maximum content size is: "
339 + this.maxContentSize);
340 return false;
341 }
342
343
344
345 if (this.onlyWriteNewRecords) {
346 if (!this.isRecordNew(curi)) {
347 return false;
348 }
349 }
350
351 return true;
352 }
353
354
355
356
357
358
359
360
361
362 private boolean isRecordNew(ProcessorURI curi) {
363 WriterPoolMember writer;
364 try {
365 writer = getPool().borrowFile();
366 } catch (IOException e1) {
367 LOG.error("No writer could be gotten from the pool: " + getPool().toString()
368 + " - exception is: \n" + e1.getMessage());
369 return false;
370 }
371 HTable ht = ((HBaseWriter) writer).getClient();
372
373 String url = curi.toString();
374 String row = Keying.createKey(url);
375 try {
376
377 if (ht.getRow(row) != null && !ht.getRow(row).isEmpty()) {
378 if (LOG.isDebugEnabled()) {
379 LOG.debug("Not Writing "
380 + url
381 + " since rowkey: "
382 + row.toString()
383 + " already exists and onlyWriteNewRecords is enabled.");
384 }
385 return false;
386 }
387 } catch (IOException e) {
388 LOG.error("Failed to determine if record: "
389 + row.toString()
390 + " should be written or not, deciding not to write the record: \n"
391 + e.getMessage());
392 return false;
393 } finally {
394 try {
395 getPool().returnFile(writer);
396 } catch (IOException e) {
397 LOG.error("Failed to add back writer to the pool after checking for existing rowkey: "
398 + row.toString() + "\n" + e.getMessage());
399 return false;
400 }
401 }
402 return true;
403 }
404
405
406
407
408
409
410
411
412
413
414
415
416
417 protected ProcessResult write(final ProcessorURI curi, long recordLength, InputStream in, String ip) throws IOException {
418 WriterPoolMember writer = getPool().borrowFile();
419 long position = writer.getPosition();
420 HBaseWriter w = (HBaseWriter) writer;
421 try {
422 w.write(curi, getHostAddress(curi), curi.getRecorder().getRecordedOutput(), curi.getRecorder().getRecordedInput());
423 } finally {
424 setTotalBytesWritten(getTotalBytesWritten() + (writer.getPosition() - position));
425 getPool().returnFile(writer);
426 }
427 return checkBytesWritten(curi);
428 }
429
430
431
432
433
434
435
436
437 protected ProcessResult checkBytesWritten(StateProvider context) {
438 long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
439 if (max <= 0) {
440 return ProcessResult.PROCEED;
441 }
442 if (max <= this.totalBytesWritten) {
443 return ProcessResult.FINISH;
444
445 }
446 return ProcessResult.PROCEED;
447 }
448
449
450
451
452 protected void innerProcess(ProcessorURI puri) {
453 throw new AssertionError();
454 }
455
456
457
458 static {
459 KeyManager.addKeys(HBaseWriterProcessor.class);
460 }
461
462
463
464
465 public void close() {
466 this.pool.close();
467 }
468
469
470
471
472 @Override
473 protected boolean shouldProcess(ProcessorURI uri) {
474 ProcessorURI curi = uri;
475
476 if (curi.getFetchStatus() <= 0) {
477 return false;
478 }
479
480
481 long recordLength = curi.getContentSize();
482 if (recordLength <= 0) {
483
484
485 return false;
486 }
487
488
489
490 if (this.onlyProcessNewRecords) {
491 if (!this.isRecordNew(curi)) {
492 return false;
493 }
494 }
495
496
497
498 return true;
499 }
500 }