map-reduce
map-reduce 是一种处理数据的方式,最早是由 Google 公司研究提出的一种面向大规模数据处理的并行计算模型和方法,开源的版本是 hadoop,前几年比较火。
不过,我要讲的并不是分布式的 map-reduce,而是单机单进程的 map-reduce 方法。map-reduce 分为两个步骤,第一步是映射(map),处理队列中的数据,第二步是规约(reduce),把列表中的每一个元素按照一定的处理方式处理成结果,放入到结果队列中。就像做汉堡一样,map 就是单独处理每一种食材,reduce 就是从每一份食材中取一部分,做成一个汉堡。我们先来看下 map 函数的处理逻辑:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} { out := make(chan interface{}) //创建一个输出chan if in == nil { // 异常检查 close(out) return out } go func() { // 启动一个goroutine,实现map的主要逻辑 defer close(out) for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作 out <- fn(v) } }() return out } |
reduce 函数的处理逻辑如下:
|
1 2 3 4 5 6 7 8 9 10 11 12 |
func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} { if in == nil { // 异常检查 return nil } out := <-in // 先读取第一个元素 for v := range in { // 实现reduce的主要逻辑 out = fn(out, v) } return out } |
我们可以写一个程序,这个程序使用 map-reduce 模式处理一组整数,map 函数就是为每个整数乘以 10,reduce 函数就是把 map 处理的结果累加起来:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
package main import ( "fmt" ) // 生成一个数据流 func asStream(done <-chan struct{}) <-chan interface{} { s := make(chan interface{}) values := []int{1, 2, 3, 4, 5} go func() { defer close(s) for _, v := range values { // 从数组生成 select { case <-done: return case s <- v: } } }() return s } func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} { if in == nil { // 异常检查 return nil } out := <-in // 先读取第一个元素 for v := range in { // 实现reduce的主要逻辑 out = fn(out, v) } return out } func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} { out := make(chan interface{}) //创建一个输出chan if in == nil { // 异常检查 close(out) return out } go func() { // 启动一个goroutine,实现map的主要逻辑 defer close(out) for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作 out <- fn(v) } }() return out } func main() { in := asStream(nil) // map操作: 乘以10 mapFn := func(v interface{}) interface{} { return v.(int) * 10 } // reduce操作: 对map的结果进行累加 reduceFn := func(r, v interface{}) interface{} { return r.(int) + v.(int) } sum := reduce(mapChan(in, mapFn), reduceFn) //返回累加结果 fmt.Println(sum) } |
© 著作权归作者所有
文章评论(0)