1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package com.powerset.heritrix.writer;
25
26 import java.io.ByteArrayOutputStream;
27 import java.io.IOException;
28
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HColumnDescriptor;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.client.HBaseAdmin;
34 import org.apache.hadoop.hbase.client.HTable;
35 import org.apache.hadoop.hbase.io.BatchUpdate;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.Keying;
38 import org.apache.log4j.Logger;
39 import org.archive.io.ArchiveFileConstants;
40 import org.archive.io.RecordingInputStream;
41 import org.archive.io.RecordingOutputStream;
42 import org.archive.io.ReplayInputStream;
43 import org.archive.io.WriterPoolMember;
44 import org.archive.modules.ProcessorURI;
45
46
47
48
49
50
51
52
53
54
55 public class HBaseWriter extends WriterPoolMember implements ArchiveFileConstants {
56
57
58 private final Logger LOG = Logger.getLogger(this.getClass().getName());
59
60
61 private final HTable client;
62
63
64
65 public static final String CONTENT_COLUMN_FAMILY = "content:";
66
67
68
69 public static final String CONTENT_COLUMN = CONTENT_COLUMN_FAMILY + "raw_data";
70
71
72
73 public static final String CURI_COLUMN_FAMILY = "curi:";
74
75
76 private static final String IP_COLUMN = CURI_COLUMN_FAMILY + "ip";
77
78
79 private static final String PATH_FROM_SEED_COLUMN = CURI_COLUMN_FAMILY + "path-from-seed";
80
81
82 private static final String IS_SEED_COLUMN = CURI_COLUMN_FAMILY + "is-seed";
83
84
85 private static final String VIA_COLUMN = CURI_COLUMN_FAMILY + "via";
86
87
88 private static final String URL_COLUMN = CURI_COLUMN_FAMILY + "url";
89
90
91 private static final String REQUEST_COLUMN = CURI_COLUMN_FAMILY + "request";
92
93
94
95
96
97
98
99
100
101 public HBaseWriter(final String master, final String table) throws IOException {
102 super(null, null, null, false, null);
103 if (table == null || table.length() <= 0) {
104 throw new IllegalArgumentException("Must specify a table name");
105 }
106 HBaseConfiguration c = new HBaseConfiguration();
107 if (master != null && master.length() > 0) {
108 c.set(HConstants.MASTER_ADDRESS, master);
109 }
110 createCrawlTable(c, table);
111 this.client = new HTable(c, table);
112 }
113
114
115
116
117
118
119 public HTable getClient() {
120 return client;
121 }
122
123
124
125
126
127
128
129
130
131 protected void createCrawlTable(final HBaseConfiguration c, final String table) throws IOException {
132 HBaseAdmin admin = new HBaseAdmin(c);
133 if (admin.tableExists(table)) {
134 return;
135 }
136 HTableDescriptor htd = new HTableDescriptor(table);
137 htd.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
138 htd.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
139 admin.createTable(htd);
140 LOG.info("Created table " + htd.toString());
141 }
142
143
144
145
146
147
148
149
150
151
152
153 public void write(final ProcessorURI curi, final String ip, final RecordingOutputStream ros, final RecordingInputStream ris)
154 throws IOException {
155 String url = curi.toString();
156 String row = Keying.createKey(url);
157 if (LOG.isTraceEnabled()) {
158 LOG.trace("Writing " + url + " as " + row.toString());
159 }
160 BatchUpdate bu = new BatchUpdate(row);
161 bu.put(URL_COLUMN, Bytes.toBytes(url));
162 bu.put(IP_COLUMN, Bytes.toBytes(ip));
163 if (curi.isSeed()) {
164
165 bu.put(IS_SEED_COLUMN, Bytes.toBytes(Boolean.TRUE.toString()));
166 if (curi.getPathFromSeed() != null && curi.getPathFromSeed().trim().length() > 0) {
167 bu.put(PATH_FROM_SEED_COLUMN, Bytes.toBytes(curi.getPathFromSeed().trim()));
168 }
169 }
170 String viaStr = (curi.getVia() != null) ? curi.getVia().toString().trim() : null;
171 if (viaStr != null && viaStr.length() > 0) {
172 bu.put(VIA_COLUMN, Bytes.toBytes(viaStr));
173 }
174
175 if (ros.getSize() > 0) {
176 add(bu, REQUEST_COLUMN, ros.getReplayInputStream(), (int) ros.getSize());
177 }
178
179 add(bu, CONTENT_COLUMN, ris.getReplayInputStream(), (int) ris.getSize());
180
181 processContent(bu);
182
183 bu.setTimestamp(curi.getFetchBeginTime());
184 this.client.commit(bu);
185 }
186
187
188
189
190
191
192
193
194
195
196
197 protected void processContent(BatchUpdate bu) {
198
199
200
201 }
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226 private void add(final BatchUpdate bu, final String key, final ReplayInputStream ris, final int size) throws IOException {
227 ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
228 try {
229 ris.readFullyTo(baos);
230 } finally {
231 ris.close();
232 }
233 baos.close();
234 bu.put(key, baos.toByteArray());
235 }
236 }