// Package logs tails container logs from Kubernetes pods.
package logs

import (
	"bufio"
	"context"
	"fmt"
	"sync"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// maxLogLineBytes is the longest single log line the tailer accepts.
// bufio.Scanner's default limit (64 KiB) aborts the scan with
// bufio.ErrTooLong on longer lines, which would end tailing early.
const maxLogLineBytes = 1024 * 1024

// Logger represents a Kubernetes log tailer.
type Logger struct {
	// Deployment name. Pods are looked up with the label selector
	// "app=<Deployment>" when Pod is empty.
	Deployment string
	// Namespace name.
	Namespace string
	// Optional: Pod name (if specified, will tail logs from this specific pod).
	Pod string
	// Optional: Container name (if specified, will tail logs from this
	// specific container). When empty, the pod's first container is used.
	Container string
	// Optional: Number of lines to tail from the end of the logs.
	// If not specified, will tail from the beginning.
	TailLines *int64
	// Optional: Time to wait before giving up on tailing. It bounds the
	// lifetime of the log stream; zero or negative means no timeout.
	Timeout time.Duration
}

// Tailer represents a running log tailer.
//
// A Tailer must not be copied after creation; always use it via pointer.
type Tailer struct {
	lines chan string
	done  chan struct{}
	// stopOnce guards close(done) so Stop may be called more than once.
	stopOnce sync.Once
}

// NextLine returns a channel that will receive log lines. The channel is
// closed when tailing ends, or immediately if Start failed during setup.
func (t *Tailer) NextLine() <-chan string {
	return t.lines
}

// Stop signals the tailer to stop. It is safe to call Stop multiple times
// and from multiple goroutines. The line channel is closed by the tailing
// goroutine once it has observed the stop signal.
func (t *Tailer) Stop() {
	t.stopOnce.Do(func() { close(t.done) })
}

// Start starts the log tailer.
//
// It always returns a non-nil Tailer plus a function. If setup fails (client
// creation, pod lookup, or container lookup), the Tailer's line channel is
// already closed and the returned function reports the setup error.
// Otherwise a goroutine streams log lines into the Tailer, and the returned
// function stops the Tailer and returns nil.
func (l *Logger) Start() (*Tailer, func() error) {
	tailer := &Tailer{
		lines: make(chan string),
		done:  make(chan struct{}),
	}

	// fail closes the line channel so receivers unblock, and hands the
	// setup error back through the returned function.
	fail := func(err error) (*Tailer, func() error) {
		close(tailer.lines)
		return tailer, func() error { return err }
	}

	// Create Kubernetes client.
	client, err := l.createKubernetesClient()
	if err != nil {
		return fail(err)
	}

	// Get pod name if not specified.
	podName := l.Pod
	if podName == "" {
		podName, err = l.getPodName(client)
		if err != nil {
			return fail(err)
		}
	}

	// Get container name if not specified.
	containerName := l.Container
	if containerName == "" {
		containerName, err = l.getContainerName(client, podName)
		if err != nil {
			return fail(err)
		}
	}

	// Start tailing logs; the goroutine ends on Stop, timeout, stream end,
	// or a stream error.
	go l.tailLogs(client, podName, containerName, tailer)

	return tailer, func() error {
		tailer.Stop()
		return nil
	}
}

// createKubernetesClient builds a clientset, preferring in-cluster
// configuration and falling back to the default kubeconfig loading rules.
func (l *Logger) createKubernetesClient() (*kubernetes.Clientset, error) {
	// Try to load in-cluster config first.
	config, err := rest.InClusterConfig()
	if err != nil {
		// If in-cluster config fails, try kubeconfig.
		loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
		configOverrides := &clientcmd.ConfigOverrides{}
		kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
		config, err = kubeConfig.ClientConfig()
		if err != nil {
			return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
		}
	}

	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
	}
	return client, nil
}

// getPodName picks a pod for the configured deployment. It lists pods in
// l.Namespace matching the label selector "app=<Deployment>" and returns the
// first one in the Running phase, or the first listed pod if none is running.
// It returns an error if the list call fails or no pod matches.
func (l *Logger) getPodName(client *kubernetes.Clientset) (string, error) {
	// List pods with the deployment label.
	podList, err := client.CoreV1().Pods(l.Namespace).List(context.TODO(), metav1.ListOptions{
		LabelSelector: fmt.Sprintf("app=%s", l.Deployment),
	})
	if err != nil {
		return "", fmt.Errorf("failed to list pods: %w", err)
	}
	if len(podList.Items) == 0 {
		return "", fmt.Errorf("no pods found for deployment %s in namespace %s", l.Deployment, l.Namespace)
	}

	// Return the first pod that's running. Index instead of ranging by
	// value to avoid copying each Pod struct.
	for i := range podList.Items {
		if podList.Items[i].Status.Phase == corev1.PodRunning {
			return podList.Items[i].Name, nil
		}
	}

	// If no running pods, return the first pod.
	return podList.Items[0].Name, nil
}

// getContainerName fetches the named pod and returns the name of the first
// container in its spec. It returns an error if the pod cannot be fetched or
// declares no containers.
func (l *Logger) getContainerName(client *kubernetes.Clientset, podName string) (string, error) {
	pod, err := client.CoreV1().Pods(l.Namespace).Get(context.TODO(), podName, metav1.GetOptions{})
	if err != nil {
		return "", fmt.Errorf("failed to get pod %s: %w", podName, err)
	}
	if len(pod.Spec.Containers) == 0 {
		return "", fmt.Errorf("pod %s has no containers", podName)
	}

	// Return the first container name.
	return pod.Spec.Containers[0].Name, nil
}

// tailLogs streams the container's logs into tailer.lines, one line per
// send, until the stream ends, the timeout elapses, an error occurs, or the
// tailer is stopped. Stream and read errors are delivered in-band as a line
// prefixed with "Error: ". The line channel is always closed on return.
func (l *Logger) tailLogs(client *kubernetes.Clientset, podName, containerName string, tailer *Tailer) {
	defer close(tailer.lines)

	// Prepare log request. A nil TailLines leaves the option unset.
	logOpts := &corev1.PodLogOptions{
		Container: containerName,
		Follow:    true,
		TailLines: l.TailLines,
	}

	// The stream context is cancellable so that Stop can interrupt a read
	// that is blocked waiting for the next log line.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if l.Timeout > 0 {
		var cancelTimeout context.CancelFunc
		ctx, cancelTimeout = context.WithTimeout(ctx, l.Timeout)
		defer cancelTimeout()
	}

	// Bridge the stop signal to the context. The watcher exits either on
	// Stop or when this function returns (the deferred cancels fire).
	go func() {
		select {
		case <-tailer.done:
			cancel()
		case <-ctx.Done():
		}
	}()

	// Get logs stream.
	req := client.CoreV1().Pods(l.Namespace).GetLogs(podName, logOpts)
	logsStream, err := req.Stream(ctx)
	if err != nil {
		select {
		case tailer.lines <- fmt.Sprintf("Error: failed to get logs stream: %v", err):
		case <-tailer.done:
		}
		return
	}
	defer logsStream.Close()

	// Read logs line by line, allowing lines longer than the scanner's
	// default 64 KiB limit.
	scanner := bufio.NewScanner(logsStream)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLogLineBytes)
	for scanner.Scan() {
		select {
		case tailer.lines <- scanner.Text():
		case <-tailer.done:
			return
		}
	}

	if err := scanner.Err(); err != nil {
		// If the caller stopped us, the read error is just the result of
		// our own cancellation; do not report it as a failure.
		select {
		case <-tailer.done:
			return
		default:
		}
		select {
		case tailer.lines <- fmt.Sprintf("Error: failed to read logs: %v", err):
		case <-tailer.done:
		}
	}
}