基建KIT库

Related tags

Utilities GKit
Overview

GKIT

_____/\\\\\\\\\\\\__/\\\________/\\\__/\\\\\\\\\\\__/\\\\\\\\\\\\\\\_        
 ___/\\\//////////__\/\\\_____/\\\//__\/////\\\///__\///////\\\/////__       
  __/\\\_____________\/\\\__/\\\//_________\/\\\___________\/\\\_______      
   _\/\\\____/\\\\\\\_\/\\\\\\//\\\_________\/\\\___________\/\\\_______     
    _\/\\\___\/////\\\_\/\\\//_\//\\\________\/\\\___________\/\\\_______    
     _\/\\\_______\/\\\_\/\\\____\//\\\_______\/\\\___________\/\\\_______   
      _\/\\\_______\/\\\_\/\\\_____\//\\\______\/\\\___________\/\\\_______  
       _\//\\\\\\\\\\\\/__\/\\\______\//\\\__/\\\\\\\\\\\_______\/\\\_______ 
        __\////////////____\///________\///__\///////////________\///________                                 

目录结构

├── cache (构建缓存相关组件)
├── container (容器化组件,提供group、pool、queue)
├── downgrade (熔断降级相关组件)
├── egroup (errgroup,控制组件生命周期)
├── errors
├── generator (发号器,snowflake)
├── goroutine (提供goroutine池,控制goroutine数量激增)
├── internal (core)
├── log (接口化日志,使用日志组件接入)
├── middleware (中间件接口模型定义)
├── overload (服务器自适应保护,提供bbr接口,监控部署服务器状态选择流量放行,保护服务器可用性)
├── restrictor (限流,提供令牌桶和漏桶接口封装)
├── timeout (超时控制,全链路保护)
└── window (滑动窗口,支持多数据类型指标窗口收集)

组件使用介绍

cache

缓存相关组件

singleflight

归并回源

// getResources: 一般用于去数据库去获取数据
func getResources() (interface{}, error) {
	return "test", nil
}

// cache: 填充到 缓存中的数据
func cache(v interface{}) {
	return
}

// ExampleNewSingleFlight:
func ExampleNewSingleFlight() {
	singleFlight := NewSingleFlight()

	// 如果在key相同的情况下, 同一时间只有一个 func 可以去执行,其他的等待
	// 多用于缓存失效后,构造缓存,缓解服务器压力

	// 同步:
	v, err, _ := singleFlight.Do("test1", func() (interface{}, error) {
		// todo 这里去获取资源
		return getResources()
	})
	if err != nil {
		// todo 处理错误
	}
	// v 就是获取到的资源
	cache(v)

	// 异步:
	ch := singleFlight.DoChan("test2", func() (interface{}, error) {
		// todo 这里去获取资源
		return getResources()
	})

	result := <-ch
	if result.Err != nil {
		// todo 处理错误
	}
	cache(result.Val)

	// 尽力取消
	singleFlight.Forget("test2")
}

container

容器化组件

group

懒加载容器

func createResources() interface{} {
	return map[int]int{}
}

func createResources2() interface{} {
	return []int{}
}

var group LazyLoadGroup

func ExampleNewGroup() {
	// 类似 sync.Pool 一样
	// 初始化一个group
	group = NewGroup(createResources)
}

func ExampleGroup_Get() {
	// 如果key 不存在 调用 NewGroup 传入的 function 创建资源
	// 如果存在则返回创建的资源信息
	v := group.Get("test")
	_ = v
}

func ExampleGroup_ReSet() {
	// ReSet 重置初始化函数,同时会对缓存的 key进行清空
	group.ReSet(createResources2)
}

func ExampleGroup_Clear() {
	// 清空缓存的 buffer
	group.Clear()
}

pool

类似资源池

var pool Pool

type mock map[string]string

func (m *mock) Shutdown() error {
	return nil
}

// getResources: 获取资源,返回的资源对象需要实现 IShutdown 接口,用于资源回收
func getResources(c context.Context) (IShutdown, error) {
	return &mock{}, nil
}

func ExampleNewList() {
	// NewList(options ...)
	// 默认配置
	//pool = NewList()

	// 可供选择配置选项

	// 设置 Pool 连接数, 如果 == 0 则无限制
	//SetActive(100)

	// 设置最大空闲连接数
	//SetIdle(20)

	// 设置空闲等待时间
	//SetIdleTimeout(time.Second)

	// 设置期望等待
	//SetWait(false,time.Second)

	// 自定义配置
	pool = NewList(
		SetActive(100),
		SetIdle(20),
		SetIdleTimeout(time.Second),
		SetWait(false, time.Second))

	// New需要实例化,否则在 pool.Get() 会无法获取到资源
	pool.New(getResources)
}

func ExampleList_Get() {
	v, err := pool.Get(context.TODO())
	if err != nil {
		// 处理错误
	}
	// v 获取到的资源
	_ = v
}

func ExampleList_Put() {
	v, err := pool.Get(context.TODO())
	if err != nil {
		// 处理错误
	}

	// Put: 资源回收
	// forceClose: true 内部帮你调用 Shutdown回收, 否则判断是否是可回收,挂载到list上
	err = pool.Put(context.TODO(), v, false)
	if err != nil {
	  // 处理错误
	}
}


func ExampleList_Shutdown() {

	// Shutdown 回收资源,关闭所有资源
	_ = pool.Shutdown()
}

queue/CoDel

对列管理算法,根据实际的消费情况,算出该请求是否需要等待还是快速失败.

var queue *Queue

func ExampleNew() {
	// 默认配置
	//queue = New()

	// 可供选择配置选项

	// 设置对列延时
	//SetTarget(40)

	// 设置滑动窗口最小时间宽度
	//SetInternal(1000)

	queue = New(SetTarget(40), SetInternal(1000))
}

func ExampleQueue_Stat() {
	// start 体现 CoDel 状态信息
	start := queue.Stat()

	_ = start
}

func ExampleQueue_Push() {
	// 入队
	if err := queue.Push(context.TODO()); err != nil {
		if err == bbr.LimitExceed {
			// todo 处理过载保护错误
		} else {
			// todo 处理其他错误
		}
	}
}

func ExampleQueue_Pop() {
	// 出队,没有请求则会阻塞
	queue.Pop()
}

downgrade

熔断降级

// 与 github.com/afex/hystrix-go/hystrix 使用方法一致,只是做了抽象封装,避免因为升级对服务造成影响

var fuse Fuse

// type runFunc = func() error
// type fallbackFunc = func(error) error
// type runFuncC = func(context.Context) error
// type fallbackFuncC = func(context.Context, error) error

func mockRunFunc() runFunc {
	return func() error {
		return nil
	}
}

func mockFallbackFunc() fallbackFunc {
	return func(err error) error {
		return nil
	}
}

func mockRunFuncC() runFuncC {
	return func(ctx context.Context) error {
		return nil
	}
}

func mockFallbackFuncC() fallbackFuncC {
	return func(ctx context.Context, err error) error {
		return nil
	}
}

func ExampleNewFuse() {
	// 拿到一个熔断器
	fuse = NewFuse()
}

func ExampleHystrix_ConfigureCommand() {
	// 不设置 ConfigureCommand 走默认配置
	// hystrix.CommandConfig{} 设置参数
	fuse.ConfigureCommand("test", hystrix.CommandConfig{})
}

func ExampleHystrix_Do() {
	// Do: 同步执行 func() error, 没有超时控制 直到等到返回,
	// 如果返回 error != nil 则触发 fallbackFunc 进行降级
	err := fuse.Do("do", mockRunFunc(), mockFallbackFunc())
	if err != nil {
		// 处理 error
	}
}

func ExampleHystrix_Go() {
	// Go: 异步执行 返回 channel
	ch := fuse.Go("go", mockRunFunc(), mockFallbackFunc())
	if err := <-ch; err != nil {
		// 处理 error
	}
}

func ExampleHystrix_GoC() {
	// GoC: Do/Go 实际上最终调用的就是GoC, Do主处理了异步过程
	// GoC可以传入 context 保证链路超时控制
	fuse.GoC(context.TODO(), "goc", mockRunFuncC(), mockFallbackFuncC())
}

egroup

组件生命周期管理

// errorGroup 
// 级联控制,如果有组件发生错误,会通知group所有组件退出
// 声明声明周期管理
var admin *LifeAdmin

func mockStart() func(ctx context.Context) error {
	return nil
}

func mockShutdown() func(ctx context.Context) error {
	return nil
}

type mockLifeAdminer struct{}

func (m *mockLifeAdminer) Start(ctx context.Context) error {
	return nil
}

func (m *mockLifeAdminer) Shutdown(ctx context.Context) error {
	return nil
}

func ExampleNewLifeAdmin() {
	// 默认配置
	//admin = NewLifeAdmin()

	// 可供选择配置选项

	// 设置启动超时时间
	// <=0 不启动超时时间,注意要在shutdown处理关闭通知
	//SetStartTimeout(time.Second)

	//  设置关闭超时时间
	//	<=0 不启动超时时间
	//SetStopTimeout(time.Second)

	// 设置信号集合,和处理信号的函数
	//SetSignal(func(lifeAdmin *LifeAdmin, signal os.Signal) {
	//	return
	//}, signal...)

	admin = NewLifeAdmin(SetStartTimeout(time.Second), SetStopTimeout(time.Second), SetSignal(func(a *LifeAdmin, signal os.Signal) {
		switch signal {
		case syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT:
			a.shutdown()
		default:
		}
	}))
}

func ExampleLifeAdmin_Add() {
	// 通过struct添加
	admin.Add(Member{
		Start:    mockStart(),
		Shutdown: mockShutdown(),
	})
}

func ExampleLifeAdmin_AddMember() {
	// 根据接口适配添加
	admin.AddMember(&mockLifeAdminer{})
}

func ExampleLifeAdmin_Start() {
	defer admin.Shutdown()
	if err := admin.Start(); err != nil {
		// 处理错误
		// 正常启动会hold主
	}
}

// 完整demo
var _admin = egroup.NewLifeAdmin()

srv := &http.Server{
		Addr: ":8080",
}
// 增加任务
_admin.Add(egroup.Member{
    Start: func(ctx context.Context) error {
        t.Log("http start")
        return goroutine.Delegate(ctx, -1, func(ctx context.Context) error {
            return srv.ListenAndServe()
        })
    },
    Shutdown: func(ctx context.Context) error {
        t.Log("http shutdown")
        return srv.Shutdown(context.Background())
    },
})
// _admin.Start() 启动
fmt.Println("error", _admin.Start())
defer _admin.shutdown()

errors

封装一些error处理

generator

发号器

snowflake

雪花算法

func ExampleNewSnowflake() {
	// 生成对象
	ids := NewSnowflake(time.Now(), 1)
	nid, err := ids.NextID()
	if err != nil {
		// 处理错误   	    
	}
	_ = nid
}

goroutine

池化,控制野生goroutine

var gGroup GGroup

func mockFunc() func() {
	return func() {

	}
}

func ExampleNewGoroutine() {
	// 默认配置
	//gGroup = NewGoroutine(context.TODO())

	// 可供选择配置选项

	// 设置停止超时时间
	//SetStopTimeout(time.Second)

	// 设置日志对象
	//SetLogger(&testLogger{})

	// 设置pool最大容量
	//SetMax(100)

	gGroup = NewGoroutine(context.TODO(),
		SetStopTimeout(time.Second),
		SetLogger(&testLogger{}),
		SetMax(100),
	)
}

func ExampleGoroutine_AddTask() {
	if !gGroup.AddTask(mockFunc()) {
		// 添加任务失败
	}
}

func ExampleGoroutine_AddTaskN() {
	// 带有超时控制添加任务
	if !gGroup.AddTaskN(context.TODO(), mockFunc()) {
		// 添加任务失败
	}
}

func ExampleGoroutine_ChangeMax() {
	// 修改 pool最大容量
	gGroup.ChangeMax(1000)
}

func ExampleGoroutine_Shutdown() {
	// 回收资源
	_ = gGroup.Shutdown()
}

log

日志相关

type testLogger struct {
	*testing.T
}

func (t *testLogger) Print(kv ...interface{}) {
	t.Log(kv...)
}

log := NewHelper(&testLogger{t}, LevelDebug)
log.Debug("debug", "v")
log.Debugf("%s,%s", "debugf", "v")
log.Info("Info", "v")
log.Infof("%s,%s", "infof", "v")
log.Warn("Warn", "v")
log.Warnf("%s,%s", "warnf", "v")
log.Error("Error", "v")
log.Errorf("%s,%s", "errorf", "v")

middleware

中间件接口模型定义

overload

过载保护

普通使用

// 先建立Group
group := bbr.NewGroup()
// 如果没有就会创建
limiter := group.Get("key")
f, err := limiter.Allow(ctx)
if err != nil {
// 代表已经过载了,服务不允许接入
return
}
// Op:流量实际的操作类型回写记录指标
f(overload.DoneInfo{Op: overload.Success})

中间件套用

func ExampleNewGroup() {
	group := NewGroup()
	// 如果没有就会创建
	limiter := group.Get("key")
	f, err := limiter.Allow(context.TODO())
	if err != nil {
		// 代表已经过载了,服务不允许接入
		return
	}
	// Op:流量实际的操作类型回写记录指标
	f(overload.DoneInfo{Op: overload.Success})
}

func ExampleNewLimiter() {
	// 建立Group 中间件
	middle := NewLimiter()

	// 在middleware中 
	// ctx中携带这两个可配置的有效数据
	// 可以通过 ctx.Set

	// 配置获取限制器类型,可以根据不同api获取不同的限制器
	ctx := context.WithValue(context.TODO(), LimitKey, "key")

	// 可配置成功是否上报
	// 必须是 overload.Op 类型
	ctx = context.WithValue(ctx, LimitOp, overload.Success)

	_ = middle
}

restrictor

限流器

rate

漏桶

func ExampleNewRate() {
	// 第一个参数是 r Limit。代表每秒可以向 Token 桶中产生多少 token。Limit 实际上是 float64 的别名
	// 第二个参数是 b int。b 代表 Token 桶的容量大小。
	// limit := Every(100 * time.Millisecond);
	// limiter := rate.NewLimiter(limit, 4)
	// 以上就表示每 100ms 往桶中放一个 Token。本质上也就是一秒钟产生 10 个。

	// rate: golang.org/x/time/rate
	limiter := rate.NewLimiter(2, 4)

	af, wf := NewRate(limiter)

	// af.Allow()bool: 默认取1个token
	// af.Allow() == af.AllowN(time.Now(), 1)
	af.Allow()

	// af.AllowN(ctx,n)bool: 可以取N个token
	af.AllowN(time.Now(), 5)

	// wf.Wait(ctx) err: 等待ctx超时,默认取1个token
	// wf.Wait(ctx) == wf.WaitN(ctx, 1) 
	_ = wf.Wait(context.TODO())

	// wf.WaitN(ctx, n) err: 等待ctx超时,可以取N个token
	_ = wf.WaitN(context.TODO(), 5)
}

ratelimite

令牌桶

func ExampleNewRateLimit() {
	// ratelimit:github.com/juju/ratelimit
	bucket := ratelimit.NewBucket(time.Second/2, 4)

	af, wf := NewRateLimit(bucket)
	// af.Allow()bool: 默认取1个token
	// af.Allow() == af.AllowN(time.Now(), 1)
	af.Allow()

	// af.AllowN(ctx,n)bool: 可以取N个token
	af.AllowN(time.Now(), 5)

	// wf.Wait(ctx) err: 等待ctx超时,默认取1个token
	// wf.Wait(ctx) == wf.WaitN(ctx, 1) 
	_ = wf.Wait(context.TODO())

	// wf.WaitN(ctx, n) err: 等待ctx超时,可以取N个token
	_ = wf.WaitN(context.TODO(), 5)
}

timeout

各个服务间的超时控制

func ExampleShrink() {
	// timeout.Shrink 方法提供全链路的超时控制
	// 只需要传入一个父节点的ctx 和需要设置的超时时间,他会帮你确认这个ctx是否之前设置过超时时间,
	// 如果设置过超时时间的话会和你当前设置的超时时间进行比较,选择一个最小的进行设置,保证链路超时时间不会被下游影响
	// d: 代表剩余的超时时间
	// nCtx: 新的context对象
	// cancel: 如果是成功真正设置了超时时间会返回一个cancel()方法,未设置成功会返回一个无效的cancel,不过别担心,还是可以正常调用的
	d, nCtx, cancel := Shrink(context.Background(), 5*time.Second)
	// d 根据需要判断 
	// 一般判断该服务的下游超时时间,如果d过于小,可以直接放弃
	select {
	case <-nCtx.Done():
		cancel()
	default:
		// ...
	}
	_ = d
}

window

提供指标窗口

func ExampleInitWindow() {
	// 初始化窗口
	w := InitWindow()

	// 增加指标
	// key:权重
	w.AddIndex("key", 1)

	// Show: 返回当前指标
	slice := w.Show()
	_ = slice
}
You might also like...
Comments
  • 项目引用了gopkg.in/yaml.v2等226个开源组件,存在4个漏洞,建议升级

    项目引用了gopkg.in/yaml.v2等226个开源组件,存在4个漏洞,建议升级

    大佬,看你这个项目调用了gopkg.in/yaml.v2等226个开源组件,存在4个安全漏洞,建议你升级下。

    漏洞标题:Google Kubernetes API Server 资源管理错误漏洞
    漏洞编号:CVE-2019-11254
    漏洞描述:
    Google Kubernetes是美国谷歌(Google)公司的一套开源的Docker容器集群管理系统。该系统为容器化的应用提供资源调度、部署运行、服务发现和扩容缩容等功能。API server是其中的一个API(应用编程接口)服务器。
    Google Kubernetes 1.15.10之前版本、1.16.7之前版本和1.17.3之前版本中的API Server组件存在资源管理错误漏洞。远程攻击者可借助特制请求利用该漏洞造成拒绝服务。
    国家漏洞库信息:https://www.cnvd.org.cn/flaw/show/CNVD-2020-35519
    影响范围:(∞, 2.2.8)
    最小修复版本:2.2.8
    引入路径:
    github.com/songzhibin97/[email protected]>github.com/go-redsync/redsync/[email protected]>github.com/go-redis/redis/[email protected]>gopkg.in/[email protected]
    

    另外3个漏洞 ,信息有点多我就不贴了,你自己看下完整报告:https://www.mfsec.cn/jr?p=a23179 你对这个issues有任何疑问可以回复我,我能看见哈。

    opened by ghost 1
  • v1.1.8 build err

    v1.1.8 build err

    To upgrade to the versions selected by go 1.16: go mod tidy -go=1.16 && go mod tidy -go=1.17 If reproducibility with go 1.16 is not needed: go mod tidy -compat=1.17 Use go mod tidy -compat=1.17

    opened by songzhibin97 0
Releases(v1.2.8)
Owner
null