diff --git a/server/sched.go b/server/sched.go
index 37e83694..fa034d28 100644
--- a/server/sched.go
+++ b/server/sched.go
@@ -23,7 +23,6 @@ import (
 type LlmRequest struct {
 	ctx             context.Context //nolint:containedctx
 	model           *Model
-	ggml            *llm.GGML // TODO - how large is this, and do we need to free it after we've finished loading?
 	opts            api.Options
 	sessionDuration time.Duration
 	successCh       chan *runnerRef
@@ -39,7 +38,7 @@ type Scheduler struct {
 	loaded   map[string]*runnerRef
 	loadedMu sync.Mutex
 
-	loadFn      func(req *LlmRequest, gpus gpu.GpuInfoList)
+	loadFn      func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
 	newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
 	getGpuFn    func() gpu.GpuInfoList
 }
@@ -74,20 +73,14 @@ func InitScheduler(ctx context.Context) *Scheduler {
 
 // context must be canceled to decrement ref count and release the runner
 func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
-	ggml, err := llm.LoadModel(model.ModelPath)
 	req := &LlmRequest{
 		ctx:             c,
 		model:           model,
-		ggml:            ggml,
 		opts:            opts,
 		sessionDuration: sessionDuration,
 		successCh:       make(chan *runnerRef),
 		errCh:           make(chan error, 1),
 	}
-	if err != nil {
-		req.errCh <- err
-		return req.successCh, req.errCh
-	}
 	select {
 	case s.pendingReqCh <- req:
 	default:
@@ -130,28 +123,39 @@ func (s *Scheduler) processPending(ctx context.Context) {
 						pending.useLoadedRunner(runner, s.finishedReqCh)
 						break
 					}
-				} else if loadedCount == 0 {
-					slog.Debug("loading first model", "model", pending.model.ModelPath)
-					gpus := s.getGpuFn()
-					g := pickBestFitGPUs(pending, gpus)
-					if g != nil {
-						gpus = g
-					}
-					s.loadFn(pending, gpus)
-					break
 				} else if loadedMax > 0 && loadedCount >= loadedMax {
 					slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
 					runnerToExpire = s.findRunnerToUnload(pending)
 				} else {
-					// More than one loaded model, so we have to see if the new one fits
+					// Either no models are loaded or below loadedMax
 					// Get a refreshed GPU list
 					gpus := s.getGpuFn()
+
+					// Load model for fitting
+					ggml, err := llm.LoadModel(pending.model.ModelPath)
+					if err != nil {
+						pending.errCh <- err
+						break
+					}
+
+					// No models loaded. Load the model but prefer the best fit.
+					if loadedCount == 0 {
+						slog.Debug("loading first model", "model", pending.model.ModelPath)
+						g := pickBestFitGPUs(pending, ggml, gpus)
+						if g != nil {
+							gpus = g
+						}
+						s.loadFn(pending, ggml, gpus)
+						break
+					}
+
+					// More than one loaded model, so we have to see if the new one fits
 					// Update free memory from currently loaded models
 					s.updateFreeSpace(gpus)
-					gpus = pickBestFitGPUs(pending, gpus)
+					gpus = pickBestFitGPUs(pending, ggml, gpus)
 					if gpus != nil {
 						slog.Debug("new model fits with existing models, loading")
-						s.loadFn(pending, gpus)
+						s.loadFn(pending, ggml, gpus)
 						break
 					}
 					runnerToExpire = s.findRunnerToUnload(pending)
@@ -282,8 +286,8 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm
 	}()
 }
 
-func (s *Scheduler) load(req *LlmRequest, gpus gpu.GpuInfoList) {
-	llama, err := s.newServerFn(gpus, req.model.ModelPath, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
+func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) {
+	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
 	if err != nil {
 		// some older models are not compatible with newer versions of llama.cpp
 		// show a generalized compatibility error until there is a better way to
@@ -454,7 +458,7 @@ func (a ByDuration) Less(i, j int) bool {
 
 // pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
 // If the model can not be fit fully within the available GPU(s) nil is returned
-func pickBestFitGPUs(req *LlmRequest, gpus gpu.GpuInfoList) gpu.GpuInfoList {
+func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList {
 	var estimatedVRAM uint64
 	for _, gl := range gpus.ByLibrary() {
 		var ok bool
@@ -466,7 +470,7 @@ func pickBestFitGPUs(req *LlmRequest, gpus gpu.GpuInfoList) gpu.GpuInfoList {
 
 		// First attempt to fit the model into a single GPU
 		for _, g := range sgl {
-			if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
+			if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
 				slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
 				return []gpu.GpuInfo{g}
 			}
@@ -477,7 +481,7 @@ func pickBestFitGPUs(req *LlmRequest, gpus gpu.GpuInfoList) gpu.GpuInfoList {
 		// - try subsets of GPUs instead of just falling back to 1 or all in a family
 
 		// Now try all the GPUs
-		if ok, estimatedVRAM = llm.PredictServerFit(gl, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
+		if ok, estimatedVRAM = llm.PredictServerFit(gl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
 			slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", gl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
 			return gl
 		}
diff --git a/server/sched_test.go b/server/sched_test.go
index 27d64a9b..3b06e2ba 100644
--- a/server/sched_test.go
+++ b/server/sched_test.go
@@ -47,6 +47,7 @@ func TestLoad(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 5*time.Millisecond)
 	defer done()
 	s := InitScheduler(ctx)
+	var ggml *llm.GGML // value not used in tests
 	req := &LlmRequest{
 		ctx:             ctx,
 		model:           &Model{ModelPath: "foo"},
@@ -59,7 +60,7 @@ func TestLoad(t *testing.T) {
 		return nil, fmt.Errorf("something failed to load model blah")
 	}
 	gpus := gpu.GpuInfoList{}
-	s.load(req, gpus)
+	s.load(req, ggml, gpus)
 	require.Len(t, req.successCh, 0)
 	require.Len(t, req.errCh, 1)
 	require.Len(t, s.loaded, 0)
@@ -70,7 +71,7 @@ func TestLoad(t *testing.T) {
 	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
 		return server, nil
 	}
-	s.load(req, gpus)
+	s.load(req, ggml, gpus)
 	select {
 	case err := <-req.errCh:
 		require.NoError(t, err)
@@ -82,7 +83,7 @@ func TestLoad(t *testing.T) {
 
 	req.model.ModelPath = "dummy_model_path"
 	server.waitResp = fmt.Errorf("wait failure")
-	s.load(req, gpus)
+	s.load(req, ggml, gpus)
 	select {
 	case err := <-req.errCh:
 		require.Contains(t, err.Error(), "wait failure")
@@ -101,6 +102,7 @@ type bundle struct {
 	ctxDone func()
 	srv     *mockLlm
 	req     *LlmRequest
+	ggml    *llm.GGML
 }
 
 func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
@@ -132,14 +134,15 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
 		{Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
 	})
 	assert.Nil(t, err)
+
 	fname := f.Name()
 	model := &Model{Name: modelName, ModelPath: fname}
-	ggml, err := llm.LoadModel(model.ModelPath)
+	scenario.ggml, err = llm.LoadModel(model.ModelPath)
 	require.NoError(t, err)
+
 	scenario.req = &LlmRequest{
 		ctx:             scenario.ctx,
 		model:           model,
-		ggml:            ggml,
 		sessionDuration: 5 * time.Millisecond,
 		successCh:       make(chan *runnerRef, 1),
 		errCh:           make(chan error, 1),
@@ -157,13 +160,13 @@ func TestRequests(t *testing.T) {
 	scenario1a.req.sessionDuration = 0
 	scenario1b := newScenario(t, ctx, "ollama-model-1", 11)
 	scenario1b.req.model = scenario1a.req.model
-	scenario1b.req.ggml = scenario1a.req.ggml
+	scenario1b.ggml = scenario1a.ggml
 	scenario1b.req.sessionDuration = 0
 
 	// simple reload of same model
 	scenario2a := newScenario(t, ctx, "ollama-model-1", 20)
 	scenario2a.req.model = scenario1a.req.model
-	scenario2a.req.ggml = scenario1a.req.ggml
+	scenario2a.ggml = scenario1a.ggml
 
 	// Multiple loaded models
 	scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte)
@@ -322,13 +325,14 @@ func TestGetRunner(t *testing.T) {
 	successCh1c, errCh1c := s.GetRunner(scenario1c.ctx, scenario1c.req.model, scenario1c.req.opts, scenario1c.req.sessionDuration)
 	require.Len(t, s.pendingReqCh, 0)
 	require.Len(t, successCh1c, 0)
+	require.Len(t, errCh1c, 0)
+
+	time.Sleep(5 * time.Millisecond)
+	require.Len(t, s.loaded, 0)
 	require.Len(t, errCh1c, 1)
 	err = <-errCh1c
 	require.Contains(t, err.Error(), "bad path")
 	scenario1b.ctxDone()
-
-	time.Sleep(5 * time.Millisecond)
-	require.Len(t, s.loaded, 0)
 }
 
 // TODO - add one scenario that triggers the bogus finished event with positive ref count
@@ -366,7 +370,9 @@ func TestPrematureExpired(t *testing.T) {
 	require.LessOrEqual(t, len(s.finishedReqCh), 1)
 	time.Sleep(10 * time.Millisecond)
 	require.Len(t, s.finishedReqCh, 0)
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 0)
+	s.loadedMu.Unlock()
 
 	// also shouldn't happen in real life
 	s.finishedReqCh <- scenario1a.req
@@ -426,7 +432,6 @@ func TestUpdateFreeSpace(t *testing.T) {
 	s.updateFreeSpace(gpus)
 	require.Equal(t, uint64(850), gpus[0].FreeMemory)
 	require.Equal(t, uint64(1850), gpus[1].FreeMemory)
-
 }
 
 func TestFindRunnerToUnload(t *testing.T) {
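
For orientation, below is a minimal, self-contained sketch of the control flow this patch moves to: the GGML metadata is loaded lazily inside the pending-request loop, passed explicitly to the fit check and to the load callback, and a load failure is reported on the request's error channel rather than aborting `GetRunner` up front. The `ggmlInfo`, `gpuInfo`, and `request` types and the helper names here are simplified placeholders for illustration, not the real `llm`/`gpu` package APIs.

```go
// Sketch of the lazy "load GGML metadata, then check fit" flow from the patch.
// All types below are stand-ins; only the ordering of steps mirrors the diff.
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

type ggmlInfo struct{ path string }

type gpuInfo struct {
	id         string
	freeMemory uint64
}

type request struct {
	modelPath string
	errCh     chan error
}

// loadModel stands in for llm.LoadModel: parse just enough metadata to size the model.
func loadModel(path string) (*ggmlInfo, error) {
	if path == "" {
		return nil, errors.New("model path is empty")
	}
	return &ggmlInfo{path: path}, nil
}

// pickBestFitGPUs mirrors the patched helper's shape: the GGML metadata is now an
// explicit argument instead of a field on the request. The fit check is a placeholder.
func pickBestFitGPUs(req *request, ggml *ggmlInfo, gpus []gpuInfo) []gpuInfo {
	for _, g := range gpus {
		if g.freeMemory > 0 { // stand-in for llm.PredictServerFit
			return []gpuInfo{g}
		}
	}
	return nil
}

// schedulePending mirrors the reordered branch in processPending: GGML metadata is
// loaded only when a request is actually being placed, and any load error goes to errCh.
func schedulePending(pending *request, loadedCount int, gpus []gpuInfo,
	loadFn func(req *request, ggml *ggmlInfo, gpus []gpuInfo)) {
	ggml, err := loadModel(pending.modelPath)
	if err != nil {
		pending.errCh <- err
		return
	}

	if loadedCount == 0 {
		// First model: prefer the best fit, but load even if nothing fits fully.
		slog.Debug("loading first model", "model", pending.modelPath)
		if g := pickBestFitGPUs(pending, ggml, gpus); g != nil {
			gpus = g
		}
		loadFn(pending, ggml, gpus)
		return
	}

	// Other models already loaded: only load if the new one fits alongside them.
	if g := pickBestFitGPUs(pending, ggml, gpus); g != nil {
		slog.Debug("new model fits with existing models, loading")
		loadFn(pending, ggml, g)
		return
	}
	// Otherwise a loaded runner would be chosen for unloading (omitted here).
}

func main() {
	pending := &request{modelPath: "dummy.gguf", errCh: make(chan error, 1)}
	gpus := []gpuInfo{{id: "gpu0", freeMemory: 8 << 30}}
	schedulePending(pending, 0, gpus, func(r *request, _ *ggmlInfo, gl []gpuInfo) {
		fmt.Printf("loading %s on %s\n", r.modelPath, gl[0].id)
	})
}
```

The practical effect of dropping `ggml` from `LlmRequest` (flagged by the removed TODO about its size) is that the parsed model metadata only exists while a placement decision or load is in progress, instead of being held for the lifetime of every queued request.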