From 84ac7ce139252506d77115a3152f36a5a4f3541a Mon Sep 17 00:00:00 2001
From: Daniel Hiltgen
Date: Thu, 9 May 2024 11:10:28 -0700
Subject: [PATCH] Refine subprocess reaping

---
 llm/server.go | 35 +++++++++++++++++++++++------------
 1 file changed, 23 insertions(+), 12 deletions(-)

diff --git a/llm/server.go b/llm/server.go
index b452434e..4600d00f 100644
--- a/llm/server.go
+++ b/llm/server.go
@@ -53,6 +53,7 @@ type llmServer struct {
 	estimatedTotal uint64 // Total size of model
 	totalLayers    uint64
 	gpuCount       int
+	loadDuration   time.Duration // Record how long it took the model to load
 
 	sem *semaphore.Weighted
 }
@@ -291,6 +292,7 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
 			sem:         semaphore.NewWeighted(int64(numParallel)),
 			totalLayers: ggml.KV().BlockCount() + 1,
 			gpuCount:    gpuCount,
+			done:        make(chan error, 1),
 		}
 
 		s.cmd.Env = os.Environ()
@@ -339,6 +341,11 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
 			continue
 		}
 
+		// reap subprocess when it exits
+		go func() {
+			s.done <- s.cmd.Wait()
+		}()
+
 		return s, nil
 	}
 
@@ -486,6 +493,7 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 	expiresAt := time.Now().Add(10 * time.Minute) // be generous with timeout, large models can take a while to load
 
 	slog.Info("waiting for llama runner to start responding")
+	var lastStatus ServerStatus = -1
 
 	for {
 		select {
@@ -500,12 +508,6 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 			return fmt.Errorf("llama runner process has terminated: %v %s", err, msg)
 		default:
 		}
-		ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
-		defer cancel()
-		status, err := s.getServerStatus(ctx)
-		if err != nil {
-			slog.Debug("server not yet available", "error", err)
-		}
 		if time.Now().After(expiresAt) {
 			// timeout
 			msg := ""
@@ -521,14 +523,20 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 			}
 			return fmt.Errorf("llama runner process no longer running: %d %s", s.cmd.ProcessState.ExitCode(), msg)
 		}
+		ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
+		defer cancel()
+		status, _ := s.getServerStatus(ctx)
+		if lastStatus != status && status != ServerStatusReady {
+			// Only log on status changes
+			slog.Info("waiting for server to become available", "status", status.ToString())
+		}
 		switch status {
-		case ServerStatusLoadingModel:
-			time.Sleep(time.Millisecond * 250)
-			slog.Debug("loading model")
 		case ServerStatusReady:
-			slog.Info(fmt.Sprintf("llama runner started in %0.2f seconds", time.Since(start).Seconds()))
+			s.loadDuration = time.Since(start)
+			slog.Info(fmt.Sprintf("llama runner started in %0.2f seconds", s.loadDuration.Seconds()))
 			return nil
 		default:
+			lastStatus = status
 			time.Sleep(time.Millisecond * 250)
 			continue
 		}
@@ -930,8 +938,11 @@ func (s *llmServer) Close() error {
 		if err := s.cmd.Process.Kill(); err != nil {
 			return err
 		}
-
-		_ = s.cmd.Wait()
+		// if ProcessState is already populated, Wait already completed, no need to wait again
+		if s.cmd.ProcessState == nil {
+			slog.Debug("waiting for llama server to exit")
+			<-s.done
+		}
 
 		slog.Debug("llama server stopped")
 	}
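
Below is a minimal, self-contained Go sketch of the reap-once pattern this patch moves to; the names (server, start, close) are hypothetical, not the actual ollama code. A single goroutine owns the one permitted cmd.Wait() call and publishes its result on a buffered channel, so close can block on that channel instead of issuing a second Wait, and skips waiting entirely when ProcessState shows the process has already been reaped.

// reaponce.go: sketch of the reap-once pattern (hypothetical names).
package main

import (
	"log/slog"
	"os/exec"
)

type server struct {
	cmd  *exec.Cmd
	done chan error
}

func start(name string, args ...string) (*server, error) {
	s := &server{
		cmd: exec.Command(name, args...),
		// buffered so the reaper goroutine never blocks if nobody reads
		done: make(chan error, 1),
	}
	if err := s.cmd.Start(); err != nil {
		return nil, err
	}
	// reap subprocess when it exits; Wait must be called exactly once
	go func() {
		s.done <- s.cmd.Wait()
	}()
	return s, nil
}

func (s *server) close() error {
	if err := s.cmd.Process.Kill(); err != nil {
		return err
	}
	// if ProcessState is already populated, Wait already completed,
	// so there is no need to wait again
	if s.cmd.ProcessState == nil {
		slog.Debug("waiting for subprocess to exit")
		<-s.done
	}
	slog.Info("subprocess stopped")
	return nil
}

func main() {
	s, err := start("sleep", "60")
	if err != nil {
		panic(err)
	}
	if err := s.close(); err != nil {
		panic(err)
	}
}

Buffering done with capacity 1 is the load-bearing choice: the reaper goroutine can deposit the Wait result and exit even if no caller ever reads it, so a server that shuts down without close being called does not leak the goroutine.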
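
The WaitUntilRunning hunks also switch from logging on every poll tick to logging only on status transitions. Here is a standalone sketch of that change-only logging loop; probe and serverStatus are hypothetical stand-ins for getServerStatus and ServerStatus, and the one-second ready cutoff is invented for the demo.

// pollstatus.go: sketch of change-only status logging while polling.
package main

import (
	"fmt"
	"log/slog"
	"time"
)

type serverStatus int

const (
	statusLoadingModel serverStatus = iota
	statusReady
)

func (s serverStatus) String() string {
	if s == statusReady {
		return "ready"
	}
	return "loading model"
}

// probe stands in for getServerStatus; it reports ready after one second.
func probe(start time.Time) serverStatus {
	if time.Since(start) > time.Second {
		return statusReady
	}
	return statusLoadingModel
}

func main() {
	start := time.Now()
	var lastStatus serverStatus = -1
	for {
		status := probe(start)
		// only log on status changes, not on every 250ms tick
		if lastStatus != status && status != statusReady {
			slog.Info("waiting for server to become available", "status", status.String())
		}
		if status == statusReady {
			loadDuration := time.Since(start)
			fmt.Printf("runner started in %0.2f seconds\n", loadDuration.Seconds())
			return
		}
		lastStatus = status
		time.Sleep(250 * time.Millisecond)
	}
}

Seeding lastStatus with -1, as the patch does, guarantees the first observed status always differs from the sentinel, so the initial state is logged exactly once before the loop goes quiet between transitions.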