Golang Stream Api
2021-05-15 23:29
标签:false convert ever eve ide lap one ted splay
Golang Stream Api
标签:false convert ever eve ide lap one ted splay
原文地址:https://www.cnblogs.com/leerocks/p/14649473.html
package stream
import (
"log"
"reflect"
"sort"
)
// Stream is a channel of arbitrary values; consumers obtain elements by
// receiving from it until it is closed.
type Stream chan interface{}

// BufferStream pairs an embedded Stream with a Size field.
// NOTE(review): nothing in the visible code reads Size; presumably it is the
// intended buffer capacity — confirm against callers.
type BufferStream struct {
	Stream
	Size int
}

// FilterFunc is a predicate over one element; Retain keeps the elements for
// which it returns true.
type FilterFunc func(i interface{}) bool

// LessFunc reports whether i should be ordered before j; Sort uses it as the
// comparator.
type LessFunc func(i, j interface{}) bool

// KeyFunc derives a value from an element: the grouping key for GroupBy, the
// join key for Join, or the number that Sum adds up.
type KeyFunc func(i interface{}) interface{}
// NewStream returns a new, empty Stream backed by an unbuffered channel.
func NewStream() Stream {
	return Stream(make(chan interface{}))
}
// init a Stream from a source
func (s Stream) From(source interface{}) Stream {
v := reflect.ValueOf(source)
switch k := v.Kind(); k {
case reflect.Slice:
go func() {
defer close(s)
for i := 0; i {
s v.Index(i).Interface()
}
}()
default:
panic("got a non-slice kind source")
}
return s
}
func (s Stream) Retain(f FilterFunc) Stream {
c := make(chan interface{})
go func() {
defer close(c)
for i := range s {
if f(i) {
c i
}
}
}()
return c
}
// Sort drains s completely, orders the collected elements with lessFunc via
// sort.Slice, and returns a new stream that replays them in that order.
// The call blocks until s is closed.
func (s Stream) Sort(lessFunc LessFunc) Stream {
	var items []interface{}
	for item := range s {
		items = append(items, item)
	}
	byLess := func(a, b int) bool {
		return lessFunc(items[a], items[b])
	}
	sort.Slice(items, byLess)
	return NewStream().From(items)
}
// Reverse drains s completely and returns a new stream that replays the
// collected elements in the opposite order. The call blocks until s is
// closed.
func (s Stream) Reverse() Stream {
	var buf []interface{}
	for v := range s {
		buf = append(buf, v)
	}
	// Swap from both ends towards the middle.
	for lo, hi := 0, len(buf)-1; lo < hi; lo, hi = lo+1, hi-1 {
		buf[lo], buf[hi] = buf[hi], buf[lo]
	}
	return NewStream().From(buf)
}
// Print is a sink: it receives from s until the stream is closed and writes
// each element with log.Println.
func (s Stream) Print() {
	for {
		item, open := <-s
		if !open {
			return
		}
		log.Println(item)
	}
}
// Size is a sink: it consumes s until the stream is closed and returns the
// number of elements received.
func (s Stream) Size() int {
	n := 0
	for {
		if _, open := <-s; !open {
			return n
		}
		n++
	}
}
// sink
func (s Stream) First(n int) Stream {
c := make(chan interface{}, n)
count := 0
go func() {
defer close(c)
for i := range s {
if count n {
c i
count += 1
}
}
}()
return c
}
// FirstOne is a sink: it returns the first element of s and true, or
// (nil, false) when s yields nothing. It reads the result of s.First(1)
// until that stream is closed before returning.
func (s Stream) FirstOne() (interface{}, bool) {
	var (
		head  interface{}
		found bool
	)
	for v := range s.First(1) {
		if !found {
			head, found = v, true
		}
	}
	return head, found
}
// Sum is a sink: it consumes s until it is closed and returns the total of
// f(element) over all elements as a float64.
//
// Values returned by f contribute when their kind is a signed integer, an
// unsigned integer, or a float; any other kind (including a nil result) is
// skipped without error. Integers are converted to float64 before adding.
func (s Stream) Sum(f KeyFunc) float64 {
	var total float64
	for item := range s {
		v := reflect.ValueOf(f(item))
		switch v.Kind() {
		case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
			total += float64(v.Int())
		case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
			// Unsigned values used to fall through and were silently dropped.
			total += float64(v.Uint())
		case reflect.Float32, reflect.Float64:
			total += v.Float()
		}
	}
	return total
}
package stream
// GroupedRecord is one group: a key together with every element collected
// under that key.
type GroupedRecord struct {
	Key   interface{}
	Value []interface{}
}

// AggregatedRecord is one group after aggregation: a key together with the
// single value computed for it.
type AggregatedRecord struct {
	Key   interface{}
	Value interface{}
}

// AggFunc reduces the elements of one group to a single value; Agg calls it
// with each GroupedRecord's Value.
type AggFunc func(elements []interface{}) interface{}

// GroupedStream is a channel of GroupedRecord values.
type GroupedStream chan GroupedRecord

// AggregatedGroupedStream is a channel of AggregatedRecord values.
type AggregatedGroupedStream chan AggregatedRecord
func (s Stream) GroupBy(f KeyFunc) GroupedStream {
c := make(chan GroupedRecord)
cache := make(map[interface{}][]interface{})
for i := range s {
key := f(i)
cache[key] = append(cache[key], i)
}
go func() {
defer close(c)
for k, v := range cache {
c GroupedRecord{
Key: k,
Value: v,
}
}
}()
return c
}
func (gs GroupedStream) Agg(aggFunc AggFunc) AggregatedGroupedStream {
c := make(chan AggregatedRecord)
go func() {
defer close(c)
for gr := range gs {
c AggregatedRecord{
Key: gr.Key,
Value: aggFunc(gr.Value),
}
}
}()
return c
}
// Gather is a sink: it consumes ags until it is closed and returns a map
// from each record's Key to its Value. When a key occurs more than once the
// value received last wins.
func (ags AggregatedGroupedStream) Gather() map[interface{}]interface{} {
	result := map[interface{}]interface{}{}
	for {
		rec, open := <-ags
		if !open {
			return result
		}
		result[rec.Key] = rec.Value
	}
}
package stream
import "log"
// JoinedValue is one matched pair produced by a join: an element from the
// left stream and an element from the right stream.
type JoinedValue struct {
	Left  interface{}
	Right interface{}
}

// JoinedRecord holds every matched pair that shares one join key.
type JoinedRecord struct {
	Key   interface{}
	Value []JoinedValue
}

// JoinedStream is a channel of JoinedRecord values.
type JoinedStream chan JoinedRecord

// JoinedFilterFunc is a predicate over a matched pair; Filter keeps the
// pairs for which it returns true.
type JoinedFilterFunc func(left, right interface{}) bool

// JoinAggFunc combines a matched pair into a single value; Fold applies it
// to every pair of a record.
type JoinAggFunc func(left, right interface{}) interface{}
func Join(left, right Stream, leftBy, rightBy KeyFunc) JoinedStream {
c := make(chan JoinedRecord)
cache := make(map[interface{}][]JoinedValue)
leftCache, rightCache := make([]interface{}, 0), make([]interface{}, 0)
for i := range left {
leftCache = append(leftCache, i)
}
for j := range right {
rightCache = append(rightCache, j)
}
for _, i := range leftCache {
for _, j := range rightCache {
keyLeft, keyRight := leftBy(i), rightBy(j)
if keyLeft == keyRight {
cache[keyLeft] = append(cache[keyLeft], JoinedValue{
Left: i,
Right: j,
})
}
}
}
go func() {
defer close(c)
for k, v := range cache {
c JoinedRecord{
Key: k,
Value: v,
}
}
}()
return c
}
func (js JoinedStream) Filter(f JoinedFilterFunc) JoinedStream {
c := make(chan JoinedRecord)
go func() {
defer close(c)
for jr := range js {
cache := make([]JoinedValue, 0)
for _, jv := range jr.Value {
if f(jv.Left, jv.Right) {
cache = append(cache, jv)
}
}
if len(cache) != 0 {
c JoinedRecord{
Key: jr.Key,
Value: cache,
}
}
}
}()
return c
}
// Print is a sink: it receives from js until the stream is closed and
// writes each record with log.Println.
func (js JoinedStream) Print() {
	for {
		rec, open := <-js
		if !open {
			return
		}
		log.Println(rec)
	}
}
// convert a stream of (key,[]JoinValue) to a stream of (key, []interface)
// a.k.a a JoinedStream -> a GroupedStream
func (js JoinedStream) Fold(f JoinAggFunc) GroupedStream {
c := make(chan GroupedRecord)
go func() {
defer close(c)
for jr := range js {
cache := make([]interface{}, 0)
for _, jv := range jr.Value {
cache = append(cache, f(jv.Left, jv.Right))
}
c GroupedRecord{
Key: jr.Key,
Value: cache,
}
}
}()
return c
}
上一篇:c# 键值对照表
下一篇:Restful API