View Javadoc

1   /**
2    * HBaseWriter
3    *
4    * $Id$
5    *
6    * Created on June 23rd, 2007
7    *
8    * This file is part of the Heritrix web crawler (crawler.archive.org).
9    *
10   * Heritrix is free software; you can redistribute it and/or modify
11   * it under the terms of the GNU Lesser Public License as published by
12   * the Free Software Foundation; either version 2.1 of the License, or
13   * any later version.
14   *
15   * Heritrix is distributed in the hope that it will be useful,
16   * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   * GNU Lesser Public License for more details.
19   *
20   * You should have received a copy of the GNU Lesser Public License
21   * along with Heritrix; if not, write to the Free Software
22   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23   */
24  package com.powerset.heritrix.writer;
25  
26  import java.io.ByteArrayOutputStream;
27  import java.io.IOException;
28  
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HColumnDescriptor;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HTableDescriptor;
33  import org.apache.hadoop.hbase.client.HBaseAdmin;
34  import org.apache.hadoop.hbase.client.HTable;
35  import org.apache.hadoop.hbase.io.BatchUpdate;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.Keying;
38  import org.apache.log4j.Logger;
39  import org.archive.io.ArchiveFileConstants;
40  import org.archive.io.RecordingInputStream;
41  import org.archive.io.RecordingOutputStream;
42  import org.archive.io.ReplayInputStream;
43  import org.archive.io.WriterPoolMember;
44  import org.archive.modules.ProcessorURI;
45  
46  /**
47   * Write to HBase. Puts content into the 'content:' column and all else into the
48   * 'curi:' column family. Makes a row key of an url transformation. Creates
49   * table if it does not exist.
50   * 
51   * <p>
52   * Limitations: Hard-coded table schema.
53   */
54  public class HBaseWriter extends WriterPoolMember implements ArchiveFileConstants {
55  	private final Logger LOG = Logger.getLogger(this.getClass().getName());
56  	private final HTable client;
57  	// TODO: make this variable configurable in the heritrix sheet:
58  	// CONTENT_COLUMN_FAMILY
59  	public static final String CONTENT_COLUMN_FAMILY = "content:";
60  	// TODO: make this variable configurable in the heritrix sheet:
61  	// CONTENT_COLUMN
62  	public static final String CONTENT_COLUMN = CONTENT_COLUMN_FAMILY + "raw_data";
63  	// TODO: make this variable configurable in the heritrix sheet:
64  	// CURI_COLUMN_FAMILY
65  	public static final String CURI_COLUMN_FAMILY = "curi:";
66  
67  	private static final String IP_COLUMN = CURI_COLUMN_FAMILY + "ip";
68  	private static final String PATH_FROM_SEED_COLUMN = CURI_COLUMN_FAMILY + "path-from-seed";
69  	private static final String IS_SEED_COLUMN = CURI_COLUMN_FAMILY + "is-seed";
70  	private static final String VIA_COLUMN = CURI_COLUMN_FAMILY + "via";
71  	private static final String URL_COLUMN = CURI_COLUMN_FAMILY + "url";
72  	private static final String REQUEST_COLUMN = CURI_COLUMN_FAMILY + "request";
73  
74  	public HBaseWriter(final String master, final String table) throws IOException {
75  		super(null, null, null, false, null);
76  		if (table == null || table.length() <= 0) {
77  			throw new IllegalArgumentException("Must specify a table name");
78  		}
79  		HBaseConfiguration c = new HBaseConfiguration();
80  		if (master != null && master.length() > 0) {
81  			c.set(HConstants.MASTER_ADDRESS, master);
82  		}
83  		createCrawlTable(c, table);
84  		this.client = new HTable(c, table);
85  	}
86  
87  	public HTable getClient() {
88  		return client;
89  	}
90  
91  	protected void createCrawlTable(final HBaseConfiguration c, final String table) throws IOException {
92  		HBaseAdmin admin = new HBaseAdmin(c);
93  		if (admin.tableExists(table)) {
94  			return;
95  		}
96  		HTableDescriptor htd = new HTableDescriptor(table);
97  		htd.addFamily(new HColumnDescriptor(CONTENT_COLUMN_FAMILY));
98  		htd.addFamily(new HColumnDescriptor(CURI_COLUMN_FAMILY));
99  		admin.createTable(htd);
100 		LOG.info("Created table " + htd.toString());
101 	}
102 
103 	/**
104 	 * @param curi
105 	 *            URI of crawled document
106 	 * @param ip
107 	 *            IP of remote machine.
108 	 * @param ros
109 	 *            recording input stream that captured the response
110 	 * @param ris
111 	 *            recording output stream that captured the GET request
112 	 */
113 	public void write(final ProcessorURI curi, final String ip, final RecordingOutputStream ros, final RecordingInputStream ris)
114 			throws IOException {
115 		String url = curi.toString();
116 		String row = Keying.createKey(url);
117 		if (LOG.isTraceEnabled()) {
118 			LOG.trace("Writing " + url + " as " + row.toString());
119 		}
120 		BatchUpdate bu = new BatchUpdate(row);
121 		bu.put(URL_COLUMN, Bytes.toBytes(url));
122 		bu.put(IP_COLUMN, Bytes.toBytes(ip));
123 		if (curi.isSeed()) {
124 			// TODO: Make Bytes.toBytes that takes a boolean.
125 			bu.put(IS_SEED_COLUMN, Bytes.toBytes(Boolean.TRUE.toString()));
126 			if (curi.getPathFromSeed() != null && curi.getPathFromSeed().trim().length() > 0) {
127 				bu.put(PATH_FROM_SEED_COLUMN, Bytes.toBytes(curi.getPathFromSeed().trim()));
128 			}
129 		}
130 		String viaStr = (curi.getVia() != null) ? curi.getVia().toString().trim() : null;
131 		if (viaStr != null && viaStr.length() > 0) {
132 			bu.put(VIA_COLUMN, Bytes.toBytes(viaStr));
133 		}
134 		// Request
135 		if (ros.getSize() > 0) {
136 			add(bu, REQUEST_COLUMN, ros.getReplayInputStream(), (int) ros.getSize());
137 		}
138 		// Response
139 		add(bu, CONTENT_COLUMN, ris.getReplayInputStream(), (int) ris.getSize());
140 		// process the content (optional)
141 		processContent(bu);
142 		// Set crawl time.
143 		bu.setTimestamp(curi.getFetchBeginTime());
144 		this.client.commit(bu);
145 	}
146 
147 	/**
148 	 * This is a stub method and is here to allow extension/overriding for
149 	 * custom content parsing, data manipulation and to populate new columns.
150 	 * 
151 	 * For Example : html parsing, text extraction, analysis and transformation
152 	 * and storing the results in new column families/columns using the batch
153 	 * update object.
154 	 * 
155 	 * @param bu
156 	 */
157 	protected void processContent(BatchUpdate bu) {
158 		// byte[] content = bu.get(CONTENT_COLUMN);
159 		// process content.....
160 		// bu.put("some:new_column", someParsedByteArray);
161 	}
162 
163 	/*
164 	 * Add ReplayInputStream to the passed BatchUpdate.
165 	 * 
166 	 * @param bu
167 	 * 
168 	 * @param key
169 	 * 
170 	 * @param ris
171 	 * 
172 	 * @param baos
173 	 * 
174 	 * @throws IOException
175 	 */
176 	private void add(final BatchUpdate bu, final String key, final ReplayInputStream ris, final int size) throws IOException {
177 		ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
178 		try {
179 			ris.readFullyTo(baos);
180 		} finally {
181 			ris.close();
182 		}
183 		baos.close();
184 		bu.put(key, baos.toByteArray());
185 	}
186 }