1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package com.powerset.heritrix.writer;
27
28 import java.io.Closeable;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.net.InetAddress;
32 import java.util.logging.Level;
33 import java.util.logging.Logger;
34
35 import org.apache.hadoop.hbase.HConstants;
36 import org.archive.io.ReplayInputStream;
37 import org.archive.io.WriterPool;
38 import org.archive.io.WriterPoolMember;
39 import org.archive.modules.ModuleAttributeConstants;
40 import org.archive.modules.ProcessResult;
41 import org.archive.modules.Processor;
42 import org.archive.modules.ProcessorURI;
43 import org.archive.modules.fetcher.FetchStatusCodes;
44 import org.archive.modules.net.CrawlHost;
45 import org.archive.modules.net.ServerCache;
46 import org.archive.modules.net.ServerCacheUtil;
47 import org.archive.state.Expert;
48 import org.archive.state.Immutable;
49 import org.archive.state.Initializable;
50 import org.archive.state.Key;
51 import org.archive.state.KeyManager;
52 import org.archive.state.StateProvider;
53 import org.archive.util.IoUtils;
54
55
56
57
58
59 public class HBaseWriterProcessor extends Processor implements
60 Initializable, Closeable {
61 private static final long serialVersionUID = 7166781798179114353L;
62
63 private final Logger logger = Logger.getLogger(this.getClass().getName());
64
65
66
67
68 @Immutable
69 public static final Key<String> MASTER =
70 Key.make(HConstants.DEFAULT_MASTER_ADDRESS);
71
72
73
74
75 @Immutable
76 public static final Key<String> TABLE = Key.make("crawl");
77
78
79
80
81
82 @Immutable
83 final public static Key<Integer> POOL_MAX_ACTIVE =
84 Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
85
86
87
88
89
90 @Immutable
91 final public static Key<Integer> POOL_MAX_WAIT =
92 Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
93
94 @Immutable
95 final public static Key<ServerCache> SERVER_CACHE =
96 Key.makeAuto(ServerCache.class);
97
98
99
100
101 @Immutable
102 final public static Key<Integer> CONTENT_MAX_SIZE =
103 Key.make(20 * 1024 * 1024);
104
105
106
107
108
109
110 @Immutable @Expert
111 final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
112
113
114
115
116
117 private transient WriterPool pool = null;
118 private ServerCache serverCache;
119 private int maxActive;
120 private int maxWait;
121 private int maxContentSize;
122 private String master;
123 private String table;
124
125
126
127
128 private long totalBytesWritten = 0;
129
130
131 static {
132 KeyManager.addKeys(HBaseWriterProcessor.class);
133 }
134
135 public HBaseWriterProcessor() {
136 super();
137 }
138
139 public synchronized void initialTasks(StateProvider context) {
140 this.serverCache = context.get(this, SERVER_CACHE);
141 this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
142 this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
143 this.master = context.get(this, MASTER);
144 this.table = context.get(this, TABLE);
145 this.maxContentSize = context.get(this, CONTENT_MAX_SIZE).intValue();
146 setupPool();
147 }
148
149 protected String getMaster() {
150 return this.master;
151 }
152
153 protected String getTable() {
154 return this.table;
155 }
156
157 protected void setupPool() {
158 setPool(new HBaseWriterPool(getMaster(), getTable(), getMaxActive(), getMaxWait()));
159 }
160
161 protected int getMaxActive() {
162 return maxActive;
163 }
164
165 protected int getMaxWait() {
166 return maxWait;
167 }
168
169 protected void setPool(WriterPool pool) {
170 this.pool = pool;
171 }
172
173 protected WriterPool getPool() {
174 return this.pool;
175 }
176
177 protected long getTotalBytesWritten() {
178 return this.totalBytesWritten;
179 }
180
181 protected void setTotalBytesWritten(final long b) {
182 this.totalBytesWritten = b;
183 }
184
185 protected ProcessResult innerProcessResult(final ProcessorURI puri) {
186 ProcessorURI curi = puri;
187 long recordLength = getRecordedSize(curi);
188 ReplayInputStream ris = null;
189 try {
190 if (shouldWrite(curi)) {
191 ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
192 return write(curi, recordLength, ris, getHostAddress(curi));
193 }
194 logger.info("does not write " + curi.toString());
195 } catch (IOException e) {
196 curi.getNonFatalFailures().add(e);
197 logger.log(Level.SEVERE, "Failed write of Record: " +
198 curi.toString(), e);
199 } finally {
200 IoUtils.close(ris);
201 }
202 return ProcessResult.PROCEED;
203 }
204
205
206
207
208
209
210
211
212 protected String getHostAddress(ProcessorURI curi) {
213
214 if (curi.getUURI().getScheme().toLowerCase().equals("dns")) {
215 return (String)curi.getData().get(ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
216 }
217
218 CrawlHost h = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
219 if (h == null) {
220 throw new NullPointerException("Crawlhost is null for " +
221 curi + " " + curi.getVia());
222 }
223 InetAddress a = h.getIP();
224 if (a == null) {
225 throw new NullPointerException("Address is null for " +
226 curi + " " + curi.getVia() + ". Address " +
227 ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP)?
228 "was never looked up.":
229 (System.currentTimeMillis() - h.getIpFetched()) +
230 " ms ago."));
231 }
232 return h.getIP().getHostAddress();
233 }
234
235
236
237
238
239
240
241
242 protected boolean shouldWrite(ProcessorURI curi) {
243 boolean retVal;
244 String scheme = curi.getUURI().getScheme().toLowerCase();
245 if (scheme.equals("dns")) {
246 retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
247 } else if (scheme.equals("http") || scheme.equals("https")) {
248 retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
249 } else if (scheme.equals("ftp")) {
250 retVal = curi.getFetchStatus() == 200;
251 } else {
252 curi.getAnnotations().add("unwritten:scheme");
253 return false;
254 }
255
256 if (retVal == false) {
257
258 curi.getAnnotations().add("unwritten:status");
259 return false;
260 }
261
262 if (curi.getContentSize() > this.maxContentSize) {
263
264 curi.getAnnotations().add("unwritten:size");
265 logger.warning("content size for " + curi.getUURI() + " is too large (" +
266 curi.getContentSize() + ") - maximum content size is: " + this.maxContentSize );
267 return false;
268 }
269
270 return true;
271 }
272
273 protected ProcessResult write(final ProcessorURI curi,
274 @SuppressWarnings("unused") long recordLength,
275 @SuppressWarnings("unused") InputStream in,
276 @SuppressWarnings("unused") String ip)
277 throws IOException {
278 WriterPoolMember writer = getPool().borrowFile();
279 long position = writer.getPosition();
280 HBaseWriter w = (HBaseWriter)writer;
281 try {
282 w.write(curi, getHostAddress(curi),
283 curi.getRecorder().getRecordedOutput(),
284 curi.getRecorder().getRecordedInput());
285 } finally {
286 setTotalBytesWritten(getTotalBytesWritten() +
287 (writer.getPosition() - position));
288 getPool().returnFile(writer);
289 }
290 return checkBytesWritten(curi);
291 }
292
293 protected ProcessResult checkBytesWritten(StateProvider context) {
294 long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
295 if (max <= 0) {
296 return ProcessResult.PROCEED;
297 }
298 if (max <= this.totalBytesWritten) {
299 return ProcessResult.FINISH;
300
301 }
302 return ProcessResult.PROCEED;
303 }
304
305
306 protected void innerProcess(@SuppressWarnings("unused") ProcessorURI puri) {
307 throw new AssertionError();
308 }
309
310
311
312 static {
313 KeyManager.addKeys(HBaseWriterProcessor.class);
314 }
315
316 public void close() {
317 this.pool.close();
318 }
319
320 @Override
321 protected boolean shouldProcess(ProcessorURI uri) {
322 ProcessorURI curi = uri;
323
324 if (curi.getFetchStatus() <= 0) {
325 return false;
326 }
327
328
329 long recordLength = curi.getContentSize();
330 if (recordLength <= 0) {
331
332
333 return false;
334 }
335
336 return true;
337 }
338 }