038b96ce4cba3452c29f60e0803b26d03db122e3
[hive.git] / ql / src / java / org / apache / hadoop / hive / ql / exec / OperatorFactory.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.hive.ql.exec;
20
21 import java.util.ArrayList;
22 import java.util.IdentityHashMap;
23 import java.util.List;
24 import java.util.Map;
25
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.apache.hadoop.hive.ql.CompilationOpContext;
29 import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
30 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
31 import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
32 import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
33 import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
34 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
35 import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
36 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
37 import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
38 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
39 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
40 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
41 import org.apache.hadoop.hive.ql.metadata.HiveException;
42 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
43 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
44 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
45 import org.apache.hadoop.hive.ql.plan.CollectDesc;
46 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
47 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
48 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
49 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
50 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
51 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
52 import org.apache.hadoop.hive.ql.plan.FilterDesc;
53 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
54 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
55 import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
56 import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
57 import org.apache.hadoop.hive.ql.plan.JoinDesc;
58 import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
59 import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
60 import org.apache.hadoop.hive.ql.plan.LimitDesc;
61 import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
62 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
63 import org.apache.hadoop.hive.ql.plan.MuxDesc;
64 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
65 import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
66 import org.apache.hadoop.hive.ql.plan.PTFDesc;
67 import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
68 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
69 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
70 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
71 import org.apache.hadoop.hive.ql.plan.SelectDesc;
72 import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
73 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
74 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
75 import org.apache.hadoop.hive.ql.plan.UnionDesc;
76
77 import com.google.common.base.Preconditions;
78
79 /**
80 * OperatorFactory.
81 *
82 */
83 @SuppressWarnings({ "rawtypes", "unchecked" })
84 public final class OperatorFactory {
85 protected static transient final Logger LOG = LoggerFactory.getLogger(OperatorFactory.class);
86 private static final IdentityHashMap<Class<? extends OperatorDesc>,
87 Class<? extends Operator<? extends OperatorDesc>>> opvec = new IdentityHashMap<>();
88 private static final IdentityHashMap<Class<? extends OperatorDesc>,
89 Class<? extends Operator<? extends OperatorDesc>>> vectorOpvec = new IdentityHashMap<>();
90
91 static {
92 opvec.put(FilterDesc.class, FilterOperator.class);
93 opvec.put(SelectDesc.class, SelectOperator.class);
94 opvec.put(ForwardDesc.class, ForwardOperator.class);
95 opvec.put(FileSinkDesc.class, FileSinkOperator.class);
96 opvec.put(CollectDesc.class, CollectOperator.class);
97 opvec.put(ScriptDesc.class, ScriptOperator.class);
98 opvec.put(PTFDesc.class, PTFOperator.class);
99 opvec.put(ReduceSinkDesc.class, ReduceSinkOperator.class);
100 opvec.put(GroupByDesc.class, GroupByOperator.class);
101 opvec.put(JoinDesc.class, JoinOperator.class);
102 opvec.put(MapJoinDesc.class, MapJoinOperator.class);
103 opvec.put(SMBJoinDesc.class, SMBMapJoinOperator.class);
104 opvec.put(LimitDesc.class, LimitOperator.class);
105 opvec.put(TableScanDesc.class, TableScanOperator.class);
106 opvec.put(UnionDesc.class, UnionOperator.class);
107 opvec.put(UDTFDesc.class, UDTFOperator.class);
108 opvec.put(LateralViewJoinDesc.class, LateralViewJoinOperator.class);
109 opvec.put(LateralViewForwardDesc.class, LateralViewForwardOperator.class);
110 opvec.put(HashTableDummyDesc.class, HashTableDummyOperator.class);
111 opvec.put(HashTableSinkDesc.class, HashTableSinkOperator.class);
112 opvec.put(SparkHashTableSinkDesc.class, SparkHashTableSinkOperator.class);
113 opvec.put(DummyStoreDesc.class, DummyStoreOperator.class);
114 opvec.put(DemuxDesc.class, DemuxOperator.class);
115 opvec.put(MuxDesc.class, MuxOperator.class);
116 opvec.put(AppMasterEventDesc.class, AppMasterEventOperator.class);
117 opvec.put(DynamicPruningEventDesc.class, AppMasterEventOperator.class);
118 opvec.put(SparkPartitionPruningSinkDesc.class, SparkPartitionPruningSinkOperator.class);
119 opvec.put(RCFileMergeDesc.class, RCFileMergeOperator.class);
120 opvec.put(OrcFileMergeDesc.class, OrcFileMergeOperator.class);
121 opvec.put(CommonMergeJoinDesc.class, CommonMergeJoinOperator.class);
122 opvec.put(ListSinkDesc.class, ListSinkOperator.class);
123 }
124
125 static {
126 vectorOpvec.put(AppMasterEventDesc.class, VectorAppMasterEventOperator.class);
127 vectorOpvec.put(DynamicPruningEventDesc.class, VectorAppMasterEventOperator.class);
128 vectorOpvec.put(
129 SparkPartitionPruningSinkDesc.class, VectorSparkPartitionPruningSinkOperator.class);
130 vectorOpvec.put(SelectDesc.class, VectorSelectOperator.class);
131 vectorOpvec.put(GroupByDesc.class, VectorGroupByOperator.class);
132 vectorOpvec.put(MapJoinDesc.class, VectorMapJoinOperator.class);
133 vectorOpvec.put(SMBJoinDesc.class, VectorSMBMapJoinOperator.class);
134 vectorOpvec.put(ReduceSinkDesc.class, VectorReduceSinkOperator.class);
135 vectorOpvec.put(FileSinkDesc.class, VectorFileSinkOperator.class);
136 vectorOpvec.put(FilterDesc.class, VectorFilterOperator.class);
137 vectorOpvec.put(LimitDesc.class, VectorLimitOperator.class);
138 vectorOpvec.put(SparkHashTableSinkDesc.class, VectorSparkHashTableSinkOperator.class);
139 }
140
141 public static <T extends OperatorDesc> Operator<T> getVectorOperator(
142 Class<? extends Operator<?>> opClass, CompilationOpContext cContext, T conf,
143 VectorizationContext vContext) throws HiveException {
144 try {
145 Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor(
146 CompilationOpContext.class, VectorizationContext.class, OperatorDesc.class)
147 .newInstance(cContext, vContext, conf);
148 return op;
149 } catch (Exception e) {
150 e.printStackTrace();
151 throw new HiveException(e);
152 }
153 }
154
155 public static <T extends OperatorDesc> Operator<T> getVectorOperator(
156 CompilationOpContext cContext, T conf, VectorizationContext vContext) throws HiveException {
157 Class<T> descClass = (Class<T>) conf.getClass();
158 Class<?> opClass = vectorOpvec.get(descClass);
159 if (opClass != null) {
160 return getVectorOperator(vectorOpvec.get(descClass), cContext, conf, vContext);
161 }
162 throw new HiveException("No vector operator for descriptor class " + descClass.getName());
163 }
164
165 public static <T extends OperatorDesc> Operator<T> get(
166 CompilationOpContext cContext, Class<T> descClass) {
167 Preconditions.checkNotNull(cContext);
168 Class<?> opClass = opvec.get(descClass);
169 if (opClass != null) {
170 try {
171 return (Operator<T>)opClass.getDeclaredConstructor(
172 CompilationOpContext.class).newInstance(cContext);
173 } catch (Exception e) {
174 e.printStackTrace();
175 throw new RuntimeException(e);
176 }
177 }
178 throw new RuntimeException("No operator for descriptor class " + descClass.getName());
179 }
180
181 /**
182 * Returns an operator given the conf and a list of children operators.
183 */
184 public static <T extends OperatorDesc> Operator<T> get(CompilationOpContext cContext, T conf) {
185 Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
186 ret.setConf(conf);
187 return (ret);
188 }
189
190 /**
191 * Returns an operator given the conf and a list of children operators.
192 */
193 public static <T extends OperatorDesc> Operator<T> get(T conf,
194 Operator<? extends OperatorDesc> oplist0, Operator<? extends OperatorDesc>... oplist) {
195 Operator<T> ret = get(oplist0.getCompilationOpContext(), (Class<T>) conf.getClass());
196 ret.setConf(conf);
197 makeChild(ret, oplist0);
198 makeChild(ret, oplist);
199 return (ret);
200 }
201
202 /**
203 * Returns an operator given the conf and a list of children operators.
204 */
205 public static void makeChild(
206 Operator<? extends OperatorDesc> ret,
207 Operator<? extends OperatorDesc>... oplist) {
208 if (oplist.length == 0) {
209 return;
210 }
211
212 ArrayList<Operator<? extends OperatorDesc>> clist =
213 new ArrayList<Operator<? extends OperatorDesc>>();
214 for (Operator<? extends OperatorDesc> op : oplist) {
215 clist.add(op);
216 }
217 ret.setChildOperators(clist);
218
219 // Add this parent to the children
220 for (Operator<? extends OperatorDesc> op : oplist) {
221 List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
222 parents.add(ret);
223 op.setParentOperators(parents);
224 }
225 }
226
227 /**
228 * Returns an operator given the conf and a list of children operators.
229 */
230 public static <T extends OperatorDesc> Operator<T> get(
231 CompilationOpContext cContext, T conf, RowSchema rwsch) {
232 Operator<T> ret = get(cContext, conf);
233 ret.setSchema(rwsch);
234 return (ret);
235 }
236
237
238 /**
239 * Returns an operator given the conf and a list of parent operators.
240 */
241 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
242 T conf, Operator oplist0, Operator... oplist) {
243 Operator<T> ret = get(oplist0.getCompilationOpContext(), (Class<T>) conf.getClass());
244 ret.setConf(conf);
245
246 // Add the new operator as child of each of the passed in operators
247 List<Operator> children = oplist0.getChildOperators();
248 children.add(ret);
249 oplist0.setChildOperators(children);
250 for (Operator op : oplist) {
251 children = op.getChildOperators();
252 children.add(ret);
253 op.setChildOperators(children);
254 }
255
256 // add parents for the newly created operator
257 List<Operator<? extends OperatorDesc>> parent =
258 new ArrayList<Operator<? extends OperatorDesc>>();
259 parent.add(oplist0);
260 for (Operator op : oplist) {
261 parent.add(op);
262 }
263
264 ret.setParentOperators(parent);
265
266 return (ret);
267 }
268
269 /**
270 * Returns an operator given the conf and a list of parent operators.
271 */
272 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
273 T conf, List<Operator<? extends OperatorDesc>> oplist) {
274 Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
275 ret.setConf(conf);
276 if (oplist.size() == 0) {
277 return ret;
278 }
279
280 // Add the new operator as child of each of the passed in operators
281 for (Operator op : oplist) {
282 List<Operator> children = op.getChildOperators();
283 children.add(ret);
284 }
285
286 // add parents for the newly created operator
287 List<Operator<? extends OperatorDesc>> parent =
288 new ArrayList<Operator<? extends OperatorDesc>>();
289 for (Operator op : oplist) {
290 parent.add(op);
291 }
292
293 ret.setParentOperators(parent);
294
295 return ret;
296 }
297
298 /**
299 * Returns an operator given the conf and a list of parent operators.
300 */
301 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
302 CompilationOpContext cContext, T conf, RowSchema rwsch) {
303 Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
304 ret.setConf(conf);
305 ret.setSchema(rwsch);
306 return ret;
307 }
308
309 /**
310 * Returns an operator given the conf and a list of parent operators.
311 */
312 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
313 CompilationOpContext ctx, T conf, RowSchema rwsch, Operator[] oplist) {
314 Operator<T> ret = get(ctx, (Class<T>) conf.getClass());
315 ret.setConf(conf);
316 ret.setSchema(rwsch);
317 if (oplist.length == 0) return ret;
318
319 // Add the new operator as child of each of the passed in operators
320 for (Operator op : oplist) {
321 List<Operator> children = op.getChildOperators();
322 children.add(ret);
323 op.setChildOperators(children);
324 }
325
326 // add parents for the newly created operator
327 List<Operator<? extends OperatorDesc>> parent =
328 new ArrayList<Operator<? extends OperatorDesc>>();
329 for (Operator op : oplist) {
330 parent.add(op);
331 }
332
333 ret.setParentOperators(parent);
334
335 return (ret);
336 }
337
338 /**
339 * Returns an operator given the conf and a list of parent operators.
340 */
341 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
342 T conf, RowSchema rwsch, Operator oplist0, Operator... oplist) {
343 Operator<T> ret = getAndMakeChild(conf, oplist0, oplist);
344 ret.setSchema(rwsch);
345 return ret;
346 }
347
348 /**
349 * Returns an operator given the conf and a list of parent operators.
350 */
351 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, RowSchema rwsch,
352 Map<String, ExprNodeDesc> colExprMap, Operator oplist0, Operator... oplist) {
353 Operator<T> ret = getAndMakeChild(conf, rwsch, oplist0, oplist);
354 ret.setColumnExprMap(colExprMap);
355 return (ret);
356 }
357
358 /**
359 * Returns an operator given the conf and a list of parent operators.
360 */
361 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
362 T conf, RowSchema rwsch, List<Operator<? extends OperatorDesc>> oplist) {
363 Operator<T> ret = getAndMakeChild(cContext, conf, oplist);
364 ret.setSchema(rwsch);
365 return (ret);
366 }
367
368 /**
369 * Returns an operator given the conf and a list of parent operators.
370 */
371 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
372 T conf, RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap,
373 List<Operator<? extends OperatorDesc>> oplist) {
374 Operator<T> ret = getAndMakeChild(cContext, conf, rwsch, oplist);
375 ret.setColumnExprMap(colExprMap);
376 return (ret);
377 }
378
379 private OperatorFactory() {
380 // prevent instantiation
381 }
382 }