1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package com.powerset.heritrix.writer;
25
26 import java.io.ByteArrayOutputStream;
27 import java.io.IOException;
28
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HColumnDescriptor;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.client.HBaseAdmin;
34 import org.apache.hadoop.hbase.client.HTable;
35 import org.apache.hadoop.hbase.client.Put;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.Keying;
38 import org.apache.log4j.Logger;
39 import org.archive.io.ArchiveFileConstants;
40 import org.archive.io.RecordingInputStream;
41 import org.archive.io.RecordingOutputStream;
42 import org.archive.io.ReplayInputStream;
43 import org.archive.io.WriterPoolMember;
44 import org.archive.modules.ProcessorURI;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public class HBaseWriter extends WriterPoolMember implements ArchiveFileConstants {
68
69
70 private final Logger LOG = Logger.getLogger(this.getClass().getName());
71
72
73 private final HTable client;
74
75
76
77 public static final String CONTENT_COLUMN_FAMILY = "content";
78
79
80 public static final String CONTENT_COLUMN_NAME = "raw_data";
81
82
83
84 public static final String CURI_COLUMN_FAMILY = "curi";
85
86
87 private static final String IP_COLUMN_NAME = "ip";
88
89
90 private static final String PATH_FROM_SEED_COLUMN_NAME = "path-from-seed";
91
92
93 private static final String IS_SEED_COLUMN_NAME = "is-seed";
94
95
96 private static final String VIA_COLUMN_NAME = "via";
97
98
99 private static final String URL_COLUMN_NAME = "url";
100
101
102 private static final String REQUEST_COLUMN_NAME = "request";
103
104
105
106
107
108
109 public HTable getClient() {
110 return client;
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124 public HBaseWriter(final String zkQuorum, final int zkClientPort, final String tableName) throws IOException {
125 super(null, null, null, false, null);
126 if (tableName == null || tableName.length() <= 0) {
127 throw new IllegalArgumentException("Must specify a table name");
128 }
129 HBaseConfiguration hbaseConfiguration = new HBaseConfiguration();
130 if (zkQuorum != null && zkQuorum.length() > 0) {
131 LOG.info("setting zookeeper quorum to : " + zkQuorum);
132 hbaseConfiguration.setStrings(HConstants.ZOOKEEPER_QUORUM, zkQuorum.split(","));
133 }
134 LOG.debug("zookeeper quorum value: " + hbaseConfiguration.get(HConstants.ZOOKEEPER_QUORUM));
135 if (zkClientPort > 0) {
136 LOG.info("setting zookeeper client Port to : " + zkClientPort);
137
138 hbaseConfiguration.setInt("hbase.zookeeper.property.clientPort", zkClientPort);
139 }
140 LOG.debug("zookeeper quorum value: " + hbaseConfiguration.get(HConstants.ZOOKEEPER_QUORUM));
141 createCrawlTable(hbaseConfiguration, tableName);
142 this.client = new HTable(hbaseConfiguration, tableName);
143 }
144
145
146
147
148
149
150
151
152
153 protected void createCrawlTable(final HBaseConfiguration hbaseConfiguration, final String hbaseTableName) throws IOException {
154
155 HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseConfiguration);
156 if (hbaseAdmin.tableExists(hbaseTableName)) {
157 boolean foundContentColumnFamily = false;
158 boolean foundCURIColumnFamily = false;
159 LOG.debug("Checking table: " + hbaseTableName + " for structure...");
160
161
162 HTableDescriptor existingHBaseTable = hbaseAdmin.getTableDescriptor(Bytes.toBytes(hbaseTableName));
163 for (HColumnDescriptor hColumnDescriptor: existingHBaseTable.getFamilies()) {
164 if (hColumnDescriptor.getNameAsString().equalsIgnoreCase(CONTENT_COLUMN_FAMILY)) {
165 foundContentColumnFamily = true;
166 } else if (hColumnDescriptor.getNameAsString().equalsIgnoreCase(CURI_COLUMN_FAMILY)) {
167 foundCURIColumnFamily = true;
168 }
169 }
170
171 if (!foundContentColumnFamily || !foundCURIColumnFamily) {
172 LOG.info("Disabling table: " + hbaseTableName);
173 hbaseAdmin.disableTable(hbaseTableName);
174 if (!foundContentColumnFamily) {
175 LOG.info("Adding column to table: " + hbaseTableName + " column: " + CONTENT_COLUMN_FAMILY);
176 existingHBaseTable.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
177 }
178 if (!foundCURIColumnFamily) {
179 LOG.info("Adding column to table: " + hbaseTableName + " column: " + CURI_COLUMN_FAMILY);
180 existingHBaseTable.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
181 }
182 LOG.info("Enabling table: " + hbaseTableName);
183 hbaseAdmin.enableTable(hbaseTableName);
184 }
185 LOG.debug("Done checking table: " + hbaseTableName);
186 } else {
187
188 LOG.info("Creating table " + hbaseTableName);
189 HTableDescriptor newHBaseTable = new HTableDescriptor(hbaseTableName);
190 newHBaseTable.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
191 newHBaseTable.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
192
193 hbaseAdmin.createTable(newHBaseTable);
194 LOG.info("Created table " + newHBaseTable.toString());
195 }
196 }
197
198
199
200
201
202
203
204
205
206
207
208
209 public void write(final ProcessorURI curi, final String ip, final RecordingOutputStream ros, final RecordingInputStream ris)
210 throws IOException {
211
212 String url = curi.toString();
213
214 String rowKey = Keying.createKey(url);
215 if (LOG.isTraceEnabled()) {
216 LOG.trace("Writing " + url + " as " + rowKey.toString());
217 }
218
219
220 Put batchPut = new Put(Bytes.toBytes(rowKey));
221
222 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(URL_COLUMN_NAME), curi.getFetchBeginTime(), Bytes.toBytes(url));
223
224 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(IP_COLUMN_NAME), curi.getFetchBeginTime(), Bytes.toBytes(ip));
225
226 if (curi.isSeed()) {
227 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(IS_SEED_COLUMN_NAME), Bytes.toBytes(Boolean.TRUE));
228 if (curi.getPathFromSeed() != null && curi.getPathFromSeed().trim().length() > 0) {
229 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(PATH_FROM_SEED_COLUMN_NAME), Bytes.toBytes(curi.getPathFromSeed().trim()));
230 }
231 }
232 String viaStr = (curi.getVia() != null) ? curi.getVia().toString().trim() : null;
233 if (viaStr != null && viaStr.length() > 0) {
234 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(VIA_COLUMN_NAME), Bytes.toBytes(viaStr));
235 }
236
237 if (ros.getSize() > 0) {
238 batchPut.add(Bytes.toBytes(CURI_COLUMN_FAMILY), Bytes.toBytes(REQUEST_COLUMN_NAME),
239 getByteArrayFromInputStream(ros.getReplayInputStream(), (int) ros.getSize()));
240 }
241
242 batchPut.add(Bytes.toBytes(CONTENT_COLUMN_FAMILY), Bytes.toBytes(CONTENT_COLUMN_NAME),
243 getByteArrayFromInputStream(ris.getReplayInputStream(), (int) ris.getSize()));
244
245
246 ris.getReplayInputStream().setToResponseBodyStart();
247
248 processContent(batchPut, ris.getReplayInputStream(), (int) ris.getSize());
249
250 batchPut.setTimeStamp(curi.getFetchBeginTime());
251
252 this.client.put(batchPut);
253 }
254
255
256
257
258
259
260
261
262
263
264 protected byte[] getByteArrayFromInputStream(final ReplayInputStream replayInputStream, final int streamSize) throws IOException {
265 ByteArrayOutputStream baos = new ByteArrayOutputStream(streamSize);
266 try {
267
268 replayInputStream.readFullyTo(baos);
269 } finally {
270 replayInputStream.close();
271 }
272 baos.close();
273 return baos.toByteArray();
274 }
275
276
277
278
279
280
281
282
283
284
285
286
287 protected void processContent(Put put, ReplayInputStream replayInputStream, int streamSize) throws IOException {
288
289
290
291
292
293 }
294 }