1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package com.powerset.heritrix.writer;
25
26 import java.io.ByteArrayOutputStream;
27 import java.io.IOException;
28
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HColumnDescriptor;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.client.HBaseAdmin;
34 import org.apache.hadoop.hbase.client.HTable;
35 import org.apache.hadoop.hbase.client.Put;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.Keying;
38 import org.apache.hadoop.io.IOUtils;
39 import org.apache.log4j.Logger;
40 import org.archive.io.RecordingInputStream;
41 import org.archive.io.RecordingOutputStream;
42 import org.archive.io.ReplayInputStream;
43 import org.archive.io.WriterPoolMember;
44 import org.archive.modules.ProcessorURI;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public class HBaseWriter extends WriterPoolMember {
68
69
70 private final Logger LOG = Logger.getLogger(this.getClass().getName());
71
72
73 private final HTable client;
74
75
76
77
78 public static final String CONTENT_COLUMN_FAMILY = "content";
79
80
81 public static final String CONTENT_COLUMN_NAME = "raw_data";
82
83
84
85 public static final String CURI_COLUMN_FAMILY = "curi";
86
87
88 private static final String IP_COLUMN_NAME = "ip";
89
90
91 private static final String PATH_FROM_SEED_COLUMN_NAME = "path-from-seed";
92
93
94 private static final String IS_SEED_COLUMN_NAME = "is-seed";
95
96
97 private static final String VIA_COLUMN_NAME = "via";
98
99
100 private static final String URL_COLUMN_NAME = "url";
101
102
103 private static final String REQUEST_COLUMN_NAME = "request";
104
105
106 private static String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
107
108
109
110
111
112
113 public HTable getClient() {
114 return client;
115 }
116
117
118
119
120
121
122
123
124
125
126
127
128
129 public HBaseWriter(final String zkQuorum, final int zkClientPort, final String tableName) throws IOException {
130 super(null, null, null, false, null);
131 if (tableName == null || tableName.length() <= 0) {
132 throw new IllegalArgumentException("Must specify a table name");
133 }
134 HBaseConfiguration hbaseConfiguration = new HBaseConfiguration();
135
136 if (zkQuorum != null && zkQuorum.length() > 0) {
137 LOG.info("setting zookeeper quorum to : " + zkQuorum);
138 hbaseConfiguration.setStrings(HConstants.ZOOKEEPER_QUORUM, zkQuorum.split(","));
139 }
140
141 if (zkClientPort > 0) {
142 LOG.info("setting zookeeper client Port to : " + zkClientPort);
143 hbaseConfiguration.setInt(ZOOKEEPER_CLIENT_PORT, zkClientPort);
144 }
145
146 createCrawlTable(hbaseConfiguration, tableName);
147 this.client = new HTable(hbaseConfiguration, tableName);
148 }
149
150
151
152
153
154
155
156
157
158 protected void createCrawlTable(final HBaseConfiguration hbaseConfiguration, final String hbaseTableName) throws IOException {
159
160 HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseConfiguration);
161 if (hbaseAdmin.tableExists(hbaseTableName)) {
162 boolean foundContentColumnFamily = false;
163 boolean foundCURIColumnFamily = false;
164 LOG.debug("Checking table: " + hbaseTableName + " for structure...");
165
166
167 HTableDescriptor existingHBaseTable = hbaseAdmin.getTableDescriptor(Bytes.toBytes(hbaseTableName));
168 for (HColumnDescriptor hColumnDescriptor: existingHBaseTable.getFamilies()) {
169 if (hColumnDescriptor.getNameAsString().equalsIgnoreCase(CONTENT_COLUMN_FAMILY)) {
170 foundContentColumnFamily = true;
171 } else if (hColumnDescriptor.getNameAsString().equalsIgnoreCase(CURI_COLUMN_FAMILY)) {
172 foundCURIColumnFamily = true;
173 }
174 }
175
176 if (!foundContentColumnFamily || !foundCURIColumnFamily) {
177 LOG.info("Disabling table: " + hbaseTableName);
178 hbaseAdmin.disableTable(hbaseTableName);
179 if (!foundContentColumnFamily) {
180 LOG.info("Adding column to table: " + hbaseTableName + " column: " + CONTENT_COLUMN_FAMILY);
181 existingHBaseTable.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
182 }
183 if (!foundCURIColumnFamily) {
184 LOG.info("Adding column to table: " + hbaseTableName + " column: " + CURI_COLUMN_FAMILY);
185 existingHBaseTable.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
186 }
187 LOG.info("Enabling table: " + hbaseTableName);
188 hbaseAdmin.enableTable(hbaseTableName);
189 }
190 LOG.debug("Done checking table: " + hbaseTableName);
191 } else {
192
193 LOG.info("Creating table " + hbaseTableName);
194 HTableDescriptor newHBaseTable = new HTableDescriptor(hbaseTableName);
195 newHBaseTable.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
196 newHBaseTable.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
197
198 hbaseAdmin.createTable(newHBaseTable);
199 LOG.info("Created table " + newHBaseTable.toString());
200 }
201 }
202
203
204
205
206
207
208
209
210
211
212
213
214 public void write(final ProcessorURI curi, final String ip, final RecordingOutputStream recordingOutputStream,
215 final RecordingInputStream recordingInputStream) throws IOException {
216
217 String url = curi.toString();
218
219 String rowKey = Keying.createKey(url);
220 if (LOG.isTraceEnabled()) {
221 LOG.trace("Writing " + url + " as " + rowKey);
222 }
223
224
225 Put batchPut = new Put(Bytes.toBytes(rowKey));
226
227 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(URL_COLUMN_NAME), curi.getFetchBeginTime(), Bytes.toBytes(url));
228
229 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(IP_COLUMN_NAME), curi.getFetchBeginTime(), Bytes.toBytes(ip));
230
231 if (curi.isSeed()) {
232 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(IS_SEED_COLUMN_NAME), Bytes.toBytes(Boolean.TRUE));
233 if (curi.getPathFromSeed() != null && curi.getPathFromSeed().trim().length() > 0) {
234 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(PATH_FROM_SEED_COLUMN_NAME), Bytes.toBytes(curi.getPathFromSeed().trim()));
235 }
236 }
237
238 String viaStr = (curi.getVia() != null) ? curi.getVia().toString().trim() : null;
239 if (viaStr != null && viaStr.length() > 0) {
240 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(VIA_COLUMN_NAME), Bytes.toBytes(viaStr));
241 }
242
243 if (recordingOutputStream.getSize() > 0) {
244 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(REQUEST_COLUMN_NAME),
245 getByteArrayFromInputStream(recordingOutputStream.getReplayInputStream(), (int) recordingOutputStream.getSize()));
246 }
247
248 ReplayInputStream replayInputStream = recordingInputStream.getReplayInputStream();
249 try {
250
251 batchPut.add(Bytes.toBytes(CONTENT_COLUMN_FAMILY), Bytes.toBytes(CONTENT_COLUMN_NAME),
252 getByteArrayFromInputStream(replayInputStream, (int) recordingInputStream.getSize()));
253
254 replayInputStream = recordingInputStream.getReplayInputStream();
255 replayInputStream.setToResponseBodyStart();
256
257 processContent(batchPut, replayInputStream, (int) recordingInputStream.getSize());
258
259 batchPut.setTimeStamp(curi.getFetchBeginTime());
260
261 this.client.put(batchPut);
262 } finally {
263 IOUtils.closeStream(replayInputStream);
264 }
265 }
266
267
268
269
270
271
272
273
274
275
276
277 protected byte[] getByteArrayFromInputStream(final ReplayInputStream replayInputStream, final int streamSize) throws IOException {
278 ByteArrayOutputStream baos = new ByteArrayOutputStream(streamSize);
279 try {
280
281 replayInputStream.readFullyTo(baos);
282 } finally {
283 replayInputStream.close();
284 }
285 baos.close();
286 return baos.toByteArray();
287 }
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303 protected void processContent(Put put, ReplayInputStream replayInputStream, int streamSize) throws IOException {
304
305
306
307
308
309
310
311 }
312 }