协程(二):协程的应用
# 感谢“书香”推荐: @shuxiang29
上一篇中对协程的概念做出了解释和澄清。总的来说,完全协程才算得上是真正意义上的协程,其它如生成器等只是部分实现了协程概念的非完全协程,我们之后主要讨论完全协程。
本篇介绍一些协程的实际应用。协程本质是一种控制抽象,它的价值在于可以简洁优雅地实现一些控制行为。在协程中,控制可以从当前执行上下文跳转到程序的其它位置,并且可以在之后的任意时刻恢复当前执行上下文,控制从跳出点处继续执行。这种行为与Continuation类似,但协程相比之下更容易理解,某些情况下还更加高效。之后会有一篇专门对比这两个概念。
生产者-消费者模型
在前一篇中已有提及,这是协程最典型也最常见的应用场景。Conway提出协程这个概念时所解决的编译器问题就属于生产者-消费者问题。
管道(Pipeline)也是由生产者-消费者模型扩展而来的,管道实际上是由一个生产者,加上一个或多个过滤器(Filter),再加上一个最终的消费者组成的生产者-消费者链。其中过滤器既是生产者又是消费者。以下是由Lua实现的过滤器:
function filter(ant)
return coroutine.wrap(function())
while true do
-- resume antecessor to obtain value
local x = ant()
-- yield transformed value
coroutine.yield(f(x))
end
end
end
只需要一行语句把生产者、过滤器和消费者串联起来就可以创建出一个管道:
consumer(filter(producer()))
生成器(Generator)
生成器实际上可以看作只有生产者的生产者-消费者模型。生成器的主要用途就是实现迭代器(Iterator)。需要提到的一点就是目前大多数语言提供的生成器都是非栈式(Stackful)的,即不能在嵌套调用中直接yield,这样在实现复杂的生成器时会非常麻烦。比如要实现一个二叉树的先序遍历,先看Lua实现:
function preorder(node)
if node then
preorder(node.left)
coroutine.yield(node.key)
preorder(node.right)
end
end
-- create an iterator
function preorder_iterator(tree)
return coroutine.wrap(function()
preorder(tree)
return nil
end)
end
简洁而直接,因为Lua的协程是完全协程,允许在嵌套调用中yield。如果用C#实现:
public static class BinaryTree<T>
{
static IEnumerable<T> Preorder(Node<T> node)
{
if (node != null) {
foreach (var key in Preorder(node.left))
yield return key;
yield return node.key;
foreach (var key in Preorder(node.right))
yield return key;
}
}
public static IEnumerable<T> PreorderIterator(Node<T> tree)
{
foreach (var key in Preorder(tree))
yield return key;
}
}
可以看到,在递归的每一级都需要一个迭代块向上一级yield,直到最顶层。如果生成器更加复杂,比如需要调用一系列辅助方法,那么在每个调用点处都要增加机械的yield代码。
目标导向(Goal-oriented)编程
目标导向编程是指模式匹配(Pattern-matching)或Prolog的查询这样的系统,由用户提出一个形式化定义的目标(Goal),系统会在一系列可选的子目标中寻找直到确认一个解决方案。在寻找过程中常常会需要回溯(Backtracking)机制,这种机制使用完全非对称协程作为生成器很容易实现。
例如用Lua实现一个模式匹配系统,支持匹配常量、可选和序列三种模式的组合:
-- matching a string literal
function prim(str)
return function(S, pos)
local len = string.len(str)
if string.sub(S, pos, pos+len-1) == str then
coroutine.yield(pos + len)
end
end
end
-- alternative patterns (disjunction)
function alt(patt1, patt2)
return function(S, pos)
patt1(S, pos)
patt2(S, pos)
end
end
-- sequence of sub-patterns (conjuction)
function seq(patt1, patt2)
return function(S, pos)
local btpoint = coroutine.wrap(function()
patt1(S, pos)
end)
for npos in btpoint do
patt2(S, npos)
end
end
end
function match(S, patt)
local len = string.len(S)
local m = coroutine.wrap(function() patt(S, 1) end)
for pos in m do
if pos == len+1 then
return true
end
end
return false
end
使用时形如("abc"|"de")."x"的目标可以定义如下:
patt = seq(alt(prim("abc"), prim("de")), prim("x"))
在进行匹配时这个目标被封装到一个协程中,通过循环逐一尝试匹配每个子目标。实现中的每个模式函数接受目标字符串和起始位置作为参数,匹配成功时yield下一个位置给后续匹配,无法继续匹配时返回nil。我尝试了一下使用C#来实现上述代码,得到的编译错误是:不能在匿名方法或lambda表达式内部使用yield语句。于是我便没有继续了,因为已经可以想象最后的完成代码会有多臃肿。这是C#生成器的又一使用限制,的确非常影响表达力。
协作式多任务(Cooperative multitasking)
并发编程所涉及的范围太广,这里仅讨论线程与协程的对比。在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。
由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点),因此非常容易使用。下面是一段Lua实现多任务的代码:
-- list of "live" tasks
tasks = {}
-- create a task
function create_task(f)
local co = coroutine.wrap(function() f(); return "ended" end)
table.insert(tasks, co)
end
-- task dispatcher
function dispatcher()
while true do
local n = table.getn(tasks)
if n == 0 then break end -- no more tasks to run
for i = 1, n do
local status = tasks[i]()
if status == "ended" then
table.remove(tasks, i)
break
end
end
end
end
协作式多任务的缺点之一是进行阻塞(Blocking)操作如IO时会阻塞掉整个程序,解决方案是提供一个辅助函数,初始化IO操作之后如果操作不能立即完成就挂起当前协程,Programming in Lua中给出了一个多任务下载的例子:
function download(host, file)
local c = assert(socket.connect(host, 80))
local count = 0
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
while true do
local s, status, partial = receive(c)
count = count + #(s or partial)
if status == "closed" then break end
end
c:close()
print(file, count)
end
function receive(connection)
connection:settimeout(0)
local s, status, partial = connection:receive(2^10)
if status == "timeout" then
coroutine.yield(connection)
end
return s or partial, status
end
threads = {}
function get(host, file)
local co = coroutine.create(function()
download(host, file)
end)
table.insert(threads, co)
end
function dispatch()
local i = 1
while true do
if threads[i] == nil then
if threads[1] == nil then break end
i = 1
end
local status, res = coroutine.resume(threads[i])
if not res then
table.remove(threads, i)
else
i = i + 1
end
end
end
这个例子把每个下载任务放在一个协程中,通过settimeout(0)使获取操作不会阻塞,再由调度函数逐一唤醒协程执行,如果获取还未完成就挂起,直到所有下载完成。
协作式多任务的另一个缺点是无法利用多核资源,这一点我认为我们日常所编写的绝大部分应用都没有这个必要,除非是实时性要求非常高(如操作系统)或需要尽可能地榨取机器性能(如Web服务器)的情况下(即使这时多线程也并非唯一选择)。所以在你打算使用多线程时先认真考虑一下是否协程就已经足够。在这篇采访中也有一些关于协程和并发的观点供参考。
异常处理
支持异常处理的语言需要实现两个基础原语:try和raise。try原语包含两项表达:主体和异常处理器。若主体正常返回,则返回值作为try的值,忽略异常处理器。若主体遇到异常条件,则引发(raise)一个异常并立即送给异常处理器,主体的剩余部分被忽略。异常处理器可以返回一个值作为try的值,或重新引发一个异常给更外层的异常处理器。
使用完全非对称协程来实现异常处理很简单:try原语由一个函数实现,此函数接受两个函数(主体和异常处理器)作为参数,然后在一个协程中执行主体函数;raise原语也是一个函数,在其中yield出一个异常即可。
总结
协程还适于实现状态机(每对入口/出口点表现一个状态),参与者模型(Actor model)(实质上也是协作式多任务)等。重要的是理解协程所表达的控制抽象,在此之上能够灵活运用的话,原本一些棘手的控制问题也许就可以简洁优雅地实现出来。
function filter(ant)
return coroutine.wrap(function())
while true do
-- resume antecessor to obtain value
local x = ant()
-- yield transformed value
coroutine.yield(f(x))
end
end
end
consumer(filter(producer()))
function preorder(node)
if node then
preorder(node.left)
coroutine.yield(node.key)
preorder(node.right)
end
end
-- create an iterator
function preorder_iterator(tree)
return coroutine.wrap(function()
preorder(tree)
return nil
end)
end
public static class BinaryTree<T>
{
static IEnumerable<T> Preorder(Node<T> node)
{
if (node != null) {
foreach (var key in Preorder(node.left))
yield return key;
yield return node.key;
foreach (var key in Preorder(node.right))
yield return key;
}
}
public static IEnumerable<T> PreorderIterator(Node<T> tree)
{
foreach (var key in Preorder(tree))
yield return key;
}
}
-- matching a string literal
function prim(str)
return function(S, pos)
local len = string.len(str)
if string.sub(S, pos, pos+len-1) == str then
coroutine.yield(pos + len)
end
end
end
-- alternative patterns (disjunction)
function alt(patt1, patt2)
return function(S, pos)
patt1(S, pos)
patt2(S, pos)
end
end
-- sequence of sub-patterns (conjuction)
function seq(patt1, patt2)
return function(S, pos)
local btpoint = coroutine.wrap(function()
patt1(S, pos)
end)
for npos in btpoint do
patt2(S, npos)
end
end
end
function match(S, patt)
local len = string.len(S)
local m = coroutine.wrap(function() patt(S, 1) end)
for pos in m do
if pos == len+1 then
return true
end
end
return false
end
patt = seq(alt(prim("abc"), prim("de")), prim("x"))
-- list of "live" tasks
tasks = {}
-- create a task
function create_task(f)
local co = coroutine.wrap(function() f(); return "ended" end)
table.insert(tasks, co)
end
-- task dispatcher
function dispatcher()
while true do
local n = table.getn(tasks)
if n == 0 then break end -- no more tasks to run
for i = 1, n do
local status = tasks[i]()
if status == "ended" then
table.remove(tasks, i)
break
end
end
end
end
function download(host, file)
local c = assert(socket.connect(host, 80))
local count = 0
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
while true do
local s, status, partial = receive(c)
count = count + #(s or partial)
if status == "closed" then break end
end
c:close()
print(file, count)
end
function receive(connection)
connection:settimeout(0)
local s, status, partial = connection:receive(2^10)
if status == "timeout" then
coroutine.yield(connection)
end
return s or partial, status
end
threads = {}
function get(host, file)
local co = coroutine.create(function()
download(host, file)
end)
table.insert(threads, co)
end
function dispatch()
local i = 1
while true do
if threads[i] == nil then
if threads[1] == nil then break end
i = 1
end
local status, res = coroutine.resume(threads[i])
if not res then
table.remove(threads, i)
else
i = i + 1
end
end
end