协程的抽象与实现

协程是一种程序组件,它允许多个任务在单个执行线程上以协作的方式运行。在本文中,将探讨协程的概念,并展示如何通过迭代的方式逐步抽象和实现协程,以支持多任务处理。

协程的概念

协程的核心思想是允许任务在执行过程中挂起和恢复,而不需要操作系统的介入。这种机制使得协程非常适合用于处理I/O密集型任务,如网络请求、文件读写等。

迭代1:抽象与分离工作

在第一次迭代中,尝试将工作与协程的状态分离。这需要进行以下三个改变:

  • 工作状态需要在自己的容器中指定。
  • 工作本身是一个单独的方法。
  • 协程方法改为接受一个函数,它执行并返回结果。

为了管理状态,现在有一个正式的状态类:

public class UpDownCounterState { public bool IsDone => Value == 1 && CountDirection == Direction.Down; public int StateValue => Value; public int Value { get; set; } = 0; public Direction CountDirection { get; set; } = Direction.Up; }

工作方法被单独定义(为了简洁,这里将其改为从1数到7再回到1):

static (int ret, bool done) Counter(UpDownCounterState currentState) { currentState.Value += 1 * (int)currentState.CountDirection; bool done = currentState.IsDone; currentState.CountDirection = currentState.Value == 7 ? Direction.Down : currentState.CountDirection; return (currentState.Value, done); }

这产生了以下协程实现:

static IEnumerable<R> Coroutine<R, Q>(Q state, Func<Q, (R ret, bool done)> fnc) { bool done = false; while (!done) { var result = fnc(state); yield return result.ret; done = result.done; } }

使用这个很简单 - 传入初始状态和工作方法,让协程处理调用和返回每一步的结果:

using (var cr = Coroutine(new UpDownCounterState(), Counter).GetEnumerator()) { while (cr.MoveNext()) { Console.Write(cr.Current + "\n"); } }

可以看到:

1 2 3 4 5 6 7 6 5 4 3 2 1

注意,协程已经被“泛化”,以便返回可以是工作方法返回的任何东西。

多任务处理

上述实现实现了一定程度的抽象,并且可以扩展以支持多个工作器,假设想要一个乘法器。因此有一个类来处理乘法器状态:

static (int ret, bool done) Multiplier(UpDownMultiplierState currentState) { currentState.StateValue += 1 * (int)currentState.CountDirection; bool done = currentState.IsDone; currentState.CountDirection = currentState.StateValue == 5 ? Direction.Down : currentState.CountDirection; return (currentState.StateValue * currentState.Multiplier, done); }

协程实现接受一个工作器列表,循环直到所有工作完成:

static IEnumerable<R> Coroutiners<Q, R>(List<(Q state, Func<Q, (R ret, bool done)> fnc)> fncs) { List<bool> doners = Enumerable.Repeat(false, fncs.Count).ToList(); int n = 0; while (!doners.All(done => done)) { if (!doners[n]) { var result = fncs[n].fnc(fncs[n].state); doners[n] = result.done; yield return result.ret; } n = (n == fncs.Count - 1) ? 0 : n + 1; } }

可以这样启动合作协程工作器:

using (var cr = Coroutiners(new List<(IState, Func<IState, (int, bool)>)>() { (new UpDownCounterState(), Counter), (new UpDownMultiplierState(), Multiplier) }).GetEnumerator()) { while (cr.MoveNext()) { Console.Write(cr.Current + "\n"); } }

鉴于乘法器工作器从1数到5再回到1(计数器从1数到7再回到1),可以看到:

1 10 2 20 3 30 4 40 5 50 6 40 7 30 6 20 5 10 4 3 2 1

这展示了每个工作器的合作“多任务处理”。

问题

这种实现有三个主要问题:

  • 所有工作器必须返回相同的类型。
  • 需要抽象IState,这迫使每个工作器强制转换为具体的州类型。
  • 状态和工作器是解耦的: (new UpDownCounterState(), Counter) 这样程序员很容易将错误的状态应用到工作器函数上。

为了说明#2,状态容器必须派生自IState:

public interface IState { } public class UpDownCounterState : IState { public bool IsDone => Value == 1 && CountDirection == Direction.Down; public int StateValue => Value; public int Value { get; set; } = 0; public Direction CountDirection { get; set; } = Direction.Up; } public class UpDownMultiplierState : IState { public bool IsDone => Counter == 1 && CountDirection == Direction.Down; public int Counter { get; set; } = 0; public int Multiplier { get; set; } = 10; public Direction CountDirection { get; set; } = Direction.Up; }

为了说明#3,工作器必须将IState强制转换为预期的状态容器:

static (int ret, bool done) Counter(IState state) { UpDownCounterState currentState = state as UpDownCounterState; currentState.Value += 1 * (int)currentState.CountDirection; bool done = currentState.IsDone; currentState.CountDirection = currentState.Value == 7 ? Direction.Down : currentState.CountDirection; return (currentState.Value, done); } static (int ret, bool done) Multiplier(IState state) { UpDownMultiplierState currentState = state as UpDownMultiplierState; currentState.Counter += 1 * (int)currentState.CountDirection; bool done = currentState.IsDone; currentState.CountDirection = currentState.Counter == 5 ? Direction.Down : currentState.CountDirection; return (currentState.Counter * currentState.Multiplier, done); }

迭代2:进一步抽象

解决上述问题的解决方案是每个工作器必须被抽象到它自己的容器类中,该类管理自己的状态:

public interface ICoroutine { bool IsDone { get; } void Step(); } public class UpDownCounter : ICoroutine { public bool IsDone => State.IsDone; protected UpDownCounterState State { get; set; } public UpDownCounter() { State = new UpDownCounterState(); } public void Step() { State.Value += 1 * (int)State.CountDirection; State.CountDirection = State.Value == 7 ? Direction.Down : State.CountDirection; } public override string ToString() { return State.Value.ToString(); } } public class UpDownMultiplier : ICoroutine { public bool IsDone => State.IsDone; protected UpDownMultiplierState State { get; set; } public UpDownMultiplier() { State = new UpDownMultiplierState(); } public void Step() { State.Counter += 1 * (int)State.CountDirection; State.Value = State.Counter * State.Multiplier; State.CountDirection = State.Counter == 5 ? Direction.Down : State.CountDirection; } public override string ToString() { return State.Value.ToString(); } }

注意,工作器初始化自己的状态!是的,程序员仍然可以搞砸,但认为可能性较小。

接下来,Coroutines方法签名实际上更简单:

static IEnumerable<Q> Coroutines<Q>(List<Q> fncs) where Q : ICoroutine { int n = 0; while (!fncs.All(f => f.IsDone)) { if (!fncs[n].IsDone) { fncs[n].Step(); yield return fncs[n]; } n = n == (fncs.Count - 1) ? 0 : n + 1; } }

但是看!不再返回工作器步骤的值,返回工作器本身!还消除了对IState接口的需求。

它的使用更容易定义:

using (var cr2 = Coroutines(new List<ICoroutine>() { new UpDownCounter(), new UpDownMultiplier() }).GetEnumerator()) { while (cr2.MoveNext()) { Console.Write(cr2.Current.ToString() + "\n"); } }

再次看到:

1 10 2 20 3 30 4 40 5 50 6 40 7 30 6 20 5 10 4 3 2 1

在这里,ToString在工作器中被覆盖,以便它返回工作器的当前步骤值,但应该指出的是,大多数工作器只会做一些事情,并不关心它们的内部状态,所以可以简单地将合作多任务工作写成:

foreach (var _ in Coroutines(new List<ICoroutine>() { new UpDownCounter(), new UpDownMultiplier() }));

这是一种不寻常的语法,因为foreach没有主体!人们希望编译器不会将其优化为“这个循环什么也不做”并丢弃代码!

受到codewitch的启发,抽象了协程的概念,以支持多个工作器,在这个过程中解决了各种问题。一些想法:这里的代码现在可以扩展到:

  • 处理工作器可能抛出的异常,而不会破坏其他工作器。
  • 实现一个“停止全部”选项,任何工作器都可以设置。
  • 包括一个工作器,它简单地有一个可以异步设置的“停止全部”标志。
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485