1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package com.powerset.heritrix.writer;
27
28 import java.io.Closeable;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.net.InetAddress;
32 import java.util.logging.Level;
33 import java.util.logging.Logger;
34
35 import org.apache.hadoop.hbase.HConstants;
36 import org.archive.io.ReplayInputStream;
37 import org.archive.io.WriterPool;
38 import org.archive.io.WriterPoolMember;
39 import org.archive.modules.ModuleAttributeConstants;
40 import org.archive.modules.ProcessResult;
41 import org.archive.modules.Processor;
42 import org.archive.modules.ProcessorURI;
43 import org.archive.modules.fetcher.FetchStatusCodes;
44 import org.archive.modules.net.CrawlHost;
45 import org.archive.modules.net.ServerCache;
46 import org.archive.modules.net.ServerCacheUtil;
47 import org.archive.state.Expert;
48 import org.archive.state.Immutable;
49 import org.archive.state.Initializable;
50 import org.archive.state.Key;
51 import org.archive.state.KeyManager;
52 import org.archive.state.StateProvider;
53 import org.archive.util.IoUtils;
54
55
56
57
58
59 public class HBaseWriterProcessor extends Processor implements
60 Initializable, Closeable {
61 private static final long serialVersionUID = 7166781798179114353L;
62
63 private final Logger logger = Logger.getLogger(this.getClass().getName());
64
65
66
67
68 @Immutable
69 public static final Key<String> MASTER =
70 Key.make(HConstants.DEFAULT_MASTER_ADDRESS);
71
72
73
74
75 @Immutable
76 public static final Key<String> TABLE = Key.make("crawl");
77
78
79
80
81
82 @Immutable
83 final public static Key<Integer> POOL_MAX_ACTIVE =
84 Key.make(WriterPool.DEFAULT_MAX_ACTIVE);
85
86
87
88
89
90
91 @Immutable
92 final public static Key<Integer> POOL_MAX_WAIT =
93 Key.make(WriterPool.DEFAULT_MAXIMUM_WAIT);
94
95 @Immutable
96 final public static Key<ServerCache> SERVER_CACHE =
97 Key.makeAuto(ServerCache.class);
98
99
100
101
102
103
104 @Immutable @Expert
105 final public static Key<Long> TOTAL_BYTES_TO_WRITE = Key.make(0L);
106
107
108
109
110
111 private transient WriterPool pool = null;
112 private ServerCache serverCache;
113 private int maxActive;
114 private int maxWait;
115 private String master;
116 private String table;
117
118
119
120
121 private long totalBytesWritten = 0;
122
123
124 static {
125 KeyManager.addKeys(HBaseWriterProcessor.class);
126 }
127
128 public HBaseWriterProcessor() {
129 super();
130 }
131
132 public synchronized void initialTasks(StateProvider context) {
133 this.serverCache = context.get(this, SERVER_CACHE);
134 this.maxActive = context.get(this, POOL_MAX_ACTIVE).intValue();
135 this.maxWait = context.get(this, POOL_MAX_WAIT).intValue();
136 this.master = context.get(this, MASTER);
137 this.table = context.get(this, TABLE);
138 setupPool();
139 }
140
141 protected String getMaster() {
142 return this.master;
143 }
144
145 protected String getTable() {
146 return this.table;
147 }
148
149 protected void setupPool() {
150 setPool(new HBaseWriterPool(getMaster(), getTable(), getMaxActive(), getMaxWait()));
151 }
152
153 protected int getMaxActive() {
154 return maxActive;
155 }
156
157 protected int getMaxWait() {
158 return maxWait;
159 }
160
161 protected void setPool(WriterPool pool) {
162 this.pool = pool;
163 }
164
165 protected WriterPool getPool() {
166 return this.pool;
167 }
168
169 protected long getTotalBytesWritten() {
170 return this.totalBytesWritten;
171 }
172
173 protected void setTotalBytesWritten(final long b) {
174 this.totalBytesWritten = b;
175 }
176
177 protected ProcessResult innerProcessResult(final ProcessorURI puri) {
178 ProcessorURI curi = puri;
179 long recordLength = getRecordedSize(curi);
180 ReplayInputStream ris = null;
181 try {
182 if (shouldWrite(curi)) {
183 ris = curi.getRecorder().getRecordedInput().getReplayInputStream();
184 return write(curi, recordLength, ris, getHostAddress(curi));
185 }
186 logger.info("does not write " + curi.toString());
187 } catch (IOException e) {
188 curi.getNonFatalFailures().add(e);
189 logger.log(Level.SEVERE, "Failed write of Record: " +
190 curi.toString(), e);
191 } finally {
192 IoUtils.close(ris);
193 }
194 return ProcessResult.PROCEED;
195 }
196
197
198
199
200
201
202
203
204 protected String getHostAddress(ProcessorURI curi) {
205
206 if (curi.getUURI().getScheme().toLowerCase().equals("dns")) {
207 return (String)curi.getData().get(ModuleAttributeConstants.A_DNS_SERVER_IP_LABEL);
208 }
209
210 CrawlHost h = ServerCacheUtil.getHostFor(serverCache, curi.getUURI());
211 if (h == null) {
212 throw new NullPointerException("Crawlhost is null for " +
213 curi + " " + curi.getVia());
214 }
215 InetAddress a = h.getIP();
216 if (a == null) {
217 throw new NullPointerException("Address is null for " +
218 curi + " " + curi.getVia() + ". Address " +
219 ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP)?
220 "was never looked up.":
221 (System.currentTimeMillis() - h.getIpFetched()) +
222 " ms ago."));
223 }
224 return h.getIP().getHostAddress();
225 }
226
227
228
229
230
231
232
233
234 protected boolean shouldWrite(ProcessorURI curi) {
235 boolean retVal;
236 String scheme = curi.getUURI().getScheme().toLowerCase();
237 if (scheme.equals("dns")) {
238 retVal = curi.getFetchStatus() == FetchStatusCodes.S_DNS_SUCCESS;
239 } else if (scheme.equals("http") || scheme.equals("https")) {
240 retVal = curi.getFetchStatus() > 0 && curi.getHttpMethod() != null;
241 } else if (scheme.equals("ftp")) {
242 retVal = curi.getFetchStatus() == 200;
243 } else {
244 curi.getAnnotations().add("unwritten:scheme");
245 return false;
246 }
247
248 if (retVal == false) {
249
250 curi.getAnnotations().add("unwritten:status");
251 return false;
252 }
253 return true;
254 }
255
256 protected ProcessResult write(final ProcessorURI curi,
257 @SuppressWarnings("unused") long recordLength,
258 @SuppressWarnings("unused") InputStream in,
259 @SuppressWarnings("unused") String ip)
260 throws IOException {
261 WriterPoolMember writer = getPool().borrowFile();
262 long position = writer.getPosition();
263 HBaseWriter w = (HBaseWriter)writer;
264 try {
265 w.write(curi, getHostAddress(curi),
266 curi.getRecorder().getRecordedOutput(),
267 curi.getRecorder().getRecordedInput());
268 } catch (IOException e) {
269 writer = null;
270 throw e;
271 } finally {
272 if (writer != null) {
273 setTotalBytesWritten(getTotalBytesWritten() +
274 (writer.getPosition() - position));
275 getPool().returnFile(writer);
276 }
277 }
278 return checkBytesWritten(curi);
279 }
280
281 protected ProcessResult checkBytesWritten(StateProvider context) {
282 long max = context.get(this, TOTAL_BYTES_TO_WRITE).longValue();
283 if (max <= 0) {
284 return ProcessResult.PROCEED;
285 }
286 if (max <= this.totalBytesWritten) {
287 return ProcessResult.FINISH;
288
289 }
290 return ProcessResult.PROCEED;
291 }
292
293
294 protected void innerProcess(@SuppressWarnings("unused") ProcessorURI puri) {
295 throw new AssertionError();
296 }
297
298
299
300 static {
301 KeyManager.addKeys(HBaseWriterProcessor.class);
302 }
303
304 public void close() {
305 this.pool.close();
306 }
307
308 @Override
309 protected boolean shouldProcess(ProcessorURI uri) {
310 ProcessorURI curi = uri;
311
312 if (curi.getFetchStatus() <= 0) {
313 return false;
314 }
315
316
317 long recordLength = curi.getContentSize();
318 if (recordLength <= 0) {
319
320
321 return false;
322 }
323
324 return true;
325 }
326 }