1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package com.powerset.heritrix.writer;
25
26 import java.io.ByteArrayOutputStream;
27 import java.io.IOException;
28
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HColumnDescriptor;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.client.HBaseAdmin;
34 import org.apache.hadoop.hbase.client.HTable;
35 import org.apache.hadoop.hbase.io.BatchUpdate;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.Keying;
38 import org.apache.log4j.Logger;
39 import org.archive.io.ArchiveFileConstants;
40 import org.archive.io.RecordingInputStream;
41 import org.archive.io.RecordingOutputStream;
42 import org.archive.io.ReplayInputStream;
43 import org.archive.io.WriterPoolMember;
44 import org.archive.modules.ProcessorURI;
45
46
47
48
49
50
51
52
53
54 public class HBaseWriter extends WriterPoolMember implements ArchiveFileConstants {
55 private final Logger LOG = Logger.getLogger(this.getClass().getName());
56 private final HTable client;
57
58
59 public static final String CONTENT_COLUMN_FAMILY = "content:";
60
61
62 public static final String CONTENT_COLUMN = CONTENT_COLUMN_FAMILY + "raw_data";
63
64
65 public static final String CURI_COLUMN_FAMILY = "curi:";
66
67 private static final String IP_COLUMN = CURI_COLUMN_FAMILY + "ip";
68 private static final String PATH_FROM_SEED_COLUMN = CURI_COLUMN_FAMILY + "path-from-seed";
69 private static final String IS_SEED_COLUMN = CURI_COLUMN_FAMILY + "is-seed";
70 private static final String VIA_COLUMN = CURI_COLUMN_FAMILY + "via";
71 private static final String URL_COLUMN = CURI_COLUMN_FAMILY + "url";
72 private static final String REQUEST_COLUMN = CURI_COLUMN_FAMILY + "request";
73
74 public HBaseWriter(final String master, final String table) throws IOException {
75 super(null, null, null, false, null);
76 if (table == null || table.length() <= 0) {
77 throw new IllegalArgumentException("Must specify a table name");
78 }
79 HBaseConfiguration c = new HBaseConfiguration();
80 if (master != null && master.length() > 0) {
81 c.set(HConstants.MASTER_ADDRESS, master);
82 }
83 createCrawlTable(c, table);
84 this.client = new HTable(c, table);
85 }
86
87 public HTable getClient() {
88 return client;
89 }
90
91 protected void createCrawlTable(final HBaseConfiguration c, final String table) throws IOException {
92 HBaseAdmin admin = new HBaseAdmin(c);
93 if (admin.tableExists(table)) {
94 return;
95 }
96 HTableDescriptor htd = new HTableDescriptor(table);
97 htd.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
98 htd.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
99 admin.createTable(htd);
100 LOG.info("Created table " + htd.toString());
101 }
102
103
104
105
106
107
108
109
110
111
112
113 public void write(final ProcessorURI curi, final String ip, final RecordingOutputStream ros, final RecordingInputStream ris)
114 throws IOException {
115 String url = curi.toString();
116 String row = Keying.createKey(url);
117 if (LOG.isTraceEnabled()) {
118 LOG.trace("Writing " + url + " as " + row.toString());
119 }
120 BatchUpdate bu = new BatchUpdate(row);
121 bu.put(URL_COLUMN, Bytes.toBytes(url));
122 bu.put(IP_COLUMN, Bytes.toBytes(ip));
123 if (curi.isSeed()) {
124
125 bu.put(IS_SEED_COLUMN, Bytes.toBytes(Boolean.TRUE.toString()));
126 if (curi.getPathFromSeed() != null && curi.getPathFromSeed().trim().length() > 0) {
127 bu.put(PATH_FROM_SEED_COLUMN, Bytes.toBytes(curi.getPathFromSeed().trim()));
128 }
129 }
130 String viaStr = (curi.getVia() != null) ? curi.getVia().toString().trim() : null;
131 if (viaStr != null && viaStr.length() > 0) {
132 bu.put(VIA_COLUMN, Bytes.toBytes(viaStr));
133 }
134
135 if (ros.getSize() > 0) {
136 add(bu, REQUEST_COLUMN, ros.getReplayInputStream(), (int) ros.getSize());
137 }
138
139 add(bu, CONTENT_COLUMN, ris.getReplayInputStream(), (int) ris.getSize());
140
141 processContent(bu);
142
143 bu.setTimestamp(curi.getFetchBeginTime());
144 this.client.commit(bu);
145 }
146
147
148
149
150
151
152
153
154
155
156
157 protected void processContent(BatchUpdate bu) {
158
159
160
161 }
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176 private void add(final BatchUpdate bu, final String key, final ReplayInputStream ris, final int size) throws IOException {
177 ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
178 try {
179 ris.readFullyTo(baos);
180 } finally {
181 ris.close();
182 }
183 baos.close();
184 bu.put(key, baos.toByteArray());
185 }
186 }