源码分析之logs
kubectl源码分析之logs
欢迎关注我的公众号:
目前刚开始写一个月,一共写了18篇原创文章,文章目录如下:
istio多集群探秘,部署了50次多集群后我得出的结论
istio多集群链路追踪,附实操视频
istio防故障利器,你知道几个,istio新手不要读,太难!
istio业务权限控制,原来可以这么玩
istio实现非侵入压缩,微服务之间如何实现压缩
不懂envoyfilter也敢说精通istio系列-http-rbac-不要只会用AuthorizationPolicy配置权限
不懂envoyfilter也敢说精通istio系列-02-http-corsFilter-不要只会vs
不懂envoyfilter也敢说精通istio系列-03-http-csrf filter-再也不用在代码里写csrf逻辑了
不懂envoyfilter也敢说精通istio系列http-jwt_authn-不要只会RequestAuthorization
不懂envoyfilter也敢说精通istio系列-05-fault-filter-故障注入不止是vs
不懂envoyfilter也敢说精通istio系列-06-http-match-配置路由不只是vs
不懂envoyfilter也敢说精通istio系列-07-负载均衡配置不止是dr
不懂envoyfilter也敢说精通istio系列-08-连接池和断路器
不懂envoyfilter也敢说精通istio系列-09-http-route filter
不懂envoyfilter也敢说精通istio系列-network filter-redis proxy
不懂envoyfilter也敢说精通istio系列-network filter-HttpConnectionManager
不懂envoyfilter也敢说精通istio系列-ratelimit-istio ratelimit完全手册
————————————————
// LogsOptions carries the flag values, positional arguments and runtime
// dependencies used by the logs command.
type LogsOptions struct {
	// Namespace is filled in by Complete from the kubeconfig loader.
	Namespace string
	// ResourceArg is the first positional argument, when one is given.
	ResourceArg string
	// AllContainers is bound to the --all-containers flag.
	AllContainers bool
	// Options is the value built by ToLogOptions; Validate expects it to be
	// a *corev1.PodLogOptions.
	Options runtime.Object
	// Resources holds the raw positional arguments.
	Resources []string

	// ConsumeRequestFn copies one log request to a writer; Complete sets it
	// to DefaultConsumeRequest.
	ConsumeRequestFn func(rest.ResponseWrapper, io.Writer) error

	// PodLogOptions
	SinceTime       string
	SinceSeconds    time.Duration
	Follow          bool
	Previous        bool
	Timestamps      bool
	IgnoreLogErrors bool
	LimitBytes      int64
	Tail            int64
	Container       string

	// whether or not a container name was given via --container
	ContainerNameSpecified bool
	// Selector is bound to the --selector / -l flag.
	Selector string
	// MaxFollowConcurrency is bound to --max-log-requests; NewLogsOptions
	// initialises it to 5.
	MaxFollowConcurrency int

	// Object is the resource whose logs are requested; Complete resolves it
	// when it is nil.
	Object runtime.Object
	// GetPodTimeout is read by Complete from the pod-running-timeout flag.
	GetPodTimeout    time.Duration
	RESTClientGetter genericclioptions.RESTClientGetter
	// LogsForObject produces the log requests consumed by RunLogs.
	LogsForObject polymorphichelpers.LogsForObjectFunc

	genericclioptions.IOStreams

	// TailSpecified records whether --tail was set explicitly.
	TailSpecified bool
}
func NewLogsOptions(streams genericclioptions.IOStreams, allContainers bool) *LogsOptions {return &LogsOptions{//初始化log结构体IOStreams: streams,AllContainers: allContainers,Tail: -1,MaxFollowConcurrency: 5,}
}
//创建log命令
func NewCmdLogs(f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {o := NewLogsOptions(streams, false)//初始化log结构体cmd := &cobra.Command{//创建cobra命令Use: logsUsageStr,DisableFlagsInUseLine: true,Short: i18n.T("Print the logs for a container in a pod"),Long: "Print the logs for a container in a pod or specified resource. If the pod has only one container, the container name is optional.",Example: logsExample,Run: func(cmd *cobra.Command, args []string) {cmdutil.CheckErr(o.Complete(f, cmd, args))//准备cmdutil.CheckErr(o.Validate())//校验cmdutil.CheckErr(o.RunLogs())//运行},}cmd.Flags().BoolVar(&o.AllContainers, "all-containers", o.AllContainers, "Get all containers' logs in the pod(s).")//all-container选项cmd.Flags().BoolVarP(&o.Follow, "follow", "f", o.Follow, "Specify if the logs should be streamed.")//follow选项cmd.Flags().BoolVar(&o.Timestamps, "timestamps", o.Timestamps, "Include timestamps on each line in the log output")//timestamps选项cmd.Flags().Int64Var(&o.LimitBytes, "limit-bytes", o.LimitBytes, "Maximum bytes of logs to return. Defaults to no limit.")//limit-bytes选项cmd.Flags().BoolVarP(&o.Previous, "previous", "p", o.Previous, "If true, print the logs for the previous instance of the container in a pod if it exists.")//previous选项cmd.Flags().Int64Var(&o.Tail, "tail", o.Tail, "Lines of recent log file to display. Defaults to -1 with no selector, showing all log lines otherwise 10, if a selector is provided.")//tail选项cmd.Flags().BoolVar(&o.IgnoreLogErrors, "ignore-errors", o.IgnoreLogErrors, "If watching / following pod logs, allow for any errors that occur to be non-fatal")//ignore-errors选项cmd.Flags().StringVar(&o.SinceTime, "since-time", o.SinceTime, i18n.T("Only return logs after a specific date (RFC3339). Defaults to all logs. Only one of since-time / since may be used."))//since-time选项cmd.Flags().DurationVar(&o.SinceSeconds, "since", o.SinceSeconds, "Only return logs newer than a relative duration like 5s, 2m, or 3h. Defaults to all logs. 
Only one of since-time / since may be used.")//since选项cmd.Flags().StringVarP(&o.Container, "container", "c", o.Container, "Print the logs of this container")//container选项cmdutil.AddPodRunningTimeoutFlag(cmd, defaultPodLogsTimeout)//pod-running-timeout选项cmd.Flags().StringVarP(&o.Selector, "selector", "l", o.Selector, "Selector (label query) to filter on.")//selector选项cmd.Flags().IntVar(&o.MaxFollowConcurrency, "max-log-requests", o.MaxFollowConcurrency, "Specify maximum number of concurrent logs to follow when using by a selector. Defaults to 5.")//max-log-requests选项return cmd
}
// Complete fills in the fields of o that depend on the parsed command line
// and on the factory.
//
// It records whether --container and --tail were set explicitly, interprets
// the positional arguments (none requires a selector; one is the resource and
// excludes a selector; two are resource and container; more is a usage
// error), resolves the namespace and pod-running timeout, builds the log
// options via ToLogOptions, and, when o.Object is not already set, looks up
// the target object through the resource builder.
//
// It returns a usage error for invalid argument combinations and otherwise
// the first error reported by any of the steps above.
func (o *LogsOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
	o.ContainerNameSpecified = cmd.Flag("container").Changed
	o.TailSpecified = cmd.Flag("tail").Changed
	o.Resources = args

	switch len(args) {
	case 0:
		// Without a positional argument a selector is mandatory.
		if len(o.Selector) == 0 {
			return cmdutil.UsageErrorf(cmd, "%s", logsUsageErrStr)
		}
	case 1:
		o.ResourceArg = args[0]
		// A resource name and a selector are mutually exclusive.
		if len(o.Selector) != 0 {
			return cmdutil.UsageErrorf(cmd, "only a selector (-l) or a POD name is allowed")
		}
	case 2:
		o.ResourceArg = args[0]
		o.Container = args[1]
	default:
		return cmdutil.UsageErrorf(cmd, "%s", logsUsageErrStr)
	}

	var err error
	o.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace()
	if err != nil {
		return err
	}

	o.ConsumeRequestFn = DefaultConsumeRequest

	o.GetPodTimeout, err = cmdutil.GetPodRunningTimeoutFlag(cmd)
	if err != nil {
		return err
	}

	o.Options, err = o.ToLogOptions()
	if err != nil {
		return err
	}

	o.RESTClientGetter = f
	o.LogsForObject = polymorphichelpers.LogsForObjectFn

	if o.Object == nil {
		builder := f.NewBuilder().
			WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
			NamespaceParam(o.Namespace).DefaultNamespace().
			SingleResourceType()
		if o.ResourceArg != "" {
			builder.ResourceNames("pods", o.ResourceArg)
		}
		if o.Selector != "" {
			builder.ResourceTypes("pods").LabelSelectorParam(o.Selector)
		}
		infos, err := builder.Do().Infos()
		if err != nil {
			return err
		}
		// Guard the infos[0] access below: the exact-one check only applies
		// when no selector is given, so an empty result must be rejected
		// explicitly instead of panicking with an index out of range.
		if len(infos) == 0 || (o.Selector == "" && len(infos) != 1) {
			return errors.New("expected a resource")
		}
		o.Object = infos[0].Object
	}

	return nil
}
//构造log option
func (o *LogsOptions) ToLogOptions() (*corev1.PodLogOptions, error) {logOptions := &corev1.PodLogOptions{//设置参数Container: o.Container,Follow: o.Follow,Previous: o.Previous,Timestamps: o.Timestamps,}if len(o.SinceTime) > 0 {//设置sinceTimet, err := util.ParseRFC3339(o.SinceTime, metav1.Now)if err != nil {return nil, err}logOptions.SinceTime = &t}if o.LimitBytes != 0 {//设置limitByteslogOptions.LimitBytes = &o.LimitBytes}if o.SinceSeconds != 0 {//设置sinceSeconds// round up to the nearest secondsec := int64(o.SinceSeconds.Round(time.Second).Seconds())logOptions.SinceSeconds = &sec}if len(o.Selector) > 0 && o.Tail == -1 && !o.TailSpecified {//设置tailLineslogOptions.TailLines = &selectorTail} else if o.Tail != -1 {logOptions.TailLines = &o.Tail}return logOptions, nil
}
//校验
func (o LogsOptions) Validate() error {if len(o.SinceTime) > 0 && o.SinceSeconds != 0 {//sinceTime和sinceSeconds不能同时指定return fmt.Errorf("at most one of `sinceTime` or `sinceSeconds` may be specified")}logsOptions, ok := o.Options.(*corev1.PodLogOptions)//把option转为POdLogOptionsif !ok {return errors.New("unexpected logs options object")}if o.AllContainers && len(logsOptions.Container) > 0 {//allContainers和container不能同时指定return fmt.Errorf("--all-containers=true should not be specified with container name %s", logsOptions.Container)}if o.ContainerNameSpecified && len(o.Resources) == 2 {//只能指定一个containerreturn fmt.Errorf("only one of -c or an inline [CONTAINER] arg is allowed")}if o.LimitBytes < 0 {//limitBytes不能小于0return fmt.Errorf("--limit-bytes must be greater than 0")}if logsOptions.SinceSeconds != nil && *logsOptions.SinceSeconds < int64(0) {//sinceSeconds不能小于0return fmt.Errorf("--since must be greater than 0")}if logsOptions.TailLines != nil && *logsOptions.TailLines < -1 {//tailLines不能小于-1return fmt.Errorf("--tail must be greater than or equal to -1")}return nil
}
// RunLogs obtains the log requests for o.Object through o.LogsForObject and
// writes their output.
//
// When following more than one request, the requests are consumed in
// parallel, provided their number does not exceed MaxFollowConcurrency
// (otherwise an error is returned). In every other case the requests are
// consumed one after another.
func (o LogsOptions) RunLogs() error {
	requests, err := o.LogsForObject(o.RESTClientGetter, o.Object, o.Options, o.GetPodTimeout, o.AllContainers)
	if err != nil {
		return err
	}

	followingMany := o.Follow && len(requests) > 1
	if !followingMany {
		return o.sequentialConsumeRequest(requests)
	}

	if count := len(requests); count > o.MaxFollowConcurrency {
		return fmt.Errorf(
			"you are attempting to follow %d log streams, but maximum allowed concurrency is %d, use --max-log-requests to increase the limit",
			count, o.MaxFollowConcurrency,
		)
	}

	return o.parallelConsumeRequest(requests)
}
//并行消费log requests
func (o LogsOptions) parallelConsumeRequest(requests []rest.ResponseWrapper) error {reader, writer := io.Pipe()//构造pipewg := &sync.WaitGroup{}//创建waitgroupwg.Add(len(requests))for _, request := range requests {//遍历log requestsgo func(request rest.ResponseWrapper) {//启动go routine打印log requestdefer wg.Done()if err := o.ConsumeRequestFn(request, writer); err != nil {if !o.IgnoreLogErrors {writer.CloseWithError(err)// It's important to return here to propagate the error via the pipereturn}fmt.Fprintf(writer, "error: %v\n", err)}}(request)}go func() {wg.Wait()writer.Close()}()_, err := io.Copy(o.Out, reader)//从pipe另一头输出log到outreturn err
}func (o LogsOptions) sequentialConsumeRequest(requests []rest.ResponseWrapper) error {for _, request := range requests {if err := o.ConsumeRequestFn(request, o.Out); err != nil {return err}}return nil
}
// DefaultConsumeRequest opens the stream for request and copies it to out one
// newline-terminated chunk at a time.
//
// Whatever was read is always written before the read error is inspected, so
// a final line without a trailing newline is still emitted. It returns nil
// when the stream ends with io.EOF, and otherwise the first error from
// opening the stream, reading from it, or writing to out. The stream is
// closed before returning.
func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error {
	stream, err := request.Stream()
	if err != nil {
		return err
	}
	defer stream.Close()

	lines := bufio.NewReader(stream)
	for {
		line, readErr := lines.ReadBytes('\n')
		if _, writeErr := out.Write(line); writeErr != nil {
			return writeErr
		}

		switch readErr {
		case nil:
			continue
		case io.EOF:
			return nil
		default:
			return readErr
		}
	}
}
更多推荐
kubectl源码分析之logs
发布评论