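Step 1: Decompose the text along some dimension (paragraph, word, or letter) and map it into minimal independent units.
Step 2: Merge the duplicate minimal units and compute their totals.
Following the MapReduce paper, Xiao Zhang designed Map and Reduce as follows:
Map implementation
Mapping
The Mapping function decomposes the text into minimal units in key/value form, that is, <word, occurrence count (1)>, written <word, 1>.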
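The two steps can be sketched in a few lines of C# before looking at the full design. This is only a minimal illustration that assumes words as the unit; the names TwoStepWordCount, MapWords, and Merge are hypothetical and not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TwoStepWordCount
{
    // Step 1: split the text into words and map each one to a (word, 1) pair.
    public static IEnumerable<KeyValuePair<string, int>> MapWords(string text)
    {
        var words = text.Split(new[] { ' ', '\n', '\r', '\t' }, StringSplitOptions.RemoveEmptyEntries);
        return words.Select(w => new KeyValuePair<string, int>(w, 1));
    }

    // Step 2: merge the pairs that share a key by summing their counts.
    public static Dictionary<string, int> Merge(IEnumerable<KeyValuePair<string, int>> pairs)
    {
        return pairs.GroupBy(p => p.Key)
                    .ToDictionary(g => g.Key, g => g.Sum(p => p.Value));
    }

    public static void Main()
    {
        var counts = Merge(MapWords("brow sorrow brow sorrow"));
        foreach (var kv in counts)
            Console.WriteLine(kv.Key + ": " + kv.Value);
    }
}
```

Here both words end up with a count of 2. The rest of the section splits these two steps into smaller stages.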
public IEnumerable<Tuple<T, int>> Mapping(IEnumerable<T> list)
{
    foreach (T sourceVal in list)
        yield return Tuple.Create(sourceVal, 1);
}
Usage; the output is (brow, 1), (brow, 1), (sorrow, 1), (sorrow, 1):
var spit = hamlet.Split(new[] { " ", Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);
var mp = new MicroMapReduce<string>(new Master<string>());
var result = mp.Mapping(spit);
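Combine
To reduce data communication overhead, the key/value pairs emitted by Mapping are merged by key before they reach the actual reduce step. This amounts to doing part of the computation in advance, which speeds up the overall computation. The output has the form (brow, 2), (sorrow, 2):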
public Dictionary<T, int> Combine(IEnumerable<Tuple<T, int>> list)
{
    Dictionary<T, int> dt = new Dictionary<T, int>();
    foreach (var val in list)
    {
        if (dt.ContainsKey(val.Item1))
            dt[val.Item1] += val.Item2;
        else
            dt.Add(val.Item1, val.Item2);
    }
    return dt;
}
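To see Mapping and Combine working together, here is a small self-contained sketch. It assumes static generic versions of the two methods in a hypothetical CombineSketch class, and it swaps the ContainsKey check for TryGetValue. That is a stylistic variant with the same result, not the original author's code.

```csharp
using System;
using System.Collections.Generic;

public static class CombineSketch
{
    // Same contract as Mapping above: every element becomes (element, 1).
    public static IEnumerable<Tuple<T, int>> Mapping<T>(IEnumerable<T> list)
    {
        foreach (T sourceVal in list)
            yield return Tuple.Create(sourceVal, 1);
    }

    // Same result as Combine above; TryGetValue replaces the separate ContainsKey check.
    public static Dictionary<T, int> Combine<T>(IEnumerable<Tuple<T, int>> list)
    {
        var counts = new Dictionary<T, int>();
        foreach (var pair in list)
        {
            int current;
            counts.TryGetValue(pair.Item1, out current); // current is 0 when the key is new
            counts[pair.Item1] = current + pair.Item2;
        }
        return counts;
    }

    public static void Main()
    {
        var words = new[] { "brow", "brow", "sorrow", "sorrow" };
        var combined = Combine(Mapping(words));
        Console.WriteLine(combined["brow"]);   // 2
        Console.WriteLine(combined["sorrow"]); // 2
    }
}
```

Four pairs go in and two come out, which is the reduction in traffic that the Combine step is meant to provide.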
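Partitioner
The Partitioner handles grouping: it groups the statistics coming from different nodes by key.
Conceptually, its output has the form (brow, {(brow, 2), (brow, 3)}), (sorrow, {(sorrow, 10), (sorrow, 11)}). Note that the Partitioner shown here takes a single dictionary, so each group it produces holds exactly one value; the multi-value shape arises when results from several nodes are grouped together: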
public IEnumerable<Group<T, int>> Partitioner(Dictionary<T, int> list)
{
    var dict = new Dictionary<T, Group<T, int>>();
    foreach (var val in list)
    {
        if (!dict.ContainsKey(val.Key))
            dict[val.Key] = new Group<T, int>(val.Key);
        dict[val.Key].Values.Add(val.Value);
    }
    return dict.Values;
}
Definition of Group:
public class Group<TKey, TValue> : Tuple<TKey, List<TValue>>
{
    public Group(TKey key) : base(key, new List<TValue>()) { }

    public TKey Key { get { return base.Item1; } }

    public List<TValue> Values { get { return base.Item2; } }
}
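The multi-node grouping that the Partitioner is meant for can be illustrated as follows. This is a sketch under assumptions: PartitionAcrossNodes is a hypothetical helper that does not appear in the original code, and each "node" is simulated by an in-memory dictionary of already combined counts.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same shape as the Group class above.
public class Group<TKey, TValue> : Tuple<TKey, List<TValue>>
{
    public Group(TKey key) : base(key, new List<TValue>()) { }
    public TKey Key { get { return Item1; } }
    public List<TValue> Values { get { return Item2; } }
}

public static class PartitionSketch
{
    // Hypothetical helper: groups the combined counts of several nodes by key.
    public static IEnumerable<Group<T, int>> PartitionAcrossNodes<T>(
        IEnumerable<Dictionary<T, int>> nodeResults)
    {
        var groups = new Dictionary<T, Group<T, int>>();
        foreach (var nodeResult in nodeResults)
        {
            foreach (var pair in nodeResult)
            {
                Group<T, int> group;
                if (!groups.TryGetValue(pair.Key, out group))
                {
                    group = new Group<T, int>(pair.Key);
                    groups[pair.Key] = group;
                }
                group.Values.Add(pair.Value); // one entry per node that saw this key
            }
        }
        return groups.Values;
    }

    public static void Main()
    {
        var node1 = new Dictionary<string, int> { { "brow", 2 }, { "sorrow", 10 } };
        var node2 = new Dictionary<string, int> { { "brow", 3 }, { "sorrow", 11 } };
        foreach (var g in PartitionAcrossNodes(new[] { node1, node2 }))
            Console.WriteLine(g.Key + ": [" + string.Join(", ", g.Values) + "] -> " + g.Values.Sum());
    }
}
```

With these two simulated nodes, the brow group holds the values 2 and 3 and the sorrow group holds 10 and 11, so a later reduce step would sum them to 5 and 21.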
Reduce implementation
The Reducing function receives the grouped data and performs the final aggregation.
public Dictionary<T, int> Reducing(IEnumerable<Group<T, int>> groups)
{
    Dictionary<T, int> result = new Dictionary<T, int>();
    foreach (var sourceVal in groups)
    {
        // Sum() is a LINQ extension method and requires "using System.Linq;".
        result.Add(sourceVal.Key, sourceVal.Values.Sum());
    }
    return result;
}
The steps are wrapped and called as follows:
public IEnumerable<Group<T, int>> Map(IEnumerable<T> list)
{
    var step1 = Mapping(list);
    var step2 = Combine(step1);
    var step3 = Partitioner(step2);
    return step3;
}

public Dictionary<T, int> Reduce(IEnumerable<Group<T, int>> groups)
{
    var step1 = Reducing(groups);
    return step1;
}

public Dictionary<T, int> MapReduce(IEnumerable<T> list)
{
    var map = Map(list);
    var reduce = Reduce(map);
    return reduce;
}
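For an end-to-end run, the pipeline can be condensed into a self-contained sketch. The Master<T> class used earlier is not shown in this section, so the version below leaves it out and uses a hypothetical static PipelineSketch class, with KeyValuePair<T, List<int>> standing in for Group. Treat it as an approximation of the flow rather than the original class.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PipelineSketch
{
    // Map side: count each element locally (Mapping + Combine),
    // then emit one group of counts per key (Partitioner).
    public static IEnumerable<KeyValuePair<T, List<int>>> Map<T>(IEnumerable<T> list)
    {
        var combined = new Dictionary<T, int>();
        foreach (var item in list)
        {
            int current;
            combined.TryGetValue(item, out current); // 0 when the key is new
            combined[item] = current + 1;
        }
        return combined.Select(p => new KeyValuePair<T, List<int>>(p.Key, new List<int> { p.Value }));
    }

    // Reduce side: sum the values of every group.
    public static Dictionary<T, int> Reduce<T>(IEnumerable<KeyValuePair<T, List<int>>> groups)
    {
        return groups.ToDictionary(g => g.Key, g => g.Value.Sum());
    }

    public static Dictionary<T, int> MapReduce<T>(IEnumerable<T> list)
    {
        return Reduce(Map(list));
    }

    public static void Main()
    {
        // Stand-in text; the original splits a longer "hamlet" string the same way.
        var hamlet = "brow sorrow" + Environment.NewLine + "brow sorrow";
        var words = hamlet.Split(new[] { " ", Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);
        foreach (var kv in MapReduce(words))
            Console.WriteLine(kv.Key + ": " + kv.Value);
    }
}
```

With the stand-in text, both brow and sorrow are counted twice.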