go语言实现日志收集系统图文详解
整理了一下这个日志收集系统的框架,如下图:
这次要实现的代码的整体逻辑为:
完整代码地址为: https://github.com/pythonsite/logagent
etcd介绍
高可用的分布式key-value存储,可以用于配置共享和服务发现
类似的项目:zookeeper和consul
开发语言:go
接口:提供restful的接口,使用简单
实现算法:基于raft算法的强一致性,高可用的服务存储目录
etcd的应用场景:
1、服务发现和服务注册
2、配置中心(我们实现的日志收集客户端需要用到)
3、分布式锁
4、master选举
官网对etcd有一个非常简明的介绍:
etcd搭建:
下载地址:https://github.com/coreos/etcd/releases/
根据自己的环境下载对应的版本然后启动起来就可以了
启动之后可以通过如下命令验证一下:
$ cd etcd-v3.2.18-linux-amd64
$ ./etcdctl set name zhaofan
zhaofan
$ ./etcdctl get name
zhaofan
context 介绍和使用
context翻译过来就是“上下文”管理。那么context的作用是什么呢?主要有如下两个:
1、控制goroutine的超时
2、保存上下文数据
通过下面一个简单的例子进行理解:
package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

// Result carries the outcome of an HTTP request from the worker goroutine
// back to process.
type Result struct {
	r   *http.Response
	err error
}

// process issues a GET to http://www.google.com and prints the response body,
// or prints "timeout!" if no response arrives within 2 seconds.
func process() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	tr := &http.Transport{}
	client := &http.Client{Transport: tr}

	// Buffered so the worker goroutine can always deliver its result and exit,
	// even after process has stopped waiting on the timeout branch.
	c := make(chan Result, 1)

	req, err := http.NewRequest("GET", "http://www.google.com", nil)
	if err != nil {
		fmt.Println("http request failed,err:", err)
		return
	}
	// Bind the request to ctx so the transport aborts it once the deadline
	// passes. This replaces the deprecated Transport.CancelRequest, which also
	// cannot cancel HTTP/2 requests.
	req = req.WithContext(ctx)

	// Send whatever client.Do returns (response or error) to the channel.
	go func() {
		resp, err := client.Do(req)
		c <- Result{resp, err}
	}()

	select {
	case <-ctx.Done():
		fmt.Println("timeout!")
	case res := <-c:
		// On error res.r is nil; touching res.r.Body would panic.
		if res.err != nil {
			fmt.Println("http request failed,err:", res.err)
			return
		}
		defer res.r.Body.Close()
		out, err := ioutil.ReadAll(res.r.Body)
		if err != nil {
			fmt.Println("read body failed,err:", err)
			return
		}
		fmt.Printf("server response:%s", out)
	}
}

func main() {
	process()
}
下面写一个通过context保存上下文数据的例子,代码如下:
package main

import (
	"context"
	"fmt"
)

// ctxKey is an unexported key type for values stored in a context. Using a
// private type instead of a plain string prevents collisions with keys set by
// other packages (go vet / staticcheck SA1029 flag built-in key types).
type ctxKey string

// traceIDKey is the context key under which the trace id is stored.
const traceIDKey ctxKey = "trace_id"

// newTraceContext returns a background context carrying traceID.
func newTraceContext(traceID string) context.Context {
	return context.WithValue(context.Background(), traceIDKey, traceID)
}

// traceIDFrom returns the trace id stored in ctx, or "" if none is set.
// The comma-ok assertion avoids a panic when the value is missing.
func traceIDFrom(ctx context.Context) string {
	traceID, _ := ctx.Value(traceIDKey).(string)
	return traceID
}

// add prints the trace id carried by ctx and returns a+b.
func add(ctx context.Context, a, b int) int {
	fmt.Printf("trace_id:%v\n", traceIDFrom(ctx))
	return a + b
}

// calc prints the trace id carried by ctx, then passes ctx on to add.
func calc(ctx context.Context, a, b int) int {
	fmt.Printf("trace_id:%v\n", traceIDFrom(ctx))
	return add(ctx, a, b)
}

func main() {
	// Pass a context carrying the trace id down into calc.
	ctx := newTraceContext("123456")
	calc(ctx, 20, 30)
}
结合etcd和context使用
下面是通过go连接etcd的简单例子。(这里有个小问题需要注意,就是etcd的启动方式:etcd默认只监听本机地址(localhost),从其他机器连接可能会连不上,尤其是在虚拟机里安装的情况,所以需要通过如下命令启动:
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
)
package mainimport ( etcd_client "github.com/coreos/etcd/clientv3" "time" "fmt")func main() { cli, err := etcd_client.New(etcd_client.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil{ fmt.Println("connect failed,err:",err) return } fmt.Println("connect success") defer cli.Close()}
下面一个例子是通过连接etcd,存值并取值
package mainimport ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context")func main() { cli,err := clientv3.New(clientv3.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil{ fmt.Println("connect failed,err:",err) return } fmt.Println("connect succ") defer cli.Close() ctx,cancel := context.WithTimeout(context.Background(),time.Second) _,err = cli.Put(ctx,"logagent/conf/","sample_value") cancel() if err != nil{ fmt.Println("put failed,err",err) return } ctx, cancel = context.WithTimeout(context.Background(),time.Second) resp,err := cli.Get(ctx,"logagent/conf/") cancel() if err != nil{ fmt.Println("get failed,err:",err) return } for _,ev := range resp.Kvs{ fmt.Printf("%s:%s\n",ev.Key,ev.Value) }}
context的官方文档中还有一个非常有用的例子,演示如何控制已启动的goroutine退出,代码如下:
package main

import (
	"context"
	"fmt"
)

// generate sends 1, 2, 3, ... on the returned channel from its own goroutine
// until ctx is canceled. Callers must cancel ctx once they stop reading;
// otherwise the goroutine stays blocked on its send forever (a leak).
func generate(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		for i := 1; ; i++ {
			select {
			case <-ctx.Done():
				return // exit so the goroutine does not leak
			case out <- i:
			}
		}
	}()
	return out
}

// main prints the first five generated integers, then stops the generator
// by canceling its context on return.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops generate's goroutine once we are done consuming

	for v := range generate(ctx) {
		fmt.Println(v)
		if v == 5 {
			break
		}
	}
}
官方文档中演示WithDeadline用法的代码例子:
package main

import (
	"context"
	"fmt"
	"time"
)

// main shows a context that expires after 50ms. It prints the context's error
// once the deadline passes, unless the one-second timer fires first
// (printing "overslept").
func main() {
	deadline := time.Now().Add(50 * time.Millisecond)
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	// Even though ctx will expire on its own, call cancel anyway: skipping it
	// can keep the context and its parent alive longer than necessary.
	defer cancel()

	oversleep := time.NewTimer(1 * time.Second)
	defer oversleep.Stop()

	select {
	case <-ctx.Done():
		fmt.Println(ctx.Err())
	case <-oversleep.C:
		fmt.Println("overslept")
	}
}
通过上面的代码,我们对context有了基本的了解。那么如果通过etcd来做配置管理,配置更改之后,如何通知对应的服务器配置已经更改?下面的例子演示了如何通过Watch监听配置的变化:
package mainimport ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context")func main() { cli,err := clientv3.New(clientv3.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil { fmt.Println("connect failed,err:",err) return } defer cli.Close() // 这里会阻塞 rch := cli.Watch(context.Background(),"logagent/conf/") for wresp := range rch{ for _,ev := range wresp.Events{fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } }}
下面是一个kafka消费者的简单实现例子:
package mainimport ( "github.com/Shopify/sarama" "strings" "fmt" "time")func main() { consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil) if err != nil{ fmt.Println("failed to start consumer:",err) return } partitionList,err := consumer.Partitions("nginx_log") if err != nil { fmt.Println("Failed to get the list of partitions:",err) return } fmt.Println(partitionList) for partition := range partitionList{ pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest) if err != nil {fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)return } defer pc.AsyncClose() go func(partitionConsumer sarama.PartitionConsumer){for msg := range pc.Messages(){ fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))} }(pc) } time.Sleep(time.Hour) consumer.Close()}
但是上面的代码并不是最佳写法,因为我们最后是通过time.Sleep来等待goroutine执行的。我们可以改为通过sync.WaitGroup的方式实现:
package mainimport ( "github.com/Shopify/sarama" "strings" "fmt" "sync")var ( wg sync.WaitGroup)func main() { consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil) if err != nil{ fmt.Println("failed to start consumer:",err) return } partitionList,err := consumer.Partitions("nginx_log") if err != nil { fmt.Println("Failed to get the list of partitions:",err) return } fmt.Println(partitionList) for partition := range partitionList{ pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest) if err != nil {fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)return } defer pc.AsyncClose() go func(partitionConsumer sarama.PartitionConsumer){wg.Add(1)for msg := range partitionConsumer.Messages(){ fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))}wg.Done() }(pc) } //time.Sleep(time.Hour) wg.Wait() consumer.Close()}
将客户端需要收集的日志信息放到etcd中
关于etcd处理的代码为:
package mainimport ( "github.com/coreos/etcd/clientv3" "time" "github.com/astaxie/beego/logs" "context" "fmt")var Client *clientv3.Clientvar logConfChan chan string// 初始化etcdfunc initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){ var keys []string for _,ip := range ipArrays{ //keyfmt = /logagent/%s/log_config keys = append(keys,fmt.Sprintf(keyfmt,ip)) } logConfChan = make(chan string,10) logs.Debug("etcd watch key:%v timeout:%v", keys, timeout) Client,err = clientv3.New(clientv3.Config{ Endpoints:addr, DialTimeout: timeout, }) if err != nil{ logs.Error("connect failed,err:%v",err) return } logs.Debug("init etcd success") waitGroup.Add(1) for _, key := range keys{ ctx,cancel := context.WithTimeout(context.Background(),2*time.Second) // 从etcd中获取要收集日志的信息 resp,err := Client.Get(ctx,key) cancel() if err != nil {logs.Warn("get key %s failed,err:%v",key,err)continue } for _, ev := range resp.Kvs{logs.Debug("%q : %q\n", ev.Key, ev.Value)logConfChan <- string(ev.Value) } } go WatchEtcd(keys) return}func WatchEtcd(keys []string){ // 这里用于检测当需要收集的日志信息更改时及时更新 var watchChans []clientv3.WatchChan for _,key := range keys{ rch := Client.Watch(context.Background(),key) watchChans = append(watchChans,rch) } for { for _,watchC := range watchChans{select{case wresp := <-watchC: for _,ev:= range wresp.Events{ logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) logConfChan <- string(ev.Kv.Value) }default:} } time.Sleep(time.Second) } waitGroup.Done()}func GetLogConf()chan string{ return logConfChan}
同样地,这里增加了对限速的处理。毕竟日志收集程序不能影响当前业务的性能,所以增加了limit.go用于限制速度:
package mainimport ( "time" "sync/atomic" "github.com/astaxie/beego/logs")type SecondLimit struct { unixSecond int64 curCount int32 limit int32}func NewSecondLimit(limit int32) *SecondLimit { secLimit := &SecondLimit{ unixSecond:time.Now().Unix(), curCount:0, limit:limit, } return secLimit}func (s *SecondLimit) Add(count int) { sec := time.Now().Unix() if sec == s.unixSecond { atomic.AddInt32(&s.curCount,int32(count)) return } atomic.StoreInt64(&s.unixSecond,sec) atomic.StoreInt32(&s.curCount, int32(count))}func (s *SecondLimit) Wait()bool { for { sec := time.Now().Unix() if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {time.Sleep(time.Microsecond)logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)continue } if sec != atomic.LoadInt64(&s.unixSecond) {atomic.StoreInt64(&s.unixSecond,sec)atomic.StoreInt32(&s.curCount,0) } logs.Debug("limit is exited") return false }}
推荐:go语言教程
以上就是go语言实现日志收集系统图文详解的详细内容,更多请关注其它相关文章!
相关文章