1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package com.powerset.heritrix.writer;
27
28 import java.io.ByteArrayOutputStream;
29 import java.io.IOException;
30 import java.util.logging.Level;
31 import java.util.logging.Logger;
32
33 import org.apache.hadoop.hbase.HBaseConfiguration;
34 import org.apache.hadoop.hbase.HColumnDescriptor;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.client.HBaseAdmin;
38 import org.apache.hadoop.hbase.client.HTable;
39 import org.apache.hadoop.hbase.io.BatchUpdate;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hbase.util.Keying;
42 import org.archive.io.ArchiveFileConstants;
43 import org.archive.io.RecordingInputStream;
44 import org.archive.io.RecordingOutputStream;
45 import org.archive.io.ReplayInputStream;
46 import org.archive.io.WriterPoolMember;
47 import org.archive.modules.ProcessorURI;
48
49
50
51
52
53
54
55
56
57
58 public class HBaseWriter extends WriterPoolMember implements ArchiveFileConstants {
59 private final Logger LOG = Logger.getLogger(this.getClass().getName());
60 private final HTable client;
61
62 public static final String CONTENT_COLUMN_FAMILY = "content:";
63
64 public static final String CONTENT_COLUMN = CONTENT_COLUMN_FAMILY + "raw_data";
65
66 public static final String CURI_COLUMN_FAMILY = "curi:";
67
68 private static final String IP_COLUMN = CURI_COLUMN_FAMILY + "ip";
69 private static final String PATH_FROM_SEED_COLUMN =
70 CURI_COLUMN_FAMILY + "path-from-seed";
71 private static final String IS_SEED_COLUMN = CURI_COLUMN_FAMILY + "is-seed";
72 private static final String VIA_COLUMN = CURI_COLUMN_FAMILY + "via";
73 private static final String URL_COLUMN = CURI_COLUMN_FAMILY + "url";
74 private static final String REQUEST_COLUMN = CURI_COLUMN_FAMILY + "request";
75
76 public HBaseWriter(final String master, final String table)
77 throws IOException {
78 super(null, null, null, false, null);
79 if (table == null || table.length() <= 0) {
80 throw new IllegalArgumentException("Must specify a table name");
81 }
82 HBaseConfiguration c = new HBaseConfiguration();
83 if (master != null && master.length() > 0) {
84 c.set(HConstants.MASTER_ADDRESS, master);
85 }
86 createCrawlTable(c, table);
87 this.client = new HTable(c, table);
88 }
89
90 protected void createCrawlTable(final HBaseConfiguration c,
91 final String table)
92 throws IOException {
93 HBaseAdmin admin = new HBaseAdmin(c);
94 if (admin.tableExists(table)) {
95 return;
96 }
97 HTableDescriptor htd = new HTableDescriptor(table);
98 htd.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
99 htd.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
100 admin.createTable(htd);
101 LOG.info("Created table " + htd.toString());
102 }
103
104
105
106
107
108
109
110 public void write(final ProcessorURI curi, final String ip,
111 final RecordingOutputStream ros, final RecordingInputStream ris)
112 throws IOException {
113 String url = curi.toString();
114 String row = Keying.createKey(url);
115 if (LOG.isLoggable(Level.FINE)) {
116 LOG.fine("Writing " + url + " as " + row.toString());
117 }
118 BatchUpdate bu = new BatchUpdate(row);
119 bu.put(URL_COLUMN, Bytes.toBytes(url));
120 bu.put(IP_COLUMN, Bytes.toBytes(ip));
121 if (curi.isSeed()) {
122
123 bu.put(IS_SEED_COLUMN, Bytes.toBytes(Boolean.TRUE.toString()));
124 if (curi.getPathFromSeed() != null
125 && curi.getPathFromSeed().trim().length() > 0) {
126 bu.put(PATH_FROM_SEED_COLUMN, Bytes.toBytes(curi.getPathFromSeed().trim()));
127 }
128 }
129 String viaStr = (curi.getVia() != null)?
130 curi.getVia().toString().trim(): null;
131 if (viaStr != null && viaStr.length() > 0) {
132 bu.put(VIA_COLUMN, Bytes.toBytes(viaStr));
133 }
134
135 if (ros.getSize() > 0) {
136 add(bu, REQUEST_COLUMN, ros.getReplayInputStream(), (int)ros.getSize());
137 }
138
139 add(bu, CONTENT_COLUMN, ris.getReplayInputStream(),
140 (int)ris.getSize());
141
142 processContent(bu);
143
144 bu.setTimestamp(curi.getFetchBeginTime());
145 this.client.commit(bu);
146 }
147
148
149
150
151
152
153
154
155
156
157 protected void processContent(BatchUpdate bu) {
158
159
160
161 }
162
163
164
165
166
167
168
169
170
171 private void add(final BatchUpdate bu, final String key,
172 final ReplayInputStream ris, final int size)
173 throws IOException {
174 ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
175 try {
176 ris.readFullyTo(baos);
177 } finally {
178 ris.close();
179 }
180 baos.close();
181 bu.put(key, baos.toByteArray());
182 }
183 }