Skip to content

Lec1 MapReduce

333字约1分钟

计算机系统

2024-11-27

本章主要介绍 MapReduce 编程模型,以及 MapReduce 的实现原理。

实验

RPC 接口

func NewClient
	resp: id, nReduce

func Ping
	resp: {'pong'}

func ClaimTask
	resp: {taskType, *mapTask, *reduceTask, *controls(pending, done)}

func OmitTask
	req: *mapTask

Worker 客户端

func Worker() {
	c := NewClient()
	
	for true:
		task := rpc.ClaimTask
		if !task: return
		if task == done: return
		if task == pending: sleep
		if task == mapTask: mapFunc OmitTask
		if task == reduceTask: reduceFunc OmitTask
		sleep 10ms
}

func NewClient() {
	id := rpc.NewClient()
	go func() {
		for true:
			rpc.Ping
			sleep 1ms
	}()
}

Coordinator 服务端

// 连接管理
conn map[int64]{status, timestamp}
mapRWL
// 可以考虑分片加锁

go func() {
	for true:
      for range map:
      	mapRWL.lock
      	getConn()
      	mapRWL.unlock
      	if timeout(timestamp):
      		mapRWL.wlock
      		delConn()
      		mapRWL.wunlock
      sleep 100ms
}

NewClient() {
	id := new()
	conn[id] = {}
	return id
}

Ping() {
	mapRWL.wlock()
	Conn.update()
	mapRWL.wunlock()
}

// 任务管理
mapTask []{status:undo,doing,done}
reduceTask []{status:init,undo,doing,done}
bool mapReduceStatus [nM][nR]
// 并没有考虑负载均衡,先到先得
// 论文中提到:若某个 worker 节点失效,该节点上已完成的 map 任务也要重新执行(中间结果保存在它的本地磁盘上),尚未读完这些中间结果的 reduce 任务需改为从重新执行的节点读取

minHeap[{taskID,timestamp}]
init() {
	go func {
		for true:
			minHeapTop()
			while minHeapTop > timeout:
				wlock(status)
				if doing
					mapTask -> [undo]
				wunlock(status)
				minHeapPop()
			sleep 1s
	}
}

func ClaimTask() {
	done := true
	
	for range mapTask:
		done &= mapTask[done]
		if mapTask[i] == undo:
			atomic_write(doing)
			return mapTask
	
	for range reduceTask:
		done &= reduceTask[done]
		if reduceTask[i] == undo:
			atomic_write(doing)
			return reduceTask
	
	if done: return done
	return pending
}

func omitTask() {
	if mapTask[i]: -> done
	if reduceTask[i]: -> done
}