1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package com.powerset.heritrix.writer;
25
26 import java.io.Closeable;
27 import java.io.IOException;
28 import java.io.InputStream;
29
30 import org.apache.hadoop.hbase.client.Get;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.apache.hadoop.hbase.util.Keying;
34 import org.apache.log4j.Logger;
35 import org.archive.io.ReplayInputStream;
36 import org.archive.io.WriterPool;
37 import org.archive.io.WriterPoolMember;
38 import org.archive.modules.ModuleAttributeConstants;
39 import org.archive.modules.ProcessResult;
40 import org.archive.modules.Processor;
41 import org.archive.modules.ProcessorURI;
42 import org.archive.modules.fetcher.FetchStatusCodes;
43 import org.archive.modules.net.CrawlHost;
44 import org.archive.modules.net.ServerCache;
45 import org.archive.modules.net.ServerCacheUtil;
46 import org.archive.state.Expert;
47 import org.archive.state.Immutable;
48 import org.archive.state.Initializable;
49 import org.archive.state.Key;
50 import org.archive.state.KeyManager;
51 import org.archive.state.StateProvider;
52 import org.archive.util.IoUtils;
53
54
55
56
57
58 public class HBaseWriterProcessor extends Processor implements Initializable, Closeable {
59
60
61 private static final long serialVersionUID = 7166781798179114353L;
62
63
64 private static final Logger LOG = Logger.getLogger("HBaseWriterProcessor");
65
66
67 @Immutable
68 public static final Key<String> ZKQUORUM = Key.make("localhost");
69
70
71 @Immutable
72 public static final Key<Integer> ZKCLIENTPORT = Key.make(2181);
73
74
75 @Immutable
76 public static final Key<String> TABLE = Key.make("crawl");
77
78
79
80
81
82
83
84
85
86
87
88
89 @Immutable
90 public static final Key<Boolean> WRITE_ONLY_NEW_RECORDS = Key.make(false);
91
92
93
94
95
96
97 @Immutable
98 public static final Key<Boolean> PROCESS_ONLY_NEW_RECORDS = Key.make(false);
99
100
101 @Immutable
102 final public static Key<Integer> POOL_MAX_ACTIVE = Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
103
104
105 @Immutable
106 final public static Key<Integer> POOL_MAX_WAIT = Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
107
108
109 @Immutable
110 final public static Key<ServerCache> SERVER_CACHE = Key.makeAuto(ServerCache.class);
111
112
113 @Immutable
114 final public static Key<Integer> CONTENT_MAX_SIZE = Key.make(20 * 1024 * 1024);
115
116
117 @Immutable
118 @Expert
119 final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
120
121
122 private transient WriterPool pool = null;
123
124
125 private transient ServerCache serverCache;
126
127
128 private int maxActive;
129
130
131 private int maxWait;
132
133
134 private int maxContentSize;
135
136
137 private String zkQuorum;
138
139
140 private int zkClientPort;
141
142
143 private String tableName;
144
145
146 private boolean onlyWriteNewRecords;
147
148
149 private boolean onlyProcessNewRecords;
150
151
152 private long totalBytesWritten = 0;
153
154 static {
155 KeyManager.addKeys(HBaseWriterProcessor.class);
156 }
157
158
159
160
161 public HBaseWriterProcessor() {
162 super();
163 }
164
165
166
167
168 public synchronized void initialTasks(StateProvider context) {
169 this.serverCache = context.get(this, SERVER_CACHE);
170 this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
171 this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
172 this.zkQuorum = context.get(this, ZKQUORUM);
173 this.zkClientPort = context.get(this, ZKCLIENTPORT);
174 this.tableName = context.get(this, TABLE);
175 this.onlyWriteNewRecords = context.get(this, WRITE_ONLY_NEW_RECORDS).booleanValue();
176 this.onlyProcessNewRecords = context.get(this, PROCESS_ONLY_NEW_RECORDS).booleanValue();
177 this.maxContentSize = context.get(this, CONTENT_MAX_SIZE).intValue();
178 setupPool();
179 }
180
181
182
183
184 protected ProcessResult innerProcessResult(final ProcessorURI puri) {
185 ProcessorURI curi = puri;
186 long recordLength = getRecordedSize(curi);
187 ReplayInputStream ris = null;
188 try {
189 if (shouldWrite(curi)) {
190 ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
191 return write(curi, recordLength, ris, getHostAddress(curi));
192 }
193 LOG.info("does not write " + curi.toString());
194 } catch (IOException e) {
195 curi.getNonFatalFailures().add(e);
196 LOG.error("Failed write of Record: " + curi.toString(), e);
197 } finally {
198 IoUtils.close(ris);
199 }
200 return ProcessResult.PROCEED;
201 }
202
203
204
205
206
207
208
209
210
211 protected String getHostAddress(ProcessorURI curi) {
212
213 if (curi.getUURI().getScheme().equalsIgnoreCase("dns")) {
214 return (String) curi.getData().get(
215 ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
216 }
217
218 CrawlHost crawlHost = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
219 if (crawlHost == null) {
220 throw new NullPointerException("Crawlhost is null for " + curi + " " + curi.getVia());
221 }
222
223 if (crawlHost.getIP() == null) {
224 throw new NullPointerException(
225 "Address is null for "
226 + curi
227 + " "
228 + curi.getVia()
229 + ". Address "
230 + ((crawlHost.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP) ? "was never looked up."
231 : (System.currentTimeMillis() - crawlHost.getIpFetched())
232 + " ms ago."));
233 }
234 return crawlHost.getIP().getHostAddress();
235 }
236
237
238
239
240 @Override
241 protected boolean shouldProcess(ProcessorURI uri) {
242 ProcessorURI curi = uri;
243
244 if (curi.getFetchStatus() <= 0) {
245 return false;
246 }
247
248
249 long recordLength = curi.getContentSize();
250 if (recordLength <= 0) {
251
252
253 return false;
254 }
255
256
257
258 if (this.onlyProcessNewRecords) {
259 return this.isRecordNew(curi);
260 }
261
262
263 return true;
264 }
265
266
267
268
269
270
271
272
273
274 protected boolean shouldWrite(ProcessorURI curi) {
275 boolean retVal;
276 String scheme = curi.getUURI().getScheme();
277
278 if (scheme.equalsIgnoreCase("dns")) {
279 retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
280 } else if (scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https")) {
281 retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
282 } else if (scheme.equalsIgnoreCase("ftp")) {
283 retVal = curi.getFetchStatus() == 200;
284 } else {
285 curi.getAnnotations().add("unwritten:scheme");
286 return false;
287 }
288
289 if (retVal == false) {
290
291 curi.getAnnotations().add("unwritten:status");
292 return false;
293 }
294
295
296 if (curi.getContentSize() > this.maxContentSize) {
297
298 curi.getAnnotations().add("unwritten:size");
299 LOG.warn("content size for " + curi.getUURI() + " is too large ("
300 + curi.getContentSize() + ") - maximum content size is: "
301 + this.maxContentSize);
302 return false;
303 }
304
305
306
307 if (this.onlyWriteNewRecords) {
308 return this.isRecordNew(curi);
309 }
310
311 return true;
312 }
313
314
315
316
317
318
319
320
321 private boolean isRecordNew(ProcessorURI curi) {
322 WriterPoolMember writerPoolMember;
323 try {
324 writerPoolMember = getPool().borrowFile();
325 } catch (IOException e1) {
326 LOG.error("No writer could be borrowed from the pool: " + getPool().toString()
327 + " - exception is: \n" + e1.getMessage());
328 return false;
329 }
330 HTable hbaseTable = ((HBaseWriter) writerPoolMember).getClient();
331
332 String url = curi.toString();
333 String row = Keying.createKey(url);
334 try {
335
336 Get rowToGet = new Get(Bytes.toBytes(row));
337 if (hbaseTable.get(rowToGet) != null && !hbaseTable.get(rowToGet).isEmpty()) {
338 if (LOG.isDebugEnabled()) {
339 LOG.debug("Not A NEW Record - Url: "
340 + url
341 + " has the existing rowkey: "
342 + row
343 + " and has cell data.");
344 }
345 return false;
346 }
347 } catch (IOException e) {
348 LOG.error("Failed to determine if record: "
349 + row
350 + " is a new record due to IOExecption. Deciding the record is already existing for now. \n"
351 + e.getMessage());
352 return false;
353 } finally {
354 try {
355 getPool().returnFile(writerPoolMember);
356 } catch (IOException e) {
357 LOG.error("Failed to add back writer to the pool after checking if a rowkey is new or existing: "
358 + row + "\n" + e.getMessage());
359 return false;
360 }
361 }
362 return true;
363 }
364
365
366
367
368
369
370
371
372
373
374
375
376
377 protected ProcessResult write(final ProcessorURI curi, long recordLength, InputStream in, String ip) throws IOException {
378 WriterPoolMember writerPoolMember = getPool().borrowFile();
379 long writerPoolMemberPosition = writerPoolMember.getPosition();
380 HBaseWriter hbaseWriter = (HBaseWriter) writerPoolMember;
381 try {
382 hbaseWriter.write(curi, getHostAddress(curi), curi.getRecorder().getRecordedOutput(), curi.getRecorder().getRecordedInput());
383 } finally {
384 setTotalBytesWritten(getTotalBytesWritten() + (writerPoolMember.getPosition() - writerPoolMemberPosition));
385 getPool().returnFile(writerPoolMember);
386 }
387 return checkBytesWritten(curi);
388 }
389
390
391
392
393
394
395
396
397 protected ProcessResult checkBytesWritten(StateProvider context) {
398 long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
399 if (max <= 0) {
400 return ProcessResult.PROCEED;
401 }
402 if (max <= this.totalBytesWritten) {
403 return ProcessResult.FINISH;
404
405 }
406 return ProcessResult.PROCEED;
407 }
408
409
410
411
412 protected void setupPool() {
413 setPool(new HBaseWriterPool(getZKQuorum(), getZKClientPort(), getTable(), getMaxActive(), getMaxWait()));
414 }
415
416
417
418
419
420
421 protected String getZKQuorum() {
422 return this.zkQuorum;
423 }
424
425
426
427
428
429
430 protected int getZKClientPort() {
431 return this.zkClientPort;
432 }
433
434
435
436
437
438
439
440 protected String getTable() {
441 return this.tableName;
442 }
443
444
445
446
447
448
449
450 protected int getMaxActive() {
451 return maxActive;
452 }
453
454
455
456
457
458
459 protected int getMaxWait() {
460 return maxWait;
461 }
462
463
464
465
466
467
468 protected void setPool(WriterPool pool) {
469 this.pool = pool;
470 }
471
472
473
474
475
476
477 protected WriterPool getPool() {
478 return this.pool;
479 }
480
481
482
483
484
485
486 protected long getTotalBytesWritten() {
487 return this.totalBytesWritten;
488 }
489
490
491
492
493
494
495 protected void setTotalBytesWritten(final long b) {
496 this.totalBytesWritten = b;
497 }
498
499
500
501
502 protected void innerProcess(ProcessorURI puri) {
503 throw new AssertionError();
504 }
505
506
507
508
509 public void close() {
510 this.pool.close();
511 }
512
513 }