在处理大量数据时,标准的LINQJoin和GroupJoin操作可能会因为需要将整个序列加载到内存中而变得效率低下。本文将探讨如何利用数据已排序的假设,通过延迟计算实现Join和GroupJoin操作,以提高处理大数据集时的性能。虽然本文使用C#进行示例,但所介绍的技术适用于所有.NET语言。
假设有两个数据集——Master和Detail。这两个数据集都包含大量按Master ID字段排序的记录,并具有主/从关系。数据集不一定存储在数据库中,它们可以位于任何地方(在示例中,将即时生成假数据序列)。现在,任务是从这两个序列中读取数据,通过Master ID将结果组合在一起,并在结果序列上执行一些操作(在示例中,只是打印)。当然,如果数据存储在数据库中,可以编写一个SQL JOIN语句,让SQL Server执行连接。但为了演示,让尝试在C#代码中自己实现连接和分组。
在演示中,将使用以下数据结构,称为MasterData和DetailData:
public struct MasterData
{
public int MasterId { get; set; }
public string Data
{
get
{
return string.Format("MASTER(Master ID: {0})", MasterId);
}
}
}
public struct DetailData
{
public int MasterId { get; set; }
public int DetailId { get; set; }
public string Data
{
get
{
return string.Format("DETAIL(Master ID: {0}, Detail ID: {1})", MasterId, DetailId);
}
}
}
还将使用PrintData帮助器扩展方法来打印数据:
public static class PrintingExtensions
{
public static void PrintData(this IEnumerable> data)
{
foreach (var masterItem in data)
Console.WriteLine("{0} <===> {1}", masterItem.Item1.Data, masterItem.Item2.Data);
}
public static void PrintData(this IEnumerable>> data)
{
foreach (var masterItem in data)
{
Console.WriteLine(masterItem.Item1.Data);
foreach (var detailItem in masterItem.Item2)
Console.WriteLine("\t{0}", detailItem.Data);
}
}
}
最后,有两个方法来生成样本数据序列:
private static IEnumerable GetMasterData(int count)
{
return Enumerable.Range(1, count).Select(m => new MasterData { MasterId = m });
}
private static IEnumerable GetDetailData(int count)
{
return Enumerable.Range(1, count).SelectMany(m =>
Enumerable.Range(1, 5).Select(d => new DetailData { MasterId = m, DetailId = d }));
}
这是如何使用标准的GroupJoinLINQ操作符:
private const int c_count = 10 * 1000 * 1000;
private const int c_skipCount = 1 * 1000 * 1000;
private const int c_takeCount = 3;
void Main()
{
var masterData = GetMasterData(c_count);
var detailData = GetDetailData(c_count);
masterData.GroupJoin(detailData, m => m.MasterId, d => d.MasterId, Tuple.Create)
.Skip(c_skipCount).Take(c_takeCount).PrintData();
}
这在相对短的序列上工作得很好,但当项目数量很大时,执行变得不太理想,因为标准的LINQGroupJoin操作符要求整个序列首先加载到内存中,以便即使对于未排序的序列,连接和分组也能正确工作。
为了利用数据集已排序的事实,创建了一些IEnumerable
private const int c_count = 10 * 1000 * 1000;
private const int c_skipCount = 1 * 1000 * 1000;
private const int c_takeCount = 3;
void Main()
{
var masterData = GetMasterData(c_count);
var detailData = GetDetailData(c_count);
masterData.OrderedEqualityGroupJoin(detailData, m => m.MasterId, d => d.MasterId, Tuple.Create)
.Skip(c_skipCount).Take(c_takeCount).PrintData();
}
OrderedEqualityGroupJoin操作符,以及OrderedCompareGroupJoin,内部使用以下类来实现延迟计算:
private class FilteredEnumerator : IDisposable
{
private bool m_hasData;
private readonly IEnumerator m_enumerator;
public FilteredEnumerator(IEnumerable sequence)
{
m_enumerator = sequence.GetEnumerator();
m_hasData = m_enumerator.MoveNext();
}
public IEnumerable SkipAndTakeWhile(Predicate filter)
{
while (m_hasData && !filter(m_enumerator.Current))
m_hasData = m_enumerator.MoveNext();
while (m_hasData && filter(m_enumerator.Current))
{
yield return m_enumerator.Current;
m_hasData = m_enumerator.MoveNext();
}
}
public IEnumerable SkipAndTakeWhile(Func comparer)
{
while (m_hasData && comparer(m_enumerator.Current) > 0)
m_hasData = m_enumerator.MoveNext();
while (m_hasData && comparer(m_enumerator.Current) == 0)
{
yield return m_enumerator.Current;
m_hasData = m_enumerator.MoveNext();
}
}
public void Dispose()
{
m_enumerator.Dispose();
}
}
FilteredEnumerator将跳过所有Detail记录,直到找到与当前Master键匹配的记录,并只选择匹配的记录。OrderedEqualityGroupJoin基于键的相等性,而OrderedCompareGroupJoin基于键的比较。这在某些Master记录没有相应的Detail记录时变得相关。基于键相等性的运算符可能会跳过整个序列,但基于键比较的运算符只会跳过键小于当前Master ID的Detail记录。
到目前为止,已经讨论了Group Joining,现在让看看Join运算符。附加的源代码包含内连接、外全连接、左连接和右连接的实现,同样假设内部和外部序列都是有序的。实现基于OrderedJoinIterator,它基本上遍历两个序列,并根据键的比较和连接类型返回内连接或外连接记录,或它们的默认值。
static IEnumerable OrderedJoinIterator(
this IEnumerable outer,
IEnumerable inner,
Func outerKeySelector,
Func innerKeySelector,
Func resultSelector,
JoinType jt, IComparer comparer)
{
if (comparer == null)
comparer = Comparer.Default;
var l = outer.GetEnumerator();
var r = inner.GetEnumerator();
var lHasData = l.MoveNext();
var rHasData = r.MoveNext();
while (lHasData || rHasData)
{
if (!lHasData)
{
if (jt == JoinType.Inner || jt == JoinType.Left)
yield break;
yield return resultSelector(default(TOuter), r.Current);
rHasData = r.MoveNext();
continue;
}
if (!rHasData)
{
if (jt == JoinType.Inner || jt == JoinType.Right)
yield break;
yield return resultSelector(l.Current, default(TInner));
lHasData = l.MoveNext();
continue;
}
var comp = comparer.Compare(outerKeySelector(l.Current), innerKeySelector(r.Current));
if (comp < 0)
{
if (jt == JoinType.Left || jt == JoinType.Full)
yield return resultSelector(l.Current, default(TInner));
lHasData = l.MoveNext();
}
else if (comp > 0)
{
if (jt == JoinType.Right || jt == JoinType.Full)
yield return resultSelector(default(TOuter), r.Current);
rHasData = r.MoveNext();
}
else
{
yield return resultSelector(l.Current, r.Current);
lHasData = l.MoveNext();
rHasData = r.MoveNext();
}
}
}