Lec1 MapReduce
本章主要介绍 MapReduce 编程模型,以及 MapReduce 的实现原理。
实验
RPC 接口
func NewClient
resp: id, nReduce
func Ping
resp: {'pong'}
func ClaimTask
resp: {taskType, *mapTask, *reduceTask, *controls(pending, done)}
func OmitTask
req: {taskType, *mapTask, *reduceTask}
Worker 客户端
func Worker() {
c := NewClient()
for true:
task := rpc.ClaimTask
if !task: return
if task == done: return
if task == pending: sleep
if task == mapTask: mapFunc OmitTask
if task == reduceTask: reduceFunc OmitTask
sleep 10ms
}
func NewClient() {
id := rpc.NewClient()
go func() {
for true:
rpc.Ping
sleep 1ms
}()
}
Coordinator 服务端
// 连接管理
conn map[int64]{status, timestamp}
mapRWL
// 可以考虑分片加锁
go func() {
for true:
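// 整个遍历期间都要持读锁:Go 中 range map 与并发写(Ping)同时发生会直接 fatal
mapRWL.rlock
expired := [id for id in conn if timeout(conn[id].timestamp)]
mapRWL.runlock
mapRWL.wlock
// 加写锁后再检查一次:两次加锁之间该连接可能刚 Ping 过
for id in expired: if timeout(conn[id].timestamp): delConn(id)
mapRWL.wunlock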
sleep 100ms
}
NewClient() {
id := new()
// 写 conn 必须加写锁,否则与后台超时清理协程产生数据竞争
mapRWL.wlock()
conn[id] = {status, now}
mapRWL.wunlock()
return id
}
Ping() {
mapRWL.wlock()
conn[id].timestamp = now
mapRWL.wunlock()
}
// 任务管理
mapTask []{status:undo,doing,done}
reduceTask []{status:init,undo,doing,done}
bool mapReduceStatus [nM][nR]
// 并没有考虑负载均衡,先到先得
// 论文(3.3 节)中的说法:worker 节点失效时,该节点上正在执行的任务要重置;已完成的 map 任务也要重新执行(其中间结果存在该节点本地磁盘上,已无法读取),并通知尚未读完这些结果的 reduce 任务改从新节点读取
minHeap[{taskID,timestamp}]
init() {
go func {
for true:
minHeapTop()
while minHeapTop > timeout:
wlock(status)
if doing
task -> [undo]  // map 与 reduce 任务超时都要回收重发
wunlock(status)
minHeapPop()
sleep 1s
}
}
func ClaimTask() {
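allDone := true  // 不叫 done,避免与任务状态 done 混淆
for i in range mapTask:
allDone &= (mapTask[i] == done)
// 用 CAS 而不是"先判断再写入",否则两个 worker 可能同时领到同一任务
if CAS(mapTask[i], undo, doing):
minHeapPush({taskID, now})  // 登记到超时检测堆
return mapTask[i]
// reduce 依赖所有 map 的中间结果:map 未全部完成前不分配 reduce
if !allDone: return pending
CAS(reduceTask[*], init, undo)  // 所有 map 完成后 reduce 任务才变为可领取
for i in range reduceTask:
allDone &= (reduceTask[i] == done)
if CAS(reduceTask[i], undo, doing):
minHeapPush({taskID, now})
return reduceTask[i]
if allDone: return done
return pending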
}
func OmitTask() {
if mapTask[i]: -> done
if reduceTask[i]: -> done
}