View Javadoc

1   /**
2    * HBaseWriter
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * This file is part of the Heritrix web crawler (crawler.archive.org).
9    *
10   * Heritrix is free software; you can redistribute it and/or modify
11   * it under the terms of the GNU Lesser Public License as published by
12   * the Free Software Foundation; either version 2.1 of the License, or
13   * any later version.
14   *
15   * Heritrix is distributed in the hope that it will be useful,
16   * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   * GNU Lesser Public License for more details.
19   *
20   * You should have received a copy of the GNU Lesser Public License
21   * along with Heritrix; if not, write to the Free Software
22   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23   */
24  package com.powerset.heritrix.writer;
25  
26  import java.io.ByteArrayOutputStream;
27  import java.io.IOException;
28  
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HColumnDescriptor;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HTableDescriptor;
33  import org.apache.hadoop.hbase.client.HBaseAdmin;
34  import org.apache.hadoop.hbase.client.HTable;
35  import org.apache.hadoop.hbase.io.BatchUpdate;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.Keying;
38  import org.apache.log4j.Logger;
39  import org.archive.io.ArchiveFileConstants;
40  import org.archive.io.RecordingInputStream;
41  import org.archive.io.RecordingOutputStream;
42  import org.archive.io.ReplayInputStream;
43  import org.archive.io.WriterPoolMember;
44  import org.archive.modules.ProcessorURI;
45  
46  // TODO: Auto-generated Javadoc
47  /**
48   * Write to HBase. Puts content into the 'content:' column and all else into the
49   * 'curi:' column family. Makes a row key of an url transformation. Creates
50   * table if it does not exist.
51   * 
52   * <p>
53   * Limitations: Hard-coded table schema.
54   */
55  public class HBaseWriter extends WriterPoolMember implements ArchiveFileConstants {
56  	
57  	/** The LOG. */
58  	private final Logger LOG = Logger.getLogger(this.getClass().getName());
59  	
60  	/** The client. */
61  	private final HTable client;
62  	// TODO: make this variable configurable in the heritrix sheet:
63  	// CONTENT_COLUMN_FAMILY
64  	/** The Constant CONTENT_COLUMN_FAMILY. */
65  	public static final String CONTENT_COLUMN_FAMILY = "content:";
66  	// TODO: make this variable configurable in the heritrix sheet:
67  	// CONTENT_COLUMN
68  	/** The Constant CONTENT_COLUMN. */
69  	public static final String CONTENT_COLUMN = CONTENT_COLUMN_FAMILY + "raw_data";
70  	// TODO: make this variable configurable in the heritrix sheet:
71  	// CURI_COLUMN_FAMILY
72  	/** The Constant CURI_COLUMN_FAMILY. */
73  	public static final String CURI_COLUMN_FAMILY = "curi:";
74  
75  	/** The Constant IP_COLUMN. */
76  	private static final String IP_COLUMN = CURI_COLUMN_FAMILY + "ip";
77  	
78  	/** The Constant PATH_FROM_SEED_COLUMN. */
79  	private static final String PATH_FROM_SEED_COLUMN = CURI_COLUMN_FAMILY + "path-from-seed";
80  	
81  	/** The Constant IS_SEED_COLUMN. */
82  	private static final String IS_SEED_COLUMN = CURI_COLUMN_FAMILY + "is-seed";
83  	
84  	/** The Constant VIA_COLUMN. */
85  	private static final String VIA_COLUMN = CURI_COLUMN_FAMILY + "via";
86  	
87  	/** The Constant URL_COLUMN. */
88  	private static final String URL_COLUMN = CURI_COLUMN_FAMILY + "url";
89  	
90  	/** The Constant REQUEST_COLUMN. */
91  	private static final String REQUEST_COLUMN = CURI_COLUMN_FAMILY + "request";
92  
93  	/**
94  	 * Instantiates a new h base writer.
95  	 * 
96  	 * @param master the master
97  	 * @param table the table
98  	 * 
99  	 * @throws IOException Signals that an I/O exception has occurred.
100 	 */
101 	public HBaseWriter(final String master, final String table) throws IOException {
102 		super(null, null, null, false, null);
103 		if (table == null || table.length() <= 0) {
104 			throw new IllegalArgumentException("Must specify a table name");
105 		}
106 		HBaseConfiguration c = new HBaseConfiguration();
107 		if (master != null && master.length() > 0) {
108 			c.set(HConstants.MASTER_ADDRESS, master);
109 		}
110 		createCrawlTable(c, table);
111 		this.client = new HTable(c, table);
112 	}
113 
114 	/**
115 	 * Gets the client.
116 	 * 
117 	 * @return the client
118 	 */
119 	public HTable getClient() {
120 		return client;
121 	}
122 
123 	/**
124 	 * Creates the crawl table.
125 	 * 
126 	 * @param c the c
127 	 * @param table the table
128 	 * 
129 	 * @throws IOException Signals that an I/O exception has occurred.
130 	 */
131 	protected void createCrawlTable(final HBaseConfiguration c, final String table) throws IOException {
132 		HBaseAdmin admin = new HBaseAdmin(c);
133 		if (admin.tableExists(table)) {
134 			return;
135 		}
136 		HTableDescriptor htd = new HTableDescriptor(table);
137 		htd.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
138 		htd.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
139 		admin.createTable(htd);
140 		LOG.info("Created table " + htd.toString());
141 	}
142 
143 	/**
144 	 * Write.
145 	 * 
146 	 * @param curi URI of crawled document
147 	 * @param ip IP of remote machine.
148 	 * @param ros recording input stream that captured the response
149 	 * @param ris recording output stream that captured the GET request
150 	 * 
151 	 * @throws IOException Signals that an I/O exception has occurred.
152 	 */
153 	public void write(final ProcessorURI curi, final String ip, final RecordingOutputStream ros, final RecordingInputStream ris)
154 			throws IOException {
155 		String url = curi.toString();
156 		String row = Keying.createKey(url);
157 		if (LOG.isTraceEnabled()) {
158 			LOG.trace("Writing " + url + " as " + row.toString());
159 		}
160 		BatchUpdate bu = new BatchUpdate(row);
161 		bu.put(URL_COLUMN, Bytes.toBytes(url));
162 		bu.put(IP_COLUMN, Bytes.toBytes(ip));
163 		if (curi.isSeed()) {
164 			// TODO: Make Bytes.toBytes that takes a boolean.
165 			bu.put(IS_SEED_COLUMN, Bytes.toBytes(Boolean.TRUE.toString()));
166 			if (curi.getPathFromSeed() != null && curi.getPathFromSeed().trim().length() > 0) {
167 				bu.put(PATH_FROM_SEED_COLUMN, Bytes.toBytes(curi.getPathFromSeed().trim()));
168 			}
169 		}
170 		String viaStr = (curi.getVia() != null) ? curi.getVia().toString().trim() : null;
171 		if (viaStr != null && viaStr.length() > 0) {
172 			bu.put(VIA_COLUMN, Bytes.toBytes(viaStr));
173 		}
174 		// Request
175 		if (ros.getSize() > 0) {
176 			add(bu, REQUEST_COLUMN, ros.getReplayInputStream(), (int) ros.getSize());
177 		}
178 		// Response
179 		add(bu, CONTENT_COLUMN, ris.getReplayInputStream(), (int) ris.getSize());
180 		// process the content (optional)
181 		processContent(bu);
182 		// Set crawl time.
183 		bu.setTimestamp(curi.getFetchBeginTime());
184 		this.client.commit(bu);
185 	}
186 
187 	/**
188 	 * This is a stub method and is here to allow extension/overriding for
189 	 * custom content parsing, data manipulation and to populate new columns.
190 	 * 
191 	 * For Example : html parsing, text extraction, analysis and transformation
192 	 * and storing the results in new column families/columns using the batch
193 	 * update object.
194 	 * 
195 	 * @param bu the bu
196 	 */
197 	protected void processContent(BatchUpdate bu) {
198 		// byte[] content = bu.get(CONTENT_COLUMN);
199 		// process content.....
200 		// bu.put("some:new_column", someParsedByteArray);
201 	}
202 
203 	/*
204 	 * Add ReplayInputStream to the passed BatchUpdate.
205 	 * 
206 	 * @param bu
207 	 * 
208 	 * @param key
209 	 * 
210 	 * @param ris
211 	 * 
212 	 * @param baos
213 	 * 
214 	 * @throws IOException
215 	 */
216 	/**
217 	 * Adds the.
218 	 * 
219 	 * @param bu the bu
220 	 * @param key the key
221 	 * @param ris the ris
222 	 * @param size the size
223 	 * 
224 	 * @throws IOException Signals that an I/O exception has occurred.
225 	 */
226 	private void add(final BatchUpdate bu, final String key, final ReplayInputStream ris, final int size) throws IOException {
227 		ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
228 		try {
229 			ris.readFullyTo(baos);
230 		} finally {
231 			ris.close();
232 		}
233 		baos.close();
234 		bu.put(key, baos.toByteArray());
235 	}
236 }