METAMODEL-1205: Fixed CassandraUnit, Guava, Hadoop for JDK9+
[metamodel.git] / hbase / src / main / java / org / apache / metamodel / hbase / HBaseDataContext.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.metamodel.hbase;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.UpdateScript;
import org.apache.metamodel.UpdateSummary;
import org.apache.metamodel.UpdateableDataContext;
import org.apache.metamodel.annotations.InterfaceStability;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DataSetHeader;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.data.SimpleDataSetHeader;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.MutableSchema;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.FileHelper;
import org.apache.metamodel.util.SimpleTableDef;
/**
 * MetaModel adaptor for Apache HBase.
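 *
 * <p>
 * A minimal usage sketch is shown below. The {@link HBaseConfiguration}
 * constructor arguments and the table name are illustrative assumptions, not
 * the only supported options; the fluent query API is the regular MetaModel
 * one.
 * </p>
 *
 * <pre>
 * // Illustrative: point the configuration at a ZooKeeper quorum.
 * HBaseConfiguration configuration = new HBaseConfiguration("localhost", 2181, ColumnType.BINARY);
 * DataContext dataContext = new HBaseDataContext(configuration);
 *
 * // Query a (hypothetical) table via the standard MetaModel query API.
 * DataSet dataSet = dataContext.query().from("my_table").selectAll().execute();
 * </pre>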
 */
@InterfaceStability.Evolving
public class HBaseDataContext extends QueryPostprocessDataContext implements UpdateableDataContext {

    public static final String FIELD_ID = "_id";

    private final HBaseConfiguration _configuration;
    private final Connection _connection;

    /**
     * Creates a {@link HBaseDataContext}, opening a new HBase {@link Connection}
     * based on the given configuration.
     *
     * @param configuration
     *            the connection details (ZooKeeper quorum, schema name, table
     *            definitions etc.) to use
     */
    public HBaseDataContext(HBaseConfiguration configuration) {
        super(false);
        final Configuration config = createConfig(configuration);
        _configuration = configuration;
        _connection = createConnection(config);
    }

    /**
     * Creates a {@link HBaseDataContext} that reuses an already established
     * HBase {@link Connection}.
     *
     * @param configuration
     *            the connection details to expose through this data context
     * @param connection
     *            an existing HBase connection to reuse
     */
    public HBaseDataContext(HBaseConfiguration configuration, Connection connection) {
        super(false);
        _configuration = configuration;
        _connection = connection;
    }

    private Connection createConnection(Configuration config) {
        try {
            return ConnectionFactory.createConnection(config);
        } catch (IOException e) {
            throw new MetaModelException(e);
        }
    }

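    /**
     * Maps the MetaModel {@link HBaseConfiguration} onto a Hadoop
     * {@link Configuration}, using the standard HBase and ZooKeeper client
     * property keys.
     */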
    private static Configuration createConfig(HBaseConfiguration configuration) {
        final Configuration config = org.apache.hadoop.hbase.HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", configuration.getZookeeperHostname());
        config.set("hbase.zookeeper.property.clientPort", Integer.toString(configuration.getZookeeperPort()));
        config.set("hbase.client.retries.number", Integer.toString(configuration.getHBaseClientRetries()));
        config.set("zookeeper.session.timeout", Integer.toString(configuration.getZookeeperSessionTimeout()));
        config.set("zookeeper.recovery.retry", Integer.toString(configuration.getZookeeperRecoveryRetries()));
        return config;
    }

    /**
     * Gets an {@link Admin} instance for the HBase connection used by this
     * {@link DataContext}. The caller is responsible for closing it.
     *
     * @return a new {@link Admin} instance
     */
    public Admin getAdmin() {
        try {
            return _connection.getAdmin();
        } catch (IOException e) {
            throw new MetaModelException(e);
        }
    }

    public Connection getConnection() {
        return _connection;
    }

    @Override
    protected Schema getMainSchema() throws MetaModelException {
        final MutableSchema schema = new MutableSchema(_configuration.getSchemaName());

        SimpleTableDef[] tableDefinitions = _configuration.getTableDefinitions();
        if (tableDefinitions == null) {
            // No table definitions provided - discover the tables from HBase.
            // The Admin handle is closed again via try-with-resources.
            try (Admin admin = getAdmin()) {
                final List<TableDescriptor> tables = admin.listTableDescriptors();
                tableDefinitions = new SimpleTableDef[tables.size()];
                for (int i = 0; i < tables.size(); i++) {
                    final String tableName = tables.get(i).getTableName().getNameAsString();
                    // Columns are not known up front - add an empty table definition.
                    tableDefinitions[i] = new SimpleTableDef(tableName, new String[0]);
                }
            } catch (IOException e) {
                throw new MetaModelException(e);
            }
        }

        for (SimpleTableDef tableDef : tableDefinitions) {
            schema.addTable(new HBaseTable(this, tableDef, schema, _configuration.getDefaultRowKeyType()));
        }

        return schema;
    }

    /**
     * Gets the {@link HBaseConfiguration} that is used in this data context.
     *
     * @return the configuration that this data context was created with
     */
    public HBaseConfiguration getConfiguration() {
        return _configuration;
    }

    @Override
    protected String getMainSchemaName() throws MetaModelException {
        return _configuration.getSchemaName();
    }

    @Override
    protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
        if (whereItems != null && !whereItems.isEmpty()) {
            // Filtered counts are not pushed down - returning null makes
            // MetaModel fall back to post-processing.
            return null;
        }

        long result = 0;
        final org.apache.hadoop.hbase.client.Table hTable = getHTable(table.getName());
        try {
            try (ResultScanner scanner = hTable.getScanner(new Scan())) {
                while (scanner.next() != null) {
                    result++;
                }
            }
            return result;
        } catch (IOException e) {
            throw new MetaModelException(e);
        } finally {
            FileHelper.safeClose(hTable);
        }
    }

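    /**
     * Gets the HBase client {@link org.apache.hadoop.hbase.client.Table}
     * handle for the given table name. Note that the caller is responsible
     * for closing the returned handle.
     */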
    protected org.apache.hadoop.hbase.client.Table getHTable(String name) {
        try {
            final TableName tableName = TableName.valueOf(name);
            return _connection.getTable(tableName);
        } catch (IOException e) {
            throw new MetaModelException(e);
        }
    }

    @Override
    protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
            Object keyValue) {
        final org.apache.hadoop.hbase.client.Table hTable = getHTable(table.getName());
        final Get get = new Get(ByteUtils.toBytes(keyValue));
        try {
            final Result result = hTable.get(get);
            final DataSetHeader header = new SimpleDataSetHeader(selectItems);
            return new HBaseRow(header, result);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to execute HBase get operation with " + primaryKeyColumn.getName()
                    + " = " + keyValue, e);
        } finally {
            FileHelper.safeClose(hTable);
        }
    }

    @Override
    protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
        final Scan scan = new Scan();
        for (Column column : columns) {
            if (!column.isPrimaryKey()) {
                final int colonIndex = column.getName().indexOf(':');
                if (colonIndex != -1) {
                    // Column names of the form "family:qualifier" - scan the
                    // whole column family.
                    final String family = column.getName().substring(0, colonIndex);
                    scan.addFamily(Bytes.toBytes(family));
                } else {
                    scan.addFamily(Bytes.toBytes(column.getName()));
                }
            }
        }

        if (maxRows > 0) {
            setMaxRows(scan, maxRows);
        }

        final org.apache.hadoop.hbase.client.Table hTable = getHTable(table.getName());
        try {
            final ResultScanner scanner = hTable.getScanner(scan);
            return new HBaseDataSet(columns, scanner, hTable);
        } catch (Exception e) {
            FileHelper.safeClose(hTable);
            throw new MetaModelException(e);
        }
    }

    private void setMaxRows(Scan scan, int maxRows) {
        // Note: PageFilter limits the page size per region server, so slightly
        // more than maxRows rows may still be returned by the scan.
        scan.setFilter(new PageFilter(maxRows));
    }

    @Override
    public UpdateSummary executeUpdate(UpdateScript update) {
        final HBaseUpdateCallback callback = new HBaseUpdateCallback(this);
        update.run(callback);

        return callback.getUpdateSummary();
    }

    HBaseClient getHBaseClient() {
        return new HBaseClient(this.getConnection());
    }
}