1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package com.powerset.heritrix.writer;
25
26 import java.io.Closeable;
27 import java.io.IOException;
28 import java.io.InputStream;
29
30 import org.apache.hadoop.hbase.client.Get;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.apache.hadoop.hbase.util.Keying;
34 import org.apache.log4j.Logger;
35 import org.archive.io.ReplayInputStream;
36 import org.archive.io.WriterPool;
37 import org.archive.io.WriterPoolMember;
38 import org.archive.modules.ModuleAttributeConstants;
39 import org.archive.modules.ProcessResult;
40 import org.archive.modules.Processor;
41 import org.archive.modules.ProcessorURI;
42 import org.archive.modules.fetcher.FetchStatusCodes;
43 import org.archive.modules.net.CrawlHost;
44 import org.archive.modules.net.ServerCache;
45 import org.archive.modules.net.ServerCacheUtil;
46 import org.archive.state.Expert;
47 import org.archive.state.Immutable;
48 import org.archive.state.Initializable;
49 import org.archive.state.Key;
50 import org.archive.state.KeyManager;
51 import org.archive.state.StateProvider;
52 import org.archive.util.IoUtils;
53
54
55
56
57
58 public class HBaseWriterProcessor extends Processor implements Initializable, Closeable {
59
60
61 private static final long serialVersionUID = 7166781798179114353L;
62
63
64 private final Logger LOG = Logger.getLogger(this.getClass().getName());
65
66
67 @Immutable
68 public static final Key<String> ZKQUORUM = Key.make("localhost");
69
70
71 @Immutable
72 public static final Key<Integer> ZKCLIENTPORT = Key.make(2181);
73
74
75 @Immutable
76 public static final Key<String> TABLE = Key.make("crawl");
77
78
79
80
81
82
83
84
85 @Immutable
86 public static final Key<Boolean> WRITE_ONLY_NEW_RECORDS = Key.make(false);
87
88
89
90
91
92 @Immutable
93 public static final Key<Boolean> PROCESS_ONLY_NEW_RECORDS = Key.make(false);
94
95
96 @Immutable
97 final public static Key<Integer> POOL_MAX_ACTIVE = Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
98
99
100 @Immutable
101 final public static Key<Integer> POOL_MAX_WAIT = Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
102
103
104 @Immutable
105 final public static Key<ServerCache> SERVER_CACHE = Key.makeAuto(ServerCache.class);
106
107
108 @Immutable
109 final public static Key<Integer> CONTENT_MAX_SIZE = Key.make(20 * 1024 * 1024);
110
111
112 @Immutable
113 @Expert
114 final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
115
116
117 private transient WriterPool pool = null;
118
119
120 private ServerCache serverCache;
121
122
123 private int maxActive;
124
125
126 private int maxWait;
127
128
129 private int maxContentSize;
130
131
132 private String zkQuorum;
133
134
135 private int zkClientPort;
136
137
138 private String tableName;
139
140
141 private boolean onlyWriteNewRecords;
142
143
144 private boolean onlyProcessNewRecords;
145
146
147 private long totalBytesWritten = 0;
148
149 static {
150 KeyManager.addKeys(HBaseWriterProcessor.class);
151 }
152
153
154
155
156 public HBaseWriterProcessor() {
157 super();
158 }
159
160
161
162
163 public synchronized void initialTasks(StateProvider context) {
164 this.serverCache = context.get(this, SERVER_CACHE);
165 this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
166 this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
167 this.zkQuorum = context.get(this, ZKQUORUM);
168 this.zkClientPort = context.get(this, ZKCLIENTPORT);
169 this.tableName = context.get(this, TABLE);
170 this.onlyWriteNewRecords = context.get(this, WRITE_ONLY_NEW_RECORDS).booleanValue();
171 this.onlyProcessNewRecords = context.get(this, PROCESS_ONLY_NEW_RECORDS).booleanValue();
172 this.maxContentSize = context.get(this, CONTENT_MAX_SIZE).intValue();
173 setupPool();
174 }
175
176
177
178
179 protected ProcessResult innerProcessResult(final ProcessorURI puri) {
180 ProcessorURI curi = puri;
181 long recordLength = getRecordedSize(curi);
182 ReplayInputStream ris = null;
183 try {
184 if (shouldWrite(curi)) {
185 ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
186 return write(curi, recordLength, ris, getHostAddress(curi));
187 }
188 LOG.info("does not write " + curi.toString());
189 } catch (IOException e) {
190 curi.getNonFatalFailures().add(e);
191 LOG.error("Failed write of Record: " + curi.toString(), e);
192 } finally {
193 IoUtils.close(ris);
194 }
195 return ProcessResult.PROCEED;
196 }
197
198
199
200
201
202
203
204
205
206 protected String getHostAddress(ProcessorURI curi) {
207
208 if (curi.getUURI().getScheme().toLowerCase().equals("dns")) {
209 return (String) curi.getData().get(
210 ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
211 }
212
213 CrawlHost crawlHost = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
214 if (crawlHost == null) {
215 throw new NullPointerException("Crawlhost is null for " + curi + " " + curi.getVia());
216 }
217
218 if (crawlHost.getIP() == null) {
219 throw new NullPointerException(
220 "Address is null for "
221 + curi
222 + " "
223 + curi.getVia()
224 + ". Address "
225 + ((crawlHost.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP) ? "was never looked up."
226 : (System.currentTimeMillis() - crawlHost.getIpFetched())
227 + " ms ago."));
228 }
229 return crawlHost.getIP().getHostAddress();
230 }
231
232
233
234
235 @Override
236 protected boolean shouldProcess(ProcessorURI uri) {
237 ProcessorURI curi = uri;
238
239 if (curi.getFetchStatus() <= 0) {
240 return false;
241 }
242
243
244 long recordLength = curi.getContentSize();
245 if (recordLength <= 0) {
246
247
248 return false;
249 }
250
251
252
253 if (this.onlyProcessNewRecords) {
254 return this.isRecordNew(curi);
255 }
256
257
258 return true;
259 }
260
261
262
263
264
265
266
267
268
269 protected boolean shouldWrite(ProcessorURI curi) {
270 boolean retVal;
271 String scheme = curi.getUURI().getScheme().toLowerCase();
272
273 if (scheme.equals("dns")) {
274 retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
275 } else if (scheme.equals("http") || scheme.equals("https")) {
276 retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
277 } else if (scheme.equals("ftp")) {
278 retVal = curi.getFetchStatus() == 200;
279 } else {
280 curi.getAnnotations().add("unwritten:scheme");
281 return false;
282 }
283
284 if (retVal == false) {
285
286 curi.getAnnotations().add("unwritten:status");
287 return false;
288 }
289
290
291 if (curi.getContentSize() > this.maxContentSize) {
292
293 curi.getAnnotations().add("unwritten:size");
294 LOG.warn("content size for " + curi.getUURI() + " is too large ("
295 + curi.getContentSize() + ") - maximum content size is: "
296 + this.maxContentSize);
297 return false;
298 }
299
300
301
302 if (this.onlyWriteNewRecords) {
303 return this.isRecordNew(curi);
304 }
305
306 return true;
307 }
308
309
310
311
312
313
314
315
316 private boolean isRecordNew(ProcessorURI curi) {
317 WriterPoolMember writerPoolMember;
318 try {
319 writerPoolMember = getPool().borrowFile();
320 } catch (IOException e1) {
321 LOG.error("No writer could be borrowed from the pool: " + getPool().toString()
322 + " - exception is: \n" + e1.getMessage());
323 return false;
324 }
325 HTable hbaseTable = ((HBaseWriter) writerPoolMember).getClient();
326
327 String url = curi.toString();
328 String row = Keying.createKey(url);
329 try {
330
331 Get rowToGet = new Get(Bytes.toBytes(row));
332 if (hbaseTable.get(rowToGet) != null && !hbaseTable.get(rowToGet).isEmpty()) {
333 if (LOG.isDebugEnabled()) {
334 LOG.debug("Not A NEW Record - Url: "
335 + url
336 + " has the existing rowkey: "
337 + row.toString()
338 + " and has cell data.");
339 }
340 return false;
341 }
342 } catch (IOException e) {
343 LOG.error("Failed to determine if record: "
344 + row.toString()
345 + " is a new record due to IOExecption. Deciding the record is already existing for now. \n"
346 + e.getMessage());
347 return false;
348 } finally {
349 try {
350 getPool().returnFile(writerPoolMember);
351 } catch (IOException e) {
352 LOG.error("Failed to add back writer to the pool after checking if a rowkey is new or existing: "
353 + row.toString() + "\n" + e.getMessage());
354 return false;
355 }
356 }
357 return true;
358 }
359
360
361
362
363
364
365
366
367
368
369
370
371
372 protected ProcessResult write(final ProcessorURI curi, long recordLength, InputStream in, String ip) throws IOException {
373 WriterPoolMember writerPoolMember = getPool().borrowFile();
374 long writerPoolMemberPosition = writerPoolMember.getPosition();
375 HBaseWriter hbaseWriter = (HBaseWriter) writerPoolMember;
376 try {
377 hbaseWriter.write(curi, getHostAddress(curi), curi.getRecorder().getRecordedOutput(), curi.getRecorder().getRecordedInput());
378 } finally {
379 setTotalBytesWritten(getTotalBytesWritten() + (writerPoolMember.getPosition() - writerPoolMemberPosition));
380 getPool().returnFile(writerPoolMember);
381 }
382 return checkBytesWritten(curi);
383 }
384
385
386
387
388
389
390
391
392 protected ProcessResult checkBytesWritten(StateProvider context) {
393 long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
394 if (max <= 0) {
395 return ProcessResult.PROCEED;
396 }
397 if (max <= this.totalBytesWritten) {
398 return ProcessResult.FINISH;
399
400 }
401 return ProcessResult.PROCEED;
402 }
403
404
405
406
407 protected void setupPool() {
408 setPool(new HBaseWriterPool(getZKQuorum(), getZKClientPort(), getTable(), getMaxActive(), getMaxWait()));
409 }
410
411
412
413
414
415
416 protected String getZKQuorum() {
417 return this.zkQuorum;
418 }
419
420
421
422
423
424
425 protected int getZKClientPort() {
426 return this.zkClientPort;
427 }
428
429
430
431
432
433
434
435 protected String getTable() {
436 return this.tableName;
437 }
438
439
440
441
442
443
444
445 protected int getMaxActive() {
446 return maxActive;
447 }
448
449
450
451
452
453
454 protected int getMaxWait() {
455 return maxWait;
456 }
457
458
459
460
461
462
463 protected void setPool(WriterPool pool) {
464 this.pool = pool;
465 }
466
467
468
469
470
471
472 protected WriterPool getPool() {
473 return this.pool;
474 }
475
476
477
478
479
480
481 protected long getTotalBytesWritten() {
482 return this.totalBytesWritten;
483 }
484
485
486
487
488
489
490 protected void setTotalBytesWritten(final long b) {
491 this.totalBytesWritten = b;
492 }
493
494
495
496
497 protected void innerProcess(ProcessorURI puri) {
498 throw new AssertionError();
499 }
500
501
502
503
504 public void close() {
505 this.pool.close();
506 }
507
508 }