1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package com.powerset.heritrix.writer;
27
28 import java.io.ByteArrayOutputStream;
29 import java.io.IOException;
30
31 import org.apache.hadoop.hbase.HBaseConfiguration;
32 import org.apache.hadoop.hbase.HColumnDescriptor;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HTableDescriptor;
35 import org.apache.hadoop.hbase.client.HBaseAdmin;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.io.BatchUpdate;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.apache.hadoop.hbase.util.Keying;
40 import org.apache.log4j.Logger;
41 import org.archive.io.ArchiveFileConstants;
42 import org.archive.io.RecordingInputStream;
43 import org.archive.io.RecordingOutputStream;
44 import org.archive.io.ReplayInputStream;
45 import org.archive.io.WriterPoolMember;
46 import org.archive.modules.ProcessorURI;
47
48
49
50
51
52
53
54
55
56
57 public class HBaseWriter extends WriterPoolMember implements ArchiveFileConstants {
58 private final Logger LOG = Logger.getLogger(this.getClass().getName());
59 private final HTable client;
60
61 public static final String CONTENT_COLUMN_FAMILY = "content:";
62
63 public static final String CONTENT_COLUMN = CONTENT_COLUMN_FAMILY + "raw_data";
64
65 public static final String CURI_COLUMN_FAMILY = "curi:";
66
67 private static final String IP_COLUMN = CURI_COLUMN_FAMILY + "ip";
68 private static final String PATH_FROM_SEED_COLUMN =
69 CURI_COLUMN_FAMILY + "path-from-seed";
70 private static final String IS_SEED_COLUMN = CURI_COLUMN_FAMILY + "is-seed";
71 private static final String VIA_COLUMN = CURI_COLUMN_FAMILY + "via";
72 private static final String URL_COLUMN = CURI_COLUMN_FAMILY + "url";
73 private static final String REQUEST_COLUMN = CURI_COLUMN_FAMILY + "request";
74
75 public HBaseWriter(final String master, final String table)
76 throws IOException {
77 super(null, null, null, false, null);
78 if (table == null || table.length() <= 0) {
79 throw new IllegalArgumentException("Must specify a table name");
80 }
81 HBaseConfiguration c = new HBaseConfiguration();
82 if (master != null && master.length() > 0) {
83 c.set(HConstants.MASTER_ADDRESS, master);
84 }
85 createCrawlTable(c, table);
86 this.client = new HTable(c, table);
87 }
88
89 protected void createCrawlTable(final HBaseConfiguration c,
90 final String table)
91 throws IOException {
92 HBaseAdmin admin = new HBaseAdmin(c);
93 if (admin.tableExists(table)) {
94 return;
95 }
96 HTableDescriptor htd = new HTableDescriptor(table);
97 htd.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
98 htd.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
99 admin.createTable(htd);
100 LOG.info("Created table " + htd.toString());
101 }
102
103
104
105
106
107
108
109 public void write(final ProcessorURI curi, final String ip,
110 final RecordingOutputStream ros, final RecordingInputStream ris)
111 throws IOException {
112 String url = curi.toString();
113 String row = Keying.createKey(url);
114 if (LOG.isTraceEnabled()) {
115 LOG.trace("Writing " + url + " as " + row.toString());
116 }
117 BatchUpdate bu = new BatchUpdate(row);
118 bu.put(URL_COLUMN, Bytes.toBytes(url));
119 bu.put(IP_COLUMN, Bytes.toBytes(ip));
120 if (curi.isSeed()) {
121
122 bu.put(IS_SEED_COLUMN, Bytes.toBytes(Boolean.TRUE.toString()));
123 if (curi.getPathFromSeed() != null
124 && curi.getPathFromSeed().trim().length() > 0) {
125 bu.put(PATH_FROM_SEED_COLUMN, Bytes.toBytes(curi.getPathFromSeed().trim()));
126 }
127 }
128 String viaStr = (curi.getVia() != null)?
129 curi.getVia().toString().trim(): null;
130 if (viaStr != null && viaStr.length() > 0) {
131 bu.put(VIA_COLUMN, Bytes.toBytes(viaStr));
132 }
133
134 if (ros.getSize() > 0) {
135 add(bu, REQUEST_COLUMN, ros.getReplayInputStream(), (int)ros.getSize());
136 }
137
138 add(bu, CONTENT_COLUMN, ris.getReplayInputStream(),
139 (int)ris.getSize());
140
141 processContent(bu);
142
143 bu.setTimestamp(curi.getFetchBeginTime());
144 this.client.commit(bu);
145 }
146
147
148
149
150
151
152
153
154
155
156 protected void processContent(BatchUpdate bu) {
157
158
159
160 }
161
162
163
164
165
166
167
168
169
170 private void add(final BatchUpdate bu, final String key,
171 final ReplayInputStream ris, final int size)
172 throws IOException {
173 ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
174 try {
175 ris.readFullyTo(baos);
176 } finally {
177 ris.close();
178 }
179 baos.close();
180 bu.put(key, baos.toByteArray());
181 }
182 }