From ff4f0cbd1d54ba5acc89c97b49af017eb0d2512d Mon Sep 17 00:00:00 2001
From: Daniel Hiltgen
Date: Tue, 4 Jun 2024 14:08:36 -0700
Subject: [PATCH] Prevent multiple concurrent loads on the same gpus

While models are loading, the VRAM metrics are dynamic, so try to load
on a GPU that doesn't have a model actively loading, or wait to avoid
races that lead to OOMs
---
 server/sched.go | 79 +++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 67 insertions(+), 12 deletions(-)

diff --git a/server/sched.go b/server/sched.go
index d380c9d4..e1ceccc1 100644
--- a/server/sched.go
+++ b/server/sched.go
@@ -26,6 +26,7 @@ type LlmRequest struct {
     sessionDuration time.Duration
     successCh       chan *runnerRef
     errCh           chan error
+    schedAttempts   uint
 }
 
 type Scheduler struct {
@@ -37,10 +38,11 @@ type Scheduler struct {
     loaded   map[string]*runnerRef
     loadedMu sync.Mutex
 
-    loadFn      func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
-    newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
-    getGpuFn    func() gpu.GpuInfoList
-    getCpuFn    func() gpu.GpuInfoList
+    loadFn       func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
+    newServerFn  func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
+    getGpuFn     func() gpu.GpuInfoList
+    getCpuFn     func() gpu.GpuInfoList
+    reschedDelay time.Duration
 }
 
 var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
@@ -55,6 +57,7 @@ func InitScheduler(ctx context.Context) *Scheduler {
         newServerFn:  llm.NewLlamaServer,
         getGpuFn:     gpu.GetGPUInfo,
         getCpuFn:     gpu.GetCPUInfo,
+        reschedDelay: 250 * time.Millisecond,
     }
     sched.loadFn = sched.load
     return sched
@@ -106,6 +109,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
             return
         case pending := <-s.pendingReqCh:
             // Block other requests until we get this pending request running
+            pending.schedAttempts++
 
             if pending.ctx.Err() != nil {
                 slog.Debug("pending request cancelled or timed out, skipping scheduling")
@@ -172,13 +176,39 @@ func (s *Scheduler) processPending(ctx context.Context) {
                 }
 
                 if runnerToExpire == nil {
-                    // More than one loaded model, so we have to see if the new one fits
+                    // More than one loaded model, so we have to see if the
+                    // new one fits
+                    //
+                    // We want to avoid loading on any GPUs that have other
+                    // models still loading on them to avoid potential races
+                    // with VRAM consumption ramping up during load
+                    availGpus := s.filterGPUsWithLoadingModels(gpus)
+
                     // Update free memory from currently loaded models
-                    s.updateFreeSpace(gpus)
-                    gpus = pickBestFitGPUs(pending, ggml, gpus)
-                    if gpus != nil {
+                    s.updateFreeSpace(availGpus)
+                    fitGpus := pickBestFitGPUs(pending, ggml, availGpus)
+                    if fitGpus != nil {
                         slog.Debug("new model fits with existing models, loading")
-                        s.loadFn(pending, ggml, gpus)
+                        s.loadFn(pending, ggml, fitGpus)
+                        break
+                    }
+
+                    // We couldn't find a set of GPUs to fully load the new
+                    // model. If no other models are loading (both GPU lists
+                    // are the same) then we need to unload another model to
+                    // make room
+                    if len(availGpus) < len(gpus) {
+                        // There are other requests pending, and this one
+                        // needs more time, so put it on the back of the
+                        // queue so that we might satisfy other pending
+                        // requests that aren't blocked
+                        go func() {
+                            // Process in a go routine to avoid deadlocking
+                            // the scheduler if our queue is full
+                            slog.Debug("delaying scheduling while other models finish loading", "attempts", pending.schedAttempts, "model", pending.model.ModelPath)
+                            time.Sleep(s.reschedDelay)
+                            s.pendingReqCh <- pending
+                        }()
                         break
                     }
                     runnerToExpire = s.findRunnerToUnload()
@@ -409,11 +439,36 @@ func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
                 // after we start our first runner, then we'll never acount for that, so picking the smallest free value seems prudent.
                 allGpus[i].FreeMemory = allGpus[i].TotalMemory - p
             }
-            slog.Info("updated VRAM", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
+            slog.Info("updated VRAM based on existing loaded models", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
         }
     }
 }
 
+// While models are loading the VRAM consumption numbers will be indeterminate, so we have
+// to avoid scheduling another model on the same GPU(s) that haven't stabilized.
+// This routine returns the set of GPUs that do not have an active loading model.
+// If all GPUs have loading models, an empty list will be returned (not a single CPU entry)
+func (s *Scheduler) filterGPUsWithLoadingModels(allGpus gpu.GpuInfoList) gpu.GpuInfoList {
+    ret := append(gpu.GpuInfoList{}, allGpus...)
+    s.loadedMu.Lock()
+    defer s.loadedMu.Unlock()
+    for _, runner := range s.loaded {
+        if runner.loading {
+            slog.Debug("overlapping loads detected", "gpus", runner.gpus, "model", runner.modelPath)
+            for _, busyGPU := range runner.gpus {
+                for i := range ret {
+                    if ret[i].ID == busyGPU.ID {
+                        ret = append(ret[:i], ret[i+1:]...)
+                        break
+                    }
+                }
+            }
+        }
+    }
+    return ret
+}
+
+// TODO consolidate sched_types.go
 type runnerRef struct {
     refMu sync.Mutex
     // refCond   sync.Cond // Signaled on transition from 1 -> 0 refCount
@@ -519,7 +574,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
         for {
             <-ticker.C
             if time.Now().After(expiresAt) {
-                slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds())
+                slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "model", runner.modelPath)
                 finished <- struct{}{}
             }
 
@@ -532,7 +587,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
             }
             // If we're within ~80% of the estimated memory usage recovered, bail out
             if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 {
-                slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()))
+                slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "model", runner.modelPath)
                 finished <- struct{}{}
                 return
             }
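
Note (illustration only, not part of the patch): the sketch below shows the GPU-filtering idea from filterGPUsWithLoadingModels in isolation. The gpuInfo and runner types here are simplified stand-ins for ollama's gpu.GpuInfo and runnerRef, kept only to demonstrate the behavior: any GPU that a still-loading runner occupies is dropped from the candidate list, since its free-VRAM reading is not yet trustworthy.

package main

import (
    "fmt"
    "sync"
)

// gpuInfo and runner are hypothetical, trimmed-down stand-ins for the real
// gpu.GpuInfo and runnerRef types; only the fields needed here are kept.
type gpuInfo struct{ ID string }

type runner struct {
    loading bool
    gpus    []gpuInfo
}

// filterGPUsWithLoadingModels mirrors the patch's helper: copy the full GPU
// list, then remove every GPU that a currently-loading runner is using.
func filterGPUsWithLoadingModels(allGpus []gpuInfo, loaded map[string]*runner, mu *sync.Mutex) []gpuInfo {
    ret := append([]gpuInfo{}, allGpus...)
    mu.Lock()
    defer mu.Unlock()
    for _, r := range loaded {
        if !r.loading {
            continue // stable runners don't block scheduling on their GPUs
        }
        for _, busy := range r.gpus {
            for i := range ret {
                if ret[i].ID == busy.ID {
                    ret = append(ret[:i], ret[i+1:]...)
                    break
                }
            }
        }
    }
    return ret
}

func main() {
    var mu sync.Mutex
    all := []gpuInfo{{ID: "GPU-0"}, {ID: "GPU-1"}}
    // Hypothetical scenario: one model is still loading on GPU-0.
    loaded := map[string]*runner{
        "example-model": {loading: true, gpus: []gpuInfo{{ID: "GPU-0"}}},
    }
    fmt.Println(filterGPUsWithLoadingModels(all, loaded, &mu)) // prints [{GPU-1}]
}

In that scenario the filtered list is shorter than the full list, which is exactly the len(availGpus) < len(gpus) case in the patch: rather than force an unload, the scheduler re-queues the pending request from a goroutine after reschedDelay (250ms by default) and retries once the other load has settled.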