416606eaf824563638e4fa38c7008bd74e3b2e99
hive.git / ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.exec;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap;
import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition;
import org.apache.hadoop.hive.ql.exec.persistence.KeyValueContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer;
import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hive.common.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.esotericsoftware.kryo.KryoException;

/**
 * Map side Join operator implementation.
 */
public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implements Serializable {

  private static final long serialVersionUID = 1L;
  private static final Logger LOG = LoggerFactory.getLogger(MapJoinOperator.class.getName());
  private static final String CLASS_NAME = MapJoinOperator.class.getName();
  private transient final PerfLogger perfLogger = SessionState.getPerfLogger();

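  // Object cache (and the key under which the loaded hash tables are stored in it).
  // On Tez the cache lets tasks in the same container reuse the hash tables; on MR it is a no-op.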
  private transient String cacheKey;
  private transient ObjectCache cache;

  protected HashTableLoader loader;

  protected transient MapJoinTableContainer[] mapJoinTables;
  private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
  private transient boolean hashTblInitedOnce;
  protected transient ReusableGetAdaptor[] hashMapRowGetters;

  private UnwrapRowContainer[] unwrapContainer;
  private transient Configuration hconf;
  private transient boolean hybridMapJoinLeftover; // whether there's spilled data to be processed
  protected transient MapJoinBytesTableContainer[] spilledMapJoinTables; // used to hold restored
                                                                         // spilled small tables
  protected HybridHashTableContainer firstSmallTable; // the first small table;
                                                      // only this table has spilled big table rows

  /** Kryo ctor. */
  protected MapJoinOperator() {
    super();
  }

  public MapJoinOperator(CompilationOpContext ctx) {
    super(ctx);
  }

  public MapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) {
    super(mjop);
  }

  /*
   * We need the base (Operator.java) implementation of start/endGroup.
   * The parent class has functionality in those that map join can't use.
   * Note: the map join can be run in the reducer only on Tez.
   */
  @Override
  public void endGroup() throws HiveException {
    defaultEndGroup();
  }

  @Override
  public void startGroup() throws HiveException {
    defaultStartGroup();
  }

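  /**
   * Returns the {@link HashTableLoader} used to populate mapJoinTables; the factory picks an
   * implementation appropriate to the configured execution engine.
   */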
  protected HashTableLoader getHashTableLoader(Configuration hconf) {
    return HashTableLoaderFactory.getLoader(hconf);
  }

  @Override
  protected void initializeOp(Configuration hconf) throws HiveException {
    this.hconf = hconf;
    unwrapContainer = new UnwrapRowContainer[conf.getTagLength()];

    super.initializeOp(hconf);

    int tagLen = conf.getTagLength();

    // On Tez only: the hash map might already be cached in the container we run
    // the task in. On MR: the cache is a no-op.
    String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
    cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
    cache = ObjectCacheFactory.getCache(hconf, queryId, false);
    loader = getHashTableLoader(hconf);

    hashMapRowGetters = null;

    mapJoinTables = new MapJoinTableContainer[tagLen];
    mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
    hashTblInitedOnce = false;

    // Reset the grace hash join context so that no state is carried over when the
    // operator/work is retrieved from the object cache.
    hybridMapJoinLeftover = false;
    firstSmallTable = null;

    generateMapMetaData();

    final ExecMapperContext mapContext = getExecContext();
    final MapredContext mrContext = MapredContext.get();

    if (!conf.isBucketMapJoin() && !conf.isDynamicPartitionHashJoin()) {
      /*
       * The issue with caching in the case of bucket map join is that different tasks
       * process different buckets, and if the container is reused to join a different bucket,
       * join results can be incorrect. The cache is keyed on the operator id, and for bucket
       * map join the operator does not change but the data needed is different. A proper fix
       * requires changes in the Tez API with regard to finding the bucket id, plus the ability
       * to schedule tasks to reuse containers that have cached the specific bucket.
       */
      if (isLogDebugEnabled) {
        LOG.debug("This is not a bucket map join, so the hash tables will be cached");
      }

      Future<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>> future =
          cache.retrieveAsync(
              cacheKey,
              new Callable<Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>>() {
                @Override
                public Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> call()
                    throws HiveException {
                  return loadHashTable(mapContext, mrContext);
                }
              });
      asyncInitOperations.add(future);
    } else if (!isInputFileChangeSensitive(mapContext)) {
      loadHashTable(mapContext, mrContext);
      hashTblInitedOnce = true;
    }
  }

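  /**
   * Completes async initialization: if any cached hash table container has spilled, the cached
   * tables cannot be reused and are reloaded; otherwise the cached tables are adopted as-is.
   */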
  @SuppressWarnings("unchecked")
  @Override
  protected void completeInitializationOp(Object[] os) throws HiveException {
    if (os.length != 0) {
      Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair =
          (Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>) os[0];

      boolean spilled = false;
      for (MapJoinTableContainer container : pair.getLeft()) {
        if (container != null) {
          spilled = spilled || container.hasSpill();
        }
      }

      if (spilled) {
        // we can't use the cached table because it has spilled.
        loadHashTable(getExecContext(), MapredContext.get());
      } else {
        if (LOG.isDebugEnabled()) {
          String s = "Using tables from cache: [";
          for (MapJoinTableContainer c : pair.getLeft()) {
            s += ((c == null) ? "null" : c.getClass().getSimpleName()) + ", ";
          }
          LOG.debug(s + "]");
        }
        // let's use the table from the cache.
        mapJoinTables = pair.getLeft();
        mapJoinTableSerdes = pair.getRight();
      }
      hashTblInitedOnce = true;
    }

    if (this.getExecContext() != null) {
      // reset the exec context so that initialization of the map operator happens properly
      this.getExecContext().setLastInputPath(null);
      this.getExecContext().setCurrentInputPath(null);
    }
  }

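  /**
   * Builds the value object inspectors for an alias. For values that are read back from the
   * big-table join keys, the big-table key inspectors are used directly for inner joins; for
   * outer joins, a converter from the key inspector to the small-table value inspector is
   * created instead.
   */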
  @Override
  protected List<ObjectInspector> getValueObjectInspectors(
      byte alias, List<ObjectInspector>[] aliasToObjectInspectors) {
    int[] valueIndex = conf.getValueIndex(alias);
    if (valueIndex == null) {
      return super.getValueObjectInspectors(alias, aliasToObjectInspectors);
    }

    List<ObjectInspector> inspectors = aliasToObjectInspectors[alias];
    int bigPos = conf.getPosBigTable();
    Converter[] converters = new Converter[valueIndex.length];
    List<ObjectInspector> valueOI = new ArrayList<ObjectInspector>();
    for (int i = 0; i < valueIndex.length; i++) {
      if (valueIndex[i] >= 0 && !joinKeysObjectInspectors[bigPos].isEmpty()) {
        if (conf.getNoOuterJoin()) {
          valueOI.add(joinKeysObjectInspectors[bigPos].get(valueIndex[i]));
        } else {
          // It is an outer join. We are going to add the inspector from the
          // inner side, but the key value will come from the outer side, so
          // we need to create a converter from inputOI to outputOI.
          valueOI.add(inspectors.get(i));
          converters[i] = ObjectInspectorConverters.getConverter(
              joinKeysObjectInspectors[bigPos].get(valueIndex[i]), inspectors.get(i));
        }
      } else {
        valueOI.add(inspectors.get(i));
      }
    }

    unwrapContainer[alias] = new UnwrapRowContainer(alias, valueIndex, converters, hasFilter(alias));

    return valueOI;
  }

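  /**
   * Creates the SerDe contexts used to read and write the small-table containers: one shared
   * key context, plus a value context for each small-table position.
   */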
  public void generateMapMetaData() throws HiveException {
    // generate the metadata for the key; the index for the key is -1
    try {
      TableDesc keyTableDesc = conf.getKeyTblDesc();
      SerDe keySerializer = (SerDe) ReflectionUtil.newInstance(
          keyTableDesc.getDeserializerClass(), null);
      SerDeUtils.initializeSerDe(keySerializer, null, keyTableDesc.getProperties(), null);
      MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false);
      for (int pos = 0; pos < order.length; pos++) {
        if (pos == posBigTable) {
          continue;
        }
        TableDesc valueTableDesc;
        if (conf.getNoOuterJoin()) {
          valueTableDesc = conf.getValueTblDescs().get(pos);
        } else {
          valueTableDesc = conf.getValueFilteredTblDescs().get(pos);
        }
        SerDe valueSerDe = (SerDe) ReflectionUtil.newInstance(
            valueTableDesc.getDeserializerClass(), null);
        SerDeUtils.initializeSerDe(valueSerDe, null, valueTableDesc.getProperties(), null);
        MapJoinObjectSerDeContext valueContext =
            new MapJoinObjectSerDeContext(valueSerDe, hasFilter(pos));
        mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, valueContext);
      }
    } catch (SerDeException e) {
      throw new HiveException(e);
    }
  }

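  /**
   * Loads the small-table hash tables through the engine-specific loader and returns the loaded
   * containers plus their SerDes so the result can also be stored in the object cache. Skips the
   * load entirely when the tables are already initialized and still valid, and may mark the
   * operator done when an empty inner-join hash table makes big-table processing unnecessary.
   */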
  protected Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTable(
      ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
    if (canSkipReload(mapContext)) {
      // no need to reload
      return new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(
          mapJoinTables, mapJoinTableSerdes);
    }

    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
    loader.init(mapContext, mrContext, hconf, this);
    try {
      loader.load(mapJoinTables, mapJoinTableSerdes);
    } catch (HiveException e) {
      if (isLogInfoEnabled) {
        LOG.info("Exception loading hash tables. Clearing partially loaded hash table containers.");
      }

      // there could be some spilled partitions which need to be cleaned up
      clearAllTableContainers();
      throw e;
    }

    hashTblInitedOnce = true;

    Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair =
        new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(
            mapJoinTables, mapJoinTableSerdes);

    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);

    if (canSkipJoinProcessing(mapContext)) {
      LOG.info("Skipping big table join processing for " + this.toString());
      this.setDone(true);
    }

    return pair;
  }

  // Reload the hash table when the input file has changed (bucket map join).
  @Override
  public void cleanUpInputFileChangedOp() throws HiveException {
    loadHashTable(getExecContext(), MapredContext.get());
  }

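  /**
   * Deserializes the join key of the given big-table row into the getter and positions it for
   * hash map lookups; returns the lookup result (MATCH, NOMATCH or SPILL).
   */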
  protected JoinUtil.JoinResult setMapJoinKey(
      ReusableGetAdaptor dest, Object row, byte alias) throws HiveException {
    return dest.setFromRow(row, joinKeys[alias], joinKeysObjectInspectors[alias]);
  }

  protected MapJoinKey getRefKey(byte alias) {
    // We assume that, since we are joining on the same key, all tables have either optimized
    // or non-optimized keys; hence, we can pass any key from any table as the reference.
    // We do it so that MJKB can determine whether it can use optimized keys.
    for (byte pos = 0; pos < order.length; pos++) {
      if (pos == alias) continue;
      MapJoinKey refKey = mapJoinTables[pos].getAnyKey();
      if (refKey != null) return refKey;
    }
    return null; // All join tables have 0 keys; it doesn't matter what we generate.
  }

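  /**
   * Processes one big-table row: deserializes its join key once, probes each small table's hash
   * map with it, and emits the joined output. Under hybrid grace hash join, a row whose matching
   * partition has been spilled to disk is itself spilled and re-processed later from closeOp().
   */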
  @Override
  public void process(Object row, int tag) throws HiveException {
    try {
      alias = (byte) tag;
      if (hashMapRowGetters == null) {
        hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
        MapJoinKey refKey = getRefKey(alias);
        for (byte pos = 0; pos < order.length; pos++) {
          if (pos != alias) {
            hashMapRowGetters[pos] = mapJoinTables[pos].createGetter(refKey);
          }
        }
      }

      // As we're calling processOp again to process the leftover "tuples", we know the "row" is
      // coming from the spilled matchfile. We need to recreate the hashMapRowGetters against the
      // new hash tables.
      if (hybridMapJoinLeftover) {
        MapJoinKey refKey = getRefKey(alias);
        for (byte pos = 0; pos < order.length; pos++) {
          if (pos != alias && spilledMapJoinTables[pos] != null) {
            hashMapRowGetters[pos] = spilledMapJoinTables[pos].createGetter(refKey);
          }
        }
      }

      // compute keys and values as StandardObjects
      ReusableGetAdaptor firstSetKey = null;
      int fieldCount = joinKeys[alias].size();
      boolean joinNeeded = false;
      boolean bigTableRowSpilled = false;
      for (byte pos = 0; pos < order.length; pos++) {
        if (pos != alias) {
          JoinUtil.JoinResult joinResult;
          ReusableGetAdaptor adaptor;
          if (firstSetKey == null) {
            adaptor = firstSetKey = hashMapRowGetters[pos];
            joinResult = setMapJoinKey(firstSetKey, row, alias);
          } else {
            // Keys for all tables are the same, so only the first has to deserialize them.
            adaptor = hashMapRowGetters[pos];
            joinResult = adaptor.setFromOther(firstSetKey);
          }
          MapJoinRowContainer rowContainer = adaptor.getCurrentRows();
          if (joinResult != JoinUtil.JoinResult.MATCH) {
            assert (rowContainer == null || !rowContainer.hasRows()) :
                "Expecting an empty result set for no match";
          }
          if (rowContainer != null && unwrapContainer[pos] != null) {
            Object[] currentKey = firstSetKey.getCurrentKey();
            rowContainer = unwrapContainer[pos].setInternal(rowContainer, currentKey);
          }
          // there is no join value, or the join key has all-null elements
          if (rowContainer == null || firstSetKey.hasAnyNulls(fieldCount, nullsafes)) {
            if (!noOuterJoin) {
              // For hybrid grace hash join, during the first-round processing,
              // we only keep the LEFT side if the row is not spilled.
              if (!conf.isHybridHashJoin() || hybridMapJoinLeftover ||
                  (joinResult != JoinUtil.JoinResult.SPILL && !bigTableRowSpilled)) {
                joinNeeded = true;
                storage[pos] = dummyObjVectors[pos];
              } else {
                joinNeeded = false;
              }
            } else {
              storage[pos] = emptyList;
            }
          } else {
            joinNeeded = true;
            storage[pos] = rowContainer.copy();
            aliasFilterTags[pos] = rowContainer.getAliasFilter();
          }
          // Spill the big table row into the appropriate partition:
          // when the JoinResult is SPILL, the corresponding small table row may have been
          // spilled to disk (at least the partition that holds this row is on disk), so we
          // postpone the join processing for this pair by also spilling this big table row.
          if (joinResult == JoinUtil.JoinResult.SPILL &&
              !bigTableRowSpilled) { // For an n-way join, only spill big table rows once.
            spillBigTableRow(mapJoinTables[pos], row);
            bigTableRowSpilled = true;
          }
        }
      }
      if (joinNeeded) {
        List<Object> value = getFilteredValue(alias, row);
        // Add the value to the ArrayList
        storage[alias].addRow(value);
        // generate the output records
        checkAndGenObject();
      }
      // done with the row
      storage[tag].clearRows();
      for (byte pos = 0; pos < order.length; pos++) {
        if (pos != tag) {
          storage[pos] = null;
        }
      }
    } catch (Exception e) {
      String msg = "Unexpected exception from "
          + this.getClass().getSimpleName() + " : " + e.getMessage();
      LOG.error(msg, e);
      throw new HiveException(msg, e);
    }
  }

  /**
   * Postpones processing of a big-table row by spilling it to the row container of the
   * hash partition that should hold it.
   * @param hybridHtContainer hybrid hash table container
   * @param row big table row
   */
  protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row)
      throws HiveException {
    HybridHashTableContainer ht = (HybridHashTableContainer) hybridHtContainer;
    int partitionId = ht.getToSpillPartitionId();
    HashPartition hp = ht.getHashPartitions()[partitionId];
    ObjectContainer bigTable = hp.getMatchfileObjContainer();
    bigTable.add(row);
  }

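  /**
   * On close, re-processes any partitions that hybrid grace hash join spilled to disk: frees the
   * in-memory partitions, then, for each spilled partition, reloads the small-table hash map and
   * replays the matching spilled big-table rows. Finally clears all containers and the cached
   * entry.
   */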
  @Override
  public void closeOp(boolean abort) throws HiveException {
    boolean spilled = false;
    for (MapJoinTableContainer container : mapJoinTables) {
      if (container != null) {
        spilled = spilled || container.hasSpill();
        container.dumpMetrics();
      }
    }

    // For hybrid grace hash join, we need to see if there is any spilled data to be processed next
    if (spilled) {
      if (!abort) {
        if (hashMapRowGetters == null) {
          hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
        }
        int numPartitions = 0;
        // Find the number of partitions for each small table (should be the same across tables)
        for (byte pos = 0; pos < mapJoinTables.length; pos++) {
          if (pos != conf.getPosBigTable()) {
            firstSmallTable = (HybridHashTableContainer) mapJoinTables[pos];
            numPartitions = firstSmallTable.getHashPartitions().length;
            break;
          }
        }
        assert numPartitions != 0 : "Number of partitions must be greater than 0!";

        if (firstSmallTable.hasSpill()) {
          spilledMapJoinTables = new MapJoinBytesTableContainer[mapJoinTables.length];
          hybridMapJoinLeftover = true;

          // Clear all in-memory partitions first
          for (byte pos = 0; pos < mapJoinTables.length; pos++) {
            MapJoinTableContainer tableContainer = mapJoinTables[pos];
            if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) {
              HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
              hybridHtContainer.dumpStats();

              HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions();
              for (int i = 0; i < hashPartitions.length; i++) {
                if (!hashPartitions[i].isHashMapOnDisk()) {
                  hybridHtContainer.setTotalInMemRowCount(
                      hybridHtContainer.getTotalInMemRowCount() -
                          hashPartitions[i].getHashMapFromMemory().getNumValues());
                  hashPartitions[i].getHashMapFromMemory().clear();
                }
              }
              assert hybridHtContainer.getTotalInMemRowCount() == 0;
            }
          }

          // Reprocess the spilled data
          for (int i = 0; i < numPartitions; i++) {
            HashPartition[] hashPartitions = firstSmallTable.getHashPartitions();
            if (hashPartitions[i].isHashMapOnDisk()) {
              try {
                continueProcess(i); // Re-process spilled data
              } catch (KryoException ke) {
                LOG.error("Processing the spilled data failed due to a Kryo error!");
                LOG.error("Cleaning up all spilled data!");
                cleanupGraceHashJoin();
                throw new HiveException(ke);
              } catch (Exception e) {
                throw new HiveException(e);
              }
              for (byte pos = 0; pos < order.length; pos++) {
                if (pos != conf.getPosBigTable()) {
                  spilledMapJoinTables[pos] = null;
                }
              }
            }
          }
        }
      }

      if (isLogInfoEnabled) {
        LOG.info("spilled: " + spilled + " abort: " + abort + ". Clearing spilled partitions.");
      }

      // spilled tables are always loaded fresh (never shared), so clear them
      clearAllTableContainers();
      cache.remove(cacheKey);
    }

    // In the MapReduce case we always need to clear up, as MapReduce doesn't have an object registry.
    if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
        && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())
        && !(HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")
            && SparkUtilities.isDedicatedCluster(hconf))) {
      if (isLogInfoEnabled) {
        LOG.info("MR: Clearing all map join table containers.");
      }
      clearAllTableContainers();
    }

    this.loader = null;
    super.closeOp(abort);
  }

  private void clearAllTableContainers() {
    if (mapJoinTables != null) {
      for (MapJoinTableContainer tableContainer : mapJoinTables) {
        if (tableContainer != null) {
          tableContainer.clear();
        }
      }
    }
  }

  /**
   * Continues processing the join between the spilled hash table(s) and the spilled big table.
   * @param partitionId the partition number, across all small tables, to process
   * @throws HiveException
   * @throws IOException
   * @throws SerDeException
   */
  private void continueProcess(int partitionId)
      throws HiveException, IOException, SerDeException, ClassNotFoundException {
    for (byte pos = 0; pos < mapJoinTables.length; pos++) {
      if (pos != conf.getPosBigTable()) {
        LOG.info("Going to reload hash partition " + partitionId);
        reloadHashTable(pos, partitionId);
      }
    }
    reProcessBigTable(partitionId);
  }

  /**
   * Reloads the hash table from a hash partition. This can involve two steps:
   * 1) deserializing a serialized hash table, and
   * 2) merging every key/value pair from the small-table container into that hash table.
   * @param pos position of the small table
   * @param partitionId the partition of the small table to reload from
   * @throws IOException
   * @throws HiveException
   * @throws SerDeException
   */
  protected void reloadHashTable(byte pos, int partitionId)
      throws IOException, HiveException, SerDeException, ClassNotFoundException {
    HybridHashTableContainer container = (HybridHashTableContainer) mapJoinTables[pos];
    HashPartition partition = container.getHashPartitions()[partitionId];

    // Merge the sidefile into the newly created hash table.
    // This is where the spilling may happen again.
    LOG.info("Going to restore sidefile...");
    KeyValueContainer kvContainer = partition.getSidefileKVContainer();
    int rowCount = kvContainer.size();
    LOG.info("Hybrid Grace Hash Join: Number of rows restored from KeyValueContainer: " +
        kvContainer.size());

    // Deserialize the on-disk hash table.
    // We're sure this part is smaller than the memory limit.
    if (rowCount <= 0) {
      // rowCount is used later as the initialCapacity of a BytesBytesMultiHashMap, which cannot
      // be 0, so provide a reasonable positive number here.
      rowCount = 1024 * 1024;
    }
    LOG.info("Going to restore hashmap...");
    BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk(rowCount);
    rowCount += restoredHashMap.getNumValues();
    LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");
    LOG.info("Hybrid Grace Hash Join: Number of rows in hashmap: " + rowCount);

    // The size of the deserialized partition shouldn't exceed half of the memory limit;
    // if it does, we can only warn, since recursive spilling is not yet supported.
    if (rowCount * container.getTableRowSize() >= container.getMemoryThreshold() / 2) {
      LOG.warn("Hybrid Grace Hash Join: Hash table cannot be reloaded since it" +
          " will be greater than memory limit. Recursive spilling is currently not supported");
    }

    KeyValueHelper writeHelper = container.getWriteHelper();
    while (kvContainer.hasNext()) {
      ObjectPair<HiveKey, BytesWritable> pair = kvContainer.next();
      Writable key = pair.getFirst();
      Writable val = pair.getSecond();
      writeHelper.setKeyValue(key, val);
      restoredHashMap.put(writeHelper, -1);
    }

    container.setTotalInMemRowCount(container.getTotalInMemRowCount()
        + restoredHashMap.getNumValues());
    kvContainer.clear();

    spilledMapJoinTables[pos] = new MapJoinBytesTableContainer(restoredHashMap);
    spilledMapJoinTables[pos].setInternalValueOi(container.getInternalValueOi());
    spilledMapJoinTables[pos].setSortableSortOrders(container.getSortableSortOrders());
    spilledMapJoinTables[pos].setNullMarkers(container.getNullMarkers());
    spilledMapJoinTables[pos].setNotNullMarkers(container.getNotNullMarkers());
  }

  /**
   * Iterates over the big-table row container and feeds process() the leftover rows.
   * @param partitionId the partition from which to take the spilled big-table rows
   * @throws HiveException
   */
  protected void reProcessBigTable(int partitionId) throws HiveException {
    // For a binary join, firstSmallTable is the only small table, and it holds the reference
    // to the spilled big-table rows. For an n-way join we only spill once, while processing
    // the first small table, so again only firstSmallTable references the spilled big-table rows.
    HashPartition partition = firstSmallTable.getHashPartitions()[partitionId];
    ObjectContainer bigTable = partition.getMatchfileObjContainer();
    LOG.info("Hybrid Grace Hash Join: Going to process spilled big table rows in partition " +
        partitionId + ". Number of rows: " + bigTable.size());
    while (bigTable.hasNext()) {
      Object row = bigTable.next();
      process(row, conf.getPosBigTable());
    }
    bigTable.clear();
  }

  /**
   * Cleans up the data participating in the join, i.e. the in-memory and on-disk files for the
   * small table(s) and the big table.
   */
  private void cleanupGraceHashJoin() {
    for (byte pos = 0; pos < mapJoinTables.length; pos++) {
      if (pos != conf.getPosBigTable()) {
        LOG.info("Cleaning up small table data at pos: " + pos);
        HybridHashTableContainer container = (HybridHashTableContainer) mapJoinTables[pos];
        container.clear();
      }
    }
  }

  /**
   * Implements the getName function for the Node interface.
   *
   * @return the name of the operator
   */
  @Override
  public String getName() {
    return getOperatorName();
  }

  public static String getOperatorName() {
    return "MAPJOIN";
  }

  @Override
  public OperatorType getType() {
    return OperatorType.MAPJOIN;
  }

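  /**
   * Returns true when the hash tables depend on the current input file, i.e. for bucket map
   * join, where each bucket file needs its own small-table hash tables.
   */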
  protected boolean isInputFileChangeSensitive(ExecMapperContext mapContext) {
    return !(mapContext == null
        || mapContext.getLocalWork() == null
        || mapContext.getLocalWork().getInputFileChangeSensitive() == false);
  }

  protected boolean canSkipReload(ExecMapperContext mapContext) {
    return (this.hashTblInitedOnce && !isInputFileChangeSensitive(mapContext));
  }

  // If the loaded hash table is empty, we can skip processing the big table rows under
  // some conditions.
  protected boolean canSkipJoinProcessing(ExecMapperContext mapContext) {
    if (!canSkipReload(mapContext)) {
      return false;
    }

    JoinCondDesc[] joinConds = getConf().getConds();
    if (joinConds.length > 0) {
      for (JoinCondDesc joinCond : joinConds) {
        if (joinCond.getType() != JoinDesc.INNER_JOIN) {
          return false;
        }
      }
    } else {
      return false;
    }

    boolean skipJoinProcessing = false;
    for (int idx = 0; idx < mapJoinTables.length; ++idx) {
      if (idx == getConf().getPosBigTable()) {
        continue;
      }
      MapJoinTableContainer mapJoinTable = mapJoinTables[idx];
      if (mapJoinTable.size() == 0) {
        // If any table is empty, an inner join involving the tables should yield 0 rows.
        LOG.info("Hash table number " + idx + " is empty");
        skipJoinProcessing = true;
        break;
      }
    }
    return skipJoinProcessing;
  }
}