Golang之流式编程( 二 )
FilterFilter函数提供过滤item的功能 , FilterFunc定义过滤逻辑true保留item , false则不保留:
// 例子 保留偶数s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}fx.From(func(source chan<- interface{}) {for _, v := range s {source <- v}}).Filter(func(item interface{}) bool {if item.(int)%2 == 0 {return true}return false})// 源码func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream { return p.Walk(func(item interface{}, pipe chan<- interface{}) {// 执行过滤函数true保留 , false丢弃if fn(item) {pipe <- item} }, opts...)}
GroupGroup对流数据进行分组 , 需定义分组的key , 数据分组后以slice存入channel:
// 例子 按照首字符"g"或者"p"分组 , 没有则分到另一组ss := []string{"golang", "google", "php", "python", "java", "c++"}fx.From(func(source chan<- interface{}) {for _, s := range ss {source <- s} }).Group(func(item interface{}) interface{} {if strings.HasPrefix(item.(string), "g") {return "g"} else if strings.HasPrefix(item.(string), "p") {return "p"}return "" }).ForEach(func(item interface{}) {fmt.Println(item) })}// 源码func (p Stream) Group(fn KeyFunc) Stream {// 定义分组存储map groups := make(map[interface{}][]interface{}) for item := range p.source {// 用户自定义分组keykey := fn(item)// key相同分到一组groups[key] = append(groups[key], item) } source := make(chan interface{}) go func() {for _, group := range groups {// 相同key的一组数据写入到channelsource <- group}close(source) }() return Range(source)}
Reversereverse可以对流中元素进行反转处理:
文章插图
// 例子fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {fmt.Println(item)})// 源码func (p Stream) Reverse() Stream { var items []interface{}// 获取流中数据 for item := range p.source {items = append(items, item) } // 反转算法 for i := len(items)/2 - 1; i >= 0; i-- {opp := len(items) - 1 - iitems[i], items[opp] = items[opp], items[i] }// 写入流 return Just(items...)}
Distinctdistinct对流中元素进行去重 , 去重在业务开发中比较常用 , 经常需要对用户id等做去重操作:
// 例子fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {return item}).ForEach(func(item interface{}) {fmt.Println(item)})// 结果为 1 , 2 , 3 , 4 , 5 , 6// 源码func (p Stream) Distinct(fn KeyFunc) Stream { source := make(chan interface{}) threading.GoSafe(func() {defer close(source)// 通过key进行去重 , 相同key只保留一个keys := make(map[interface{}]lang.PlaceholderType)for item := range p.source {key := fn(item)// key存在则不保留if _, ok := keys[key]; !ok {source <- itemkeys[key] = lang.Placeholder}} }) return Range(source)}
WalkWalk函数并发的作用在流中每一个item上 , 可以通过WithWorkers设置并发数 , 默认并发数为16 , 最小并发数为1 , 如设置unlimitedWorkers为true则并发数无限制 , 但并发写入流中的数据由defaultWorkers限制 , WalkFunc中用户可以自定义后续写入流中的元素 , 可以不写入也可以写入多个元素:
// 例子fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {newItem := strings.ToUpper(item.(string))pipe
推荐阅读
- 机器人|万州区举办“中国梦科技梦”机器人编程大赛
- 英特尔推出可检测代码错误的ControlFlag机器编程工具
- 可编程3D打印耗材可帮助普通3D打印机轻松实现多材料物品的制作
- 菜鸟学编程,不懂C++ this指针?还不赶快来学一学
- 大学生如何提升Java编程能力
- 对于一个编程小白来说,该学习Java还是Python
- 盘点:2020年5种流行的 AI 编程语言,就业高薪不是梦
- 为何学习编程往往都是从编写输出HelloWorld的程序开始
- 我测|一款重量仅87克的无人机,如何链接大疆教育7年编程梦
- Golang 切片综合指南