Issues

最近在写 MapReduce Job 时发现 Reducer 输出的结果总是一组完全相同的 Key,而每个 Key 对应的 Value 却不尽相同。以最简单的 word-count 为例,假设我们有一个表示“特征”的对象 Feature(为简单起见,这里省略了部分构造器、方法):

class Feature {
    String attribute;
    int num;

    @Override
    public String toString() {
        return String.format("[attribute=%s,num=%d]", attribute, num);    
    }
}

Map 对象读取 sentence 中的第一个 word,并写入 Feature 对象:

//map
    private word  = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\\|");
        String w = words[0];

        word.set(w);
        Feature f = new Feature(w, one);
        context.write(word, f);
    }

在 Reduce 对象中分两步处理,第一步是将收到的同类 Feature 进行汇总(汇总到一个 HashMap 中),然后在汇总完毕之后对结果进行统一处理:

// reduce
    private Map<Text, Feature> map = new HashMap<Text, Feature>();

    @Override
    public void reduce(Text key, Iterable<Feature> values, Context context)
            throws IOException, InterruptedException {
        Feature sum = new Feature(key.toString());
        for (Feature f : values) {
            sum.gather(f);
        }

        map.put(key, sum);
    }

    @Override
    public void cleanup(Context context) throws IOException,
            InterruptedException {
        System.out.println("Within Reduce, we got a map: " + map);
        // do something
    }

运行这个 Job 就会出现类似下面的结果:

Within Reduce, we got a map: {Tag=[attribute=Col,num=25], Tag=[attribute=Tag,num=13], Tag=[attribute=Row,num=37]}

这里可以看出,生成的 map 中所有的 key 值完全相同,不同的仅仅是 value 值。


Analysis

由于 HashMap 是根据对象的哈希值来确定 key 的位置的,而这些 key 值相同却能存放到不同的位置,说明这些 key 对象(这里是 Text 对象)必然有着不同的哈希值。根据这一点就能逐步推断出问题的根源。

Speculation

进一步在 debug 模式下运行 Job 可以发现,在 reduce 方法中,map 的每一个 put 操作都会造成 map 中所有已有的 key 的变化。一开始我认为这可能是由于 reduce 方法中传入了不同的 Text 对象,而对象中的值的引用(bytes 数组)相同的缘故造成的。因为 Text 中的 bytes 数组并不是一个 final 对象,而是在每次 set 的过程中都会变化的。Text 在写入过程中如果发现输入数据大小与本身的 bytes 大小一致,就会使用 System.arraycopy(utf8, start, bytes, 0, len); 直接将输入数组拷贝到已有的 bytes 数组中,而不会创建新的 bytes 对象来进行写入操作,也就是说,如果每次写入的对象大小不变,那么 Text 中的 bytes 域也是不会发生变化的。由于这里的 Job 中处理的 key 都是相同大小的字符串,也就是说 Text 对象中的引用不会变化,这有可能会导致所有的 Text 对象共用同一个 bytes 引用,从而出现新的 Text 对象的变化引起所有对象变化的问题。那么这个问题会不会就是由于这个原因造成的呢?

答案是否定的。

上面所述的情况只有可能在一种场景下出现,那就是新的 Text 对象通过调用旧的 Text 对象中的 bytes 数组来构造,如下面的这段代码所示:

// A fictitious example
Text oldtext = new Text();
oldtext.set(string1);
Text newtext = new Text(oldtext.getBytes());
newtext.set(string2);

但是,如果去运行这段代码,你会发现 oldtextnewtext 并不包含相同的引用,这是因为在构造 Text 对象时,程序会先通过 setCapacity 方法来检查自身的 bytes:

  private void setCapacity(int len, boolean keepData) {
    if (bytes == null || bytes.length < len) {
      if (bytes != null && keepData) {
        bytes = Arrays.copyOf(bytes, Math.max(len,length << 1));
      } else {
        bytes = new byte[len]; // case when bytes is null
      }
    }
  }

如果 bytes 为空(在构造 Text 时这是成立的),那么会创建一个全新的 bytes 数组,而不会直接复用已有的数组,这也是 Text 的一种安全性保障机制。

那么,如果不是这个原因那会是什么原因造成的呢?

Hashes in Java

我们再回过头来复习下 Java 中使用了哈希算法的集合。在 HashMapHashSet 这样的集合中,对于对象哈希值的计算都是通过下面的方法实现的:

    final int hash(Object k) {
        int h = hashSeed;
        if (0 != h && k instanceof String) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode(); // 哈希计算的关键步骤

        // This function ensures that hashCodes that differ only by
        // constant multiples at each bit position have a bounded
        // number of collisions (approximately 8 at default load factor).
        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    }

可以看出,对于非 String 类型的对象,是通过调用对象本身的 hashCode 方法来计算哈希值的。这一点对于理解 Text 对象的一致性非常重要。

Hashes in Hadoop

再来看 Text 对象的 hashCode 实现。Text 的哈希值计算直接使用的父类 BinaryComparablehashCode 方法实现的,而 BinaryComparable 最终又是通过下面的方法来实现哈希计算的:

  public static int hashBytes(byte[] bytes, int offset, int length) {
    int hash = 1;
    for (int i = offset; i < offset + length; i++)
      hash = (31 * hash) + (int)bytes[i];
    return hash;
  }

也就是说,Hadoop 中的这类序列化对象都是通过字节数组的值来计算哈希值的。如果 Text 对象包含的内容(也就是 bytes 数组)发生了变化,那么该对象的哈希值也会相应改变;相对的,如果两个 Text 对象具有相同的字符串内容(也就是 bytes 数组的值相同),那么这两个对象的哈希值就会是相同的。明白了这一点,我们就可以通过 Hadoop 的任务调度机制来最终解决这个问题了。

Hadoop Task

我们知道,Reducer 的运行是在 ReducerTask 中定义的:

  void runNewReducer(...) {
    // initializing...
    try {
      reducer.run(reducerContext);
    } finally {
      trackedRW.close(reducerContext);
    }
  }

由于 Reducer 默认是以一种循环遍历的方式处理数据的,这里的 reducerContext 是伴随着 reducer 的整个生命周期的:

  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }

可以看出,这里的 context 通过 nextKey 来判断是否还有输入值,再调用 getCurrentKeygetValues 来获取相应的 key 与 values。而 Context 的最终实现类就是 org.apache.hadoop.mapreduce.task.ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

继续观察 ReduceContextImpl 会发现,Context 中的 key 对象是保持不变的,也就是说,key 引用本身不变,改变的只是 key 中的域的取值(类似于添加了 final 关键字的效果):

private KEYIN key;
key = keyDeserializer.deserialize(key);

对于 Text 类型对象,这里的 keyDeserializer 就是 org.apache.hadoop.io.serializer.WritableSerialization.WritableDeserializer,它的反序列化操作非常简单粗暴:

    public Writable deserialize(Writable w) throws IOException {
      Writable writable;
      if (w == null) {
        writable 
          = (Writable) ReflectionUtils.newInstance(writableClass, getConf());
      } else {
        writable = w;
      }
      writable.readFields(dataIn);
      return writable;
    }

对于 null 对象,上述方法会创建一个新的 Writable 对象,而对于已有的对象,则会直接返回原对象,只是在返回之前先读入输入流中的数据。这里的输入流也是在 Context 中定义好的:

// initialize
private RawKeyValueIterator input;
private DataInputBuffer buffer = new DataInputBuffer();
this.keyDeserializer.open(buffer);

// nextKey()
DataInputBuffer nextKey = input.getKey();
currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
                  nextKey.getLength() - nextKey.getPosition());
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());

到这里顺便说一下 RawKeyValueIterator,它是真正用于获取 sort/merge 过程中产生的原始数据的迭代器。从实际实现类 org.apache.hadoop.mapred.Merger.MergeQueue<K, V> 的源码中可以看出,它是通过一个基于小根堆结构的 PriorityQueue 队列来实现有序的数据消费的。

Summary

从上面的叙述中我们已经知道了,不论数据生产的方式是什么样的,对于 Reducer 而言,它接收到的 Text 永远是同一个对象,变化的只是 Text 中 bytes 的取值。这大概可以称为 reduce 中 key 的不变性。这种对象本身的不变性加上实际取值的变化以及哈希值计算方法,就是造成本文开头所述问题的根本原因。我们再用一段代码来模拟一下 Hadoop 中 Reduce 任务运行的基本过程。

package me.weyo.mapred.testing;

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * @author weyo
 * @date 2015-7-1
 */
public class MapKeys {
    private static Class<?> writableClass;
    private static DataInputStream dataIn;
    private static Configuration conf;

    public static Writable deserialize(Writable w) throws IOException {
        Writable writable;
        if (w == null) {
          writable 
            = (Writable) ReflectionUtils.newInstance(writableClass, conf);
        } else {
          writable = w;
        }
        writable.readFields(dataIn);
        return writable;
      }

    public static void main(String[] args) throws IOException {
        HashMap<Text, String> map = new HashMap<Text, String>();
        // 模拟一般 MR Job 的初始化过程
        conf = new Configuration();
        new GenericOptionsParser(conf, args);
        writableClass = Text.class;
        Text text = null;
        // Text 使用 UTF-8 编码格式
        byte[] obytes = "1st".getBytes("utf-8");
        // 注意,这里 bytes 第一位必须用于设置内容长度,这是 Text 的变长机制决定的
        byte[] bytes = new byte[obytes.length + 1];
        bytes[0] = (byte) obytes.length;
        System.arraycopy(obytes, 0, bytes, 1, obytes.length);
        DataInputBuffer buffer = new DataInputBuffer();
        dataIn = new DataInputStream(buffer);
        buffer.reset(bytes, 0, bytes.length);
        // 第一次读入数据
        text = (Text) deserialize(text);
        map.put(text, text.toString());
        System.out.println(text + "|text.hashCode=" + text.hashCode());
        obytes = "2nd".getBytes("utf-8");
        System.arraycopy(obytes, 0, bytes, 1, obytes.length);
        buffer.reset(bytes, 0, bytes.length);
        // 第二次读入数据
        text = (Text) deserialize(text);
        map.put(text, text.toString());
        System.out.println(text + "|text.hashCode=" + text.hashCode());
        System.out.println("Map|" + map);
    }
}

这段代码的输出结果为

1st|text.hashCode=80561
2nd|text.hashCode=81351
Map|{2nd=2nd, 2nd=1st}

与我们预想的完全一致。理解了原理之后,我们接下来就可以想办法去解决它。

Solutions

Final object

考虑下面的 HashSet 的例子:

HashSet<Date> set = new HashSet<Date>();
Date date = new Date();
set.add(date);
date.setTime(System.currentTimeMillis() + 60000); // 增加 1 分钟时间
set.add(date);
System.out.println(set);

这段代码的输出结果大概是这样的:

[Thu Jul 02 19:02:36 CST 2015, Thu Jul 02 19:02:36 CST 2015]

也就是说 set 中的两个 Date 对象完全相同(其实就是同一个对象)。这段代码的原理与上文所述的 Hadoop 代码的工作原理完全相同。在这里 HashSet 实际上是使用了两个 hash 位来保存同一个对象的引用。Date 是 JDK 中难得的与 Text 性质相似的对象 —— 拥有可变的 field,并且根据该 field 来计算哈希值。也由于 Date 的这种可变性,使得 Date 很多时候对于“时间”这个概念的语义表达并不够准确,所以在 JDK 5 之后的版本也已经不推荐使用 Date 来表示时间对象(大部分方法被设置为 @Deprecated)。

再回到这个问题上来,既然这个问题是由于 Date 的可变性引起的,那么将 Date 转化成某种“不可变”的对象不就可以解决问题了。以下几种方式均可以实现这个目标:

  • 每次向 set 中存入一个新的 Date 对象: set.add(new Date(date.getTime()));
  • HashSet 不直接处理 Date,而是处理 Date 中的 time 域:

HashSet<Long> set = new HashSet<Long>(); set.add(date.getTime());

而对于本文开始的例子中提到的 reduce 任务,相应的也有两种解决方法:

  • 向 map 中存入新的 Text 对象: map.put(new Text(key), sum);

  • HashMap 直接处理 Text 中保存的字符串信息: Map<String, Feature> map = new HashMap<String, Feature>(); map.put(key.toString(), sum);

Different data structure

由于这个问题是哈希集合数据结构的特征造成的,那么更换一种数据结构也是可以解决问题的。对于 HashSet 的例子,可以使用 TreeSet 来代替;对于 HashMap 的例子就可以使用 TreeMap 来代替(前提是 key 必须实现 Comparable 接口,不过 Hadoop 的序列化对象基本上都符合这个条件)。不过由于 TreeSetTreeMap 只允许保存同一个对象的一个引用,这里的 add/put 操作必须针对 new 的新对象。这一点与上面的存入新对象的思路相似,只是使用 Tree 结构之后可以实现更多功能(比如 key 的自动排序),同时也能够更好地定位问题。

Map<Text, Feature> map = new TreeMap<Text, Feature>();
map.put(new Text(key), sum);

Conclusion

写到这里差不多该结束了。简单总结一下,由于 MapReduce 在任务调度时使用同一个 Text 对象来传递数据,所以在直接使用该对象进行规约时就会出现问题。因此,在 Reducer 中接收新数据时务必要将 Text 转化为 String ,或者创建新的 Text 对象来进行处理(相比之下前一种方法效率可能会更高一点)。对于这个对象不变性问题,本来想向社区提交一个 patch 来改进,但是回过头再想想其实 Hadoop 这么做也无不可。虽然现代的 JVM 对于小对象的频繁创建已经有了很好的性能支持,但是减少创建对象的频率来提高集群性能也没什么错(虽然这点性能提升可能微乎其微),而且 Hadoop 发展了这么多年,到今天这样比较稳定成熟的版本也证明了这种思路没有什么问题。对于程序员来说,不改总比少改好(As we all know the joke, "99 bugs in the code. Fix one bug, compile it down. 167 little bugs in the code....sigh".)。因此,最终还需要我们在编写新任务时理解、注意其中的细节,争取写出具有高鲁棒性的程序。

That's all.


Reference

  1. http://yoyzhou.github.io/blog/2013/05/10/hadoop-serialization-and-writable-object-2/
  2. http://blog.csdn.net/posa88/article/details/7906426
  3. http://blog.csdn.net/lastsweetop/article/details/9249411


Comments

comments powered by Disqus