af1fa66b6b711dceb539b699c5106e40e8b31c9a
[hive.git] / ql / src / java / org / apache / hadoop / hive / ql / exec / OperatorFactory.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.hive.ql.exec;
20
21 import java.util.ArrayList;
22 import java.util.IdentityHashMap;
23 import java.util.List;
24 import java.util.Map;
25
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.apache.hadoop.hive.ql.CompilationOpContext;
29 import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
30 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
31 import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
32 import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
33 import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
34 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
35 import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
36 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
37 import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
38 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
39 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
40 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
41 import org.apache.hadoop.hive.ql.metadata.HiveException;
42 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
43 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
44 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
45 import org.apache.hadoop.hive.ql.plan.AbstractVectorDesc;
46 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
47 import org.apache.hadoop.hive.ql.plan.CollectDesc;
48 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
49 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
50 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
51 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
52 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
53 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
54 import org.apache.hadoop.hive.ql.plan.FilterDesc;
55 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
56 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
57 import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
58 import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
59 import org.apache.hadoop.hive.ql.plan.JoinDesc;
60 import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
61 import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
62 import org.apache.hadoop.hive.ql.plan.LimitDesc;
63 import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
64 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
65 import org.apache.hadoop.hive.ql.plan.MuxDesc;
66 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
67 import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
68 import org.apache.hadoop.hive.ql.plan.PTFDesc;
69 import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
70 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
71 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
72 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
73 import org.apache.hadoop.hive.ql.plan.SelectDesc;
74 import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
75 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
76 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
77 import org.apache.hadoop.hive.ql.plan.UnionDesc;
78 import org.apache.hadoop.hive.ql.plan.VectorDesc;
79
80 import com.google.common.base.Preconditions;
81
82 /**
83 * OperatorFactory.
84 *
85 */
86 @SuppressWarnings({ "rawtypes", "unchecked" })
87 public final class OperatorFactory {
88 protected static transient final Logger LOG = LoggerFactory.getLogger(OperatorFactory.class);
89 private static final IdentityHashMap<Class<? extends OperatorDesc>,
90 Class<? extends Operator<? extends OperatorDesc>>> opvec = new IdentityHashMap<>();
91 private static final IdentityHashMap<Class<? extends OperatorDesc>,
92 Class<? extends Operator<? extends OperatorDesc>>> vectorOpvec = new IdentityHashMap<>();
93
94 static {
95 opvec.put(FilterDesc.class, FilterOperator.class);
96 opvec.put(SelectDesc.class, SelectOperator.class);
97 opvec.put(ForwardDesc.class, ForwardOperator.class);
98 opvec.put(FileSinkDesc.class, FileSinkOperator.class);
99 opvec.put(CollectDesc.class, CollectOperator.class);
100 opvec.put(ScriptDesc.class, ScriptOperator.class);
101 opvec.put(PTFDesc.class, PTFOperator.class);
102 opvec.put(ReduceSinkDesc.class, ReduceSinkOperator.class);
103 opvec.put(GroupByDesc.class, GroupByOperator.class);
104 opvec.put(JoinDesc.class, JoinOperator.class);
105 opvec.put(MapJoinDesc.class, MapJoinOperator.class);
106 opvec.put(SMBJoinDesc.class, SMBMapJoinOperator.class);
107 opvec.put(LimitDesc.class, LimitOperator.class);
108 opvec.put(TableScanDesc.class, TableScanOperator.class);
109 opvec.put(UnionDesc.class, UnionOperator.class);
110 opvec.put(UDTFDesc.class, UDTFOperator.class);
111 opvec.put(LateralViewJoinDesc.class, LateralViewJoinOperator.class);
112 opvec.put(LateralViewForwardDesc.class, LateralViewForwardOperator.class);
113 opvec.put(HashTableDummyDesc.class, HashTableDummyOperator.class);
114 opvec.put(HashTableSinkDesc.class, HashTableSinkOperator.class);
115 opvec.put(SparkHashTableSinkDesc.class, SparkHashTableSinkOperator.class);
116 opvec.put(DummyStoreDesc.class, DummyStoreOperator.class);
117 opvec.put(DemuxDesc.class, DemuxOperator.class);
118 opvec.put(MuxDesc.class, MuxOperator.class);
119 opvec.put(AppMasterEventDesc.class, AppMasterEventOperator.class);
120 opvec.put(DynamicPruningEventDesc.class, AppMasterEventOperator.class);
121 opvec.put(SparkPartitionPruningSinkDesc.class, SparkPartitionPruningSinkOperator.class);
122 opvec.put(RCFileMergeDesc.class, RCFileMergeOperator.class);
123 opvec.put(OrcFileMergeDesc.class, OrcFileMergeOperator.class);
124 opvec.put(CommonMergeJoinDesc.class, CommonMergeJoinOperator.class);
125 opvec.put(ListSinkDesc.class, ListSinkOperator.class);
126 }
127
128 static {
129 vectorOpvec.put(AppMasterEventDesc.class, VectorAppMasterEventOperator.class);
130 vectorOpvec.put(DynamicPruningEventDesc.class, VectorAppMasterEventOperator.class);
131 vectorOpvec.put(
132 SparkPartitionPruningSinkDesc.class, VectorSparkPartitionPruningSinkOperator.class);
133 vectorOpvec.put(SelectDesc.class, VectorSelectOperator.class);
134 vectorOpvec.put(GroupByDesc.class, VectorGroupByOperator.class);
135 vectorOpvec.put(MapJoinDesc.class, VectorMapJoinOperator.class);
136 vectorOpvec.put(SMBJoinDesc.class, VectorSMBMapJoinOperator.class);
137 vectorOpvec.put(ReduceSinkDesc.class, VectorReduceSinkOperator.class);
138 vectorOpvec.put(FileSinkDesc.class, VectorFileSinkOperator.class);
139 vectorOpvec.put(FilterDesc.class, VectorFilterOperator.class);
140 vectorOpvec.put(LimitDesc.class, VectorLimitOperator.class);
141 vectorOpvec.put(SparkHashTableSinkDesc.class, VectorSparkHashTableSinkOperator.class);
142 }
143
144 public static <T extends OperatorDesc> Operator<T> getVectorOperator(
145 Class<? extends Operator<?>> opClass, CompilationOpContext cContext, T conf,
146 VectorizationContext vContext) throws HiveException {
147 try {
148 VectorDesc vectorDesc = ((AbstractOperatorDesc) conf).getVectorDesc();
149 vectorDesc.setVectorOp(opClass);
150 Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor(
151 CompilationOpContext.class, VectorizationContext.class, OperatorDesc.class)
152 .newInstance(cContext, vContext, conf);
153 return op;
154 } catch (Exception e) {
155 e.printStackTrace();
156 throw new HiveException(e);
157 }
158 }
159
160 public static <T extends OperatorDesc> Operator<T> getVectorOperator(
161 CompilationOpContext cContext, T conf, VectorizationContext vContext) throws HiveException {
162 Class<T> descClass = (Class<T>) conf.getClass();
163 Class<?> opClass = vectorOpvec.get(descClass);
164 if (opClass != null) {
165 return getVectorOperator(vectorOpvec.get(descClass), cContext, conf, vContext);
166 }
167 throw new HiveException("No vector operator for descriptor class " + descClass.getName());
168 }
169
170 public static <T extends OperatorDesc> Operator<T> get(
171 CompilationOpContext cContext, Class<T> descClass) {
172 Preconditions.checkNotNull(cContext);
173 Class<?> opClass = opvec.get(descClass);
174 if (opClass != null) {
175 try {
176 return (Operator<T>)opClass.getDeclaredConstructor(
177 CompilationOpContext.class).newInstance(cContext);
178 } catch (Exception e) {
179 e.printStackTrace();
180 throw new RuntimeException(e);
181 }
182 }
183 throw new RuntimeException("No operator for descriptor class " + descClass.getName());
184 }
185
186 /**
187 * Returns an operator given the conf and a list of children operators.
188 */
189 public static <T extends OperatorDesc> Operator<T> get(CompilationOpContext cContext, T conf) {
190 Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
191 ret.setConf(conf);
192 return (ret);
193 }
194
195 /**
196 * Returns an operator given the conf and a list of children operators.
197 */
198 public static <T extends OperatorDesc> Operator<T> get(T conf,
199 Operator<? extends OperatorDesc> oplist0, Operator<? extends OperatorDesc>... oplist) {
200 Operator<T> ret = get(oplist0.getCompilationOpContext(), (Class<T>) conf.getClass());
201 ret.setConf(conf);
202 makeChild(ret, oplist0);
203 makeChild(ret, oplist);
204 return (ret);
205 }
206
207 /**
208 * Returns an operator given the conf and a list of children operators.
209 */
210 public static void makeChild(
211 Operator<? extends OperatorDesc> ret,
212 Operator<? extends OperatorDesc>... oplist) {
213 if (oplist.length == 0) {
214 return;
215 }
216
217 ArrayList<Operator<? extends OperatorDesc>> clist =
218 new ArrayList<Operator<? extends OperatorDesc>>();
219 for (Operator<? extends OperatorDesc> op : oplist) {
220 clist.add(op);
221 }
222 ret.setChildOperators(clist);
223
224 // Add this parent to the children
225 for (Operator<? extends OperatorDesc> op : oplist) {
226 List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
227 parents.add(ret);
228 op.setParentOperators(parents);
229 }
230 }
231
232 /**
233 * Returns an operator given the conf and a list of children operators.
234 */
235 public static <T extends OperatorDesc> Operator<T> get(
236 CompilationOpContext cContext, T conf, RowSchema rwsch) {
237 Operator<T> ret = get(cContext, conf);
238 ret.setSchema(rwsch);
239 return (ret);
240 }
241
242
243 /**
244 * Returns an operator given the conf and a list of parent operators.
245 */
246 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
247 T conf, Operator oplist0, Operator... oplist) {
248 Operator<T> ret = get(oplist0.getCompilationOpContext(), (Class<T>) conf.getClass());
249 ret.setConf(conf);
250
251 // Add the new operator as child of each of the passed in operators
252 List<Operator> children = oplist0.getChildOperators();
253 children.add(ret);
254 oplist0.setChildOperators(children);
255 for (Operator op : oplist) {
256 children = op.getChildOperators();
257 children.add(ret);
258 op.setChildOperators(children);
259 }
260
261 // add parents for the newly created operator
262 List<Operator<? extends OperatorDesc>> parent =
263 new ArrayList<Operator<? extends OperatorDesc>>();
264 parent.add(oplist0);
265 for (Operator op : oplist) {
266 parent.add(op);
267 }
268
269 ret.setParentOperators(parent);
270
271 return (ret);
272 }
273
274 /**
275 * Returns an operator given the conf and a list of parent operators.
276 */
277 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
278 T conf, List<Operator<? extends OperatorDesc>> oplist) {
279 Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
280 ret.setConf(conf);
281 if (oplist.size() == 0) {
282 return ret;
283 }
284
285 // Add the new operator as child of each of the passed in operators
286 for (Operator op : oplist) {
287 List<Operator> children = op.getChildOperators();
288 children.add(ret);
289 }
290
291 // add parents for the newly created operator
292 List<Operator<? extends OperatorDesc>> parent =
293 new ArrayList<Operator<? extends OperatorDesc>>();
294 for (Operator op : oplist) {
295 parent.add(op);
296 }
297
298 ret.setParentOperators(parent);
299
300 return ret;
301 }
302
303 /**
304 * Returns an operator given the conf and a list of parent operators.
305 */
306 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
307 CompilationOpContext cContext, T conf, RowSchema rwsch) {
308 Operator<T> ret = get(cContext, (Class<T>) conf.getClass());
309 ret.setConf(conf);
310 ret.setSchema(rwsch);
311 return ret;
312 }
313
314 /**
315 * Returns an operator given the conf and a list of parent operators.
316 */
317 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
318 CompilationOpContext ctx, T conf, RowSchema rwsch, Operator[] oplist) {
319 Operator<T> ret = get(ctx, (Class<T>) conf.getClass());
320 ret.setConf(conf);
321 ret.setSchema(rwsch);
322 if (oplist.length == 0) return ret;
323
324 // Add the new operator as child of each of the passed in operators
325 for (Operator op : oplist) {
326 List<Operator> children = op.getChildOperators();
327 children.add(ret);
328 op.setChildOperators(children);
329 }
330
331 // add parents for the newly created operator
332 List<Operator<? extends OperatorDesc>> parent =
333 new ArrayList<Operator<? extends OperatorDesc>>();
334 for (Operator op : oplist) {
335 parent.add(op);
336 }
337
338 ret.setParentOperators(parent);
339
340 return (ret);
341 }
342
343 /**
344 * Returns an operator given the conf and a list of parent operators.
345 */
346 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(
347 T conf, RowSchema rwsch, Operator oplist0, Operator... oplist) {
348 Operator<T> ret = getAndMakeChild(conf, oplist0, oplist);
349 ret.setSchema(rwsch);
350 return ret;
351 }
352
353 /**
354 * Returns an operator given the conf and a list of parent operators.
355 */
356 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, RowSchema rwsch,
357 Map<String, ExprNodeDesc> colExprMap, Operator oplist0, Operator... oplist) {
358 Operator<T> ret = getAndMakeChild(conf, rwsch, oplist0, oplist);
359 ret.setColumnExprMap(colExprMap);
360 return (ret);
361 }
362
363 /**
364 * Returns an operator given the conf and a list of parent operators.
365 */
366 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
367 T conf, RowSchema rwsch, List<Operator<? extends OperatorDesc>> oplist) {
368 Operator<T> ret = getAndMakeChild(cContext, conf, oplist);
369 ret.setSchema(rwsch);
370 return (ret);
371 }
372
373 /**
374 * Returns an operator given the conf and a list of parent operators.
375 */
376 public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext,
377 T conf, RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap,
378 List<Operator<? extends OperatorDesc>> oplist) {
379 Operator<T> ret = getAndMakeChild(cContext, conf, rwsch, oplist);
380 ret.setColumnExprMap(colExprMap);
381 return (ret);
382 }
383
384 private OperatorFactory() {
385 // prevent instantiation
386 }
387 }