Skip to content

Commit 59108fb

Browse files
authored
feat: add distributed mode (#9124)
* feat: add distributed mode (experimental) Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix data races, mutexes, transactions Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix events and tool stream in agent chat Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * use ginkgo Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fix(cron): compute correctly time boundaries avoiding re-triggering Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not flood of healthy checks Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * do not list obvious backends as text backends Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * tests fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * refactoring and consolidation Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Drop redundant healthcheck Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * enhancements, refactorings Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
1 parent 4c87028 commit 59108fb

389 files changed

Lines changed: 276253 additions & 246469 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/gallery-agent/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ func getHuggingFaceAvatarURL(author string) string {
406406
}
407407

408408
// Parse the response to get avatar URL
409-
var userInfo map[string]interface{}
409+
var userInfo map[string]any
410410
body, err := io.ReadAll(resp.Body)
411411
if err != nil {
412412
return ""

.github/gallery-agent/testing.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"math/rand"
6+
"math/rand/v2"
77
"strings"
88
"time"
99
)
@@ -13,11 +13,11 @@ func runSyntheticMode() error {
1313
generator := NewSyntheticDataGenerator()
1414

1515
// Generate a random number of synthetic models (1-3)
16-
numModels := generator.rand.Intn(3) + 1
16+
numModels := generator.rand.IntN(3) + 1
1717
fmt.Printf("Generating %d synthetic models for testing...\n", numModels)
1818

1919
var models []ProcessedModel
20-
for i := 0; i < numModels; i++ {
20+
for i := range numModels {
2121
model := generator.GenerateProcessedModel()
2222
models = append(models, model)
2323
fmt.Printf("Generated synthetic model: %s\n", model.ModelID)
@@ -42,14 +42,14 @@ type SyntheticDataGenerator struct {
4242
// NewSyntheticDataGenerator creates a new synthetic data generator
4343
func NewSyntheticDataGenerator() *SyntheticDataGenerator {
4444
return &SyntheticDataGenerator{
45-
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
45+
rand: rand.New(rand.NewPCG(uint64(time.Now().UnixNano()), 0)),
4646
}
4747
}
4848

4949
// GenerateProcessedModelFile creates a synthetic ProcessedModelFile
5050
func (g *SyntheticDataGenerator) GenerateProcessedModelFile() ProcessedModelFile {
5151
fileTypes := []string{"model", "readme", "other"}
52-
fileType := fileTypes[g.rand.Intn(len(fileTypes))]
52+
fileType := fileTypes[g.rand.IntN(len(fileTypes))]
5353

5454
var path string
5555
var isReadme bool
@@ -68,7 +68,7 @@ func (g *SyntheticDataGenerator) GenerateProcessedModelFile() ProcessedModelFile
6868

6969
return ProcessedModelFile{
7070
Path: path,
71-
Size: int64(g.rand.Intn(1000000000) + 1000000), // 1MB to 1GB
71+
Size: int64(g.rand.IntN(1000000000) + 1000000), // 1MB to 1GB
7272
SHA256: g.randomSHA256(),
7373
IsReadme: isReadme,
7474
FileType: fileType,
@@ -80,19 +80,19 @@ func (g *SyntheticDataGenerator) GenerateProcessedModel() ProcessedModel {
8080
authors := []string{"microsoft", "meta", "google", "openai", "anthropic", "mistralai", "huggingface"}
8181
modelNames := []string{"llama", "gpt", "claude", "mistral", "gemma", "phi", "qwen", "codellama"}
8282

83-
author := authors[g.rand.Intn(len(authors))]
84-
modelName := modelNames[g.rand.Intn(len(modelNames))]
83+
author := authors[g.rand.IntN(len(authors))]
84+
modelName := modelNames[g.rand.IntN(len(modelNames))]
8585
modelID := fmt.Sprintf("%s/%s-%s", author, modelName, g.randomString(6))
8686

8787
// Generate files
88-
numFiles := g.rand.Intn(5) + 2 // 2-6 files
88+
numFiles := g.rand.IntN(5) + 2 // 2-6 files
8989
files := make([]ProcessedModelFile, numFiles)
9090

9191
// Ensure at least one model file and one readme
9292
hasModelFile := false
9393
hasReadme := false
9494

95-
for i := 0; i < numFiles; i++ {
95+
for i := range numFiles {
9696
files[i] = g.GenerateProcessedModelFile()
9797
if files[i].FileType == "model" {
9898
hasModelFile = true
@@ -140,27 +140,27 @@ func (g *SyntheticDataGenerator) GenerateProcessedModel() ProcessedModel {
140140

141141
// Generate sample metadata
142142
licenses := []string{"apache-2.0", "mit", "llama2", "gpl-3.0", "bsd", ""}
143-
license := licenses[g.rand.Intn(len(licenses))]
143+
license := licenses[g.rand.IntN(len(licenses))]
144144

145145
sampleTags := []string{"llm", "gguf", "gpu", "cpu", "text-to-text", "chat", "instruction-tuned"}
146-
numTags := g.rand.Intn(4) + 3 // 3-6 tags
146+
numTags := g.rand.IntN(4) + 3 // 3-6 tags
147147
tags := make([]string, numTags)
148-
for i := 0; i < numTags; i++ {
149-
tags[i] = sampleTags[g.rand.Intn(len(sampleTags))]
148+
for i := range numTags {
149+
tags[i] = sampleTags[g.rand.IntN(len(sampleTags))]
150150
}
151151
// Remove duplicates
152152
tags = g.removeDuplicates(tags)
153153

154154
// Optionally include icon (50% chance)
155155
icon := ""
156-
if g.rand.Intn(2) == 0 {
156+
if g.rand.IntN(2) == 0 {
157157
icon = fmt.Sprintf("https://cdn-avatars.huggingface.co/v1/production/uploads/%s.png", g.randomString(24))
158158
}
159159

160160
return ProcessedModel{
161161
ModelID: modelID,
162162
Author: author,
163-
Downloads: g.rand.Intn(1000000) + 1000,
163+
Downloads: g.rand.IntN(1000000) + 1000,
164164
LastModified: g.randomDate(),
165165
Files: files,
166166
PreferredModelFile: preferredModelFile,
@@ -180,7 +180,7 @@ func (g *SyntheticDataGenerator) randomString(length int) string {
180180
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
181181
b := make([]byte, length)
182182
for i := range b {
183-
b[i] = charset[g.rand.Intn(len(charset))]
183+
b[i] = charset[g.rand.IntN(len(charset))]
184184
}
185185
return string(b)
186186
}
@@ -189,14 +189,14 @@ func (g *SyntheticDataGenerator) randomSHA256() string {
189189
const charset = "0123456789abcdef"
190190
b := make([]byte, 64)
191191
for i := range b {
192-
b[i] = charset[g.rand.Intn(len(charset))]
192+
b[i] = charset[g.rand.IntN(len(charset))]
193193
}
194194
return string(b)
195195
}
196196

197197
func (g *SyntheticDataGenerator) randomDate() string {
198198
now := time.Now()
199-
daysAgo := g.rand.Intn(365) // Random date within last year
199+
daysAgo := g.rand.IntN(365) // Random date within last year
200200
pastDate := now.AddDate(0, 0, -daysAgo)
201201
return pastDate.Format("2006-01-02T15:04:05.000Z")
202202
}
@@ -220,5 +220,5 @@ func (g *SyntheticDataGenerator) generateReadmeContent(modelName, author string)
220220
fmt.Sprintf("# %s Language Model\n\nDeveloped by %s, this model represents state-of-the-art performance in natural language understanding and generation.\n\n## Key Features\n\n- Multilingual support\n- Context-aware responses\n- Efficient memory usage\n- Fast inference speed\n\n## Applications\n\n- Chatbots and virtual assistants\n- Content generation\n- Code completion\n- Educational tools", strings.Title(modelName), author),
221221
}
222222

223-
return templates[g.rand.Intn(len(templates))]
223+
return templates[g.rand.IntN(len(templates))]
224224
}

.github/workflows/test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
runs-on: ubuntu-latest
2222
strategy:
2323
matrix:
24-
go-version: ['1.25.x']
24+
go-version: ['1.26.x']
2525
steps:
2626
- name: Free Disk Space (Ubuntu)
2727
uses: jlumbroso/free-disk-space@main
@@ -179,7 +179,7 @@ jobs:
179179
runs-on: macos-latest
180180
strategy:
181181
matrix:
182-
go-version: ['1.25.x']
182+
go-version: ['1.26.x']
183183
steps:
184184
- name: Clone
185185
uses: actions/checkout@v6

Dockerfile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ ENV PATH=/opt/rocm/bin:${PATH}
176176
# The requirements-core target is common to all images. Nothing should be placed in requirements-core unless every single build will use it.
177177
FROM requirements-drivers AS build-requirements
178178

179-
ARG GO_VERSION=1.25.4
179+
ARG GO_VERSION=1.26.0
180180
ARG CMAKE_VERSION=3.31.10
181181
ARG CMAKE_FROM_SOURCE=false
182182
ARG TARGETARCH
@@ -319,7 +319,6 @@ COPY ./.git ./.git
319319
# Some of the Go backends use libs from the main src, we could further optimize the caching by building the CPP backends before here
320320
COPY ./pkg/grpc ./pkg/grpc
321321
COPY ./pkg/utils ./pkg/utils
322-
COPY ./pkg/langchain ./pkg/langchain
323322

324323
RUN ls -l ./
325324
RUN make protogen-go

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ For older news and full release notes, see [GitHub Releases](https://github.com/
154154
- [Object Detection](https://localai.io/features/object-detection/)
155155
- [Reranker API](https://localai.io/features/reranker/)
156156
- [P2P Inferencing](https://localai.io/features/distribute/)
157+
- [Distributed Mode](https://localai.io/features/distributed-mode/) — Horizontal scaling with PostgreSQL + NATS
157158
- [Model Context Protocol (MCP)](https://localai.io/docs/features/mcp/)
158159
- [Built-in Agents](https://localai.io/features/agents/) — Autonomous AI agents with tool use, RAG, skills, SSE streaming, and [Agent Hub](https://agenthub.localai.io)
159160
- [Backend Gallery](https://localai.io/backends/) — Install/remove backends on the fly via OCI images

backend/backend.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ service Backend {
5151
rpc StartQuantization(QuantizationRequest) returns (QuantizationJobResult) {}
5252
rpc QuantizationProgress(QuantizationProgressRequest) returns (stream QuantizationProgressUpdate) {}
5353
rpc StopQuantization(QuantizationStopRequest) returns (Result) {}
54+
5455
}
5556

5657
// Define the empty request
@@ -676,3 +677,4 @@ message QuantizationProgressUpdate {
676677
message QuantizationStopRequest {
677678
string job_id = 1;
678679
}
680+

backend/cpp/llama-cpp/grpc-server.cpp

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
#include <grpcpp/ext/proto_server_reflection_plugin.h>
2323
#include <grpcpp/grpcpp.h>
2424
#include <grpcpp/health_check_service_interface.h>
25+
#include <grpcpp/security/server_credentials.h>
2526
#include <regex>
2627
#include <atomic>
28+
#include <cstdlib>
2729
#include <mutex>
2830
#include <signal.h>
2931
#include <thread>
@@ -37,6 +39,47 @@ using grpc::Server;
3739
using grpc::ServerBuilder;
3840
using grpc::ServerContext;
3941
using grpc::Status;
42+
43+
// gRPC bearer token auth via AuthMetadataProcessor for distributed mode.
44+
// Reads LOCALAI_GRPC_AUTH_TOKEN from the environment. When set, rejects
45+
// requests without a matching "authorization: Bearer <token>" metadata header.
46+
class TokenAuthMetadataProcessor : public grpc::AuthMetadataProcessor {
47+
public:
48+
explicit TokenAuthMetadataProcessor(const std::string& token) : token_(token) {}
49+
50+
bool IsBlocking() const override { return false; }
51+
52+
grpc::Status Process(const InputMetadata& auth_metadata,
53+
grpc::AuthContext* /*context*/,
54+
OutputMetadata* /*consumed_auth_metadata*/,
55+
OutputMetadata* /*response_metadata*/) override {
56+
auto it = auth_metadata.find("authorization");
57+
if (it != auth_metadata.end()) {
58+
std::string expected = "Bearer " + token_;
59+
std::string got(it->second.data(), it->second.size());
60+
// Constant-time comparison of the bytes; the size check short-circuits first, so only a length mismatch is observable
61+
if (expected.size() == got.size() && ct_memcmp(expected.data(), got.data(), expected.size()) == 0) {
62+
return grpc::Status::OK;
63+
}
64+
}
65+
return grpc::Status(grpc::StatusCode::UNAUTHENTICATED, "invalid token");
66+
}
67+
68+
private:
69+
std::string token_;
70+
71+
// Minimal constant-time comparison (avoids OpenSSL dependency)
72+
static int ct_memcmp(const void* a, const void* b, size_t n) {
73+
const unsigned char* pa = static_cast<const unsigned char*>(a);
74+
const unsigned char* pb = static_cast<const unsigned char*>(b);
75+
unsigned char result = 0;
76+
for (size_t i = 0; i < n; i++) {
77+
result |= pa[i] ^ pb[i];
78+
}
79+
return result;
80+
}
81+
};
82+
4083
// END LocalAI
4184

4285

@@ -2760,11 +2803,24 @@ int main(int argc, char** argv) {
27602803
BackendServiceImpl service(ctx_server);
27612804

27622805
ServerBuilder builder;
2763-
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
2806+
// Add bearer token auth via AuthMetadataProcessor if LOCALAI_GRPC_AUTH_TOKEN is set
2807+
const char* auth_token = std::getenv("LOCALAI_GRPC_AUTH_TOKEN");
2808+
std::shared_ptr<grpc::ServerCredentials> creds;
2809+
if (auth_token != nullptr && auth_token[0] != '\0') {
2810+
creds = grpc::InsecureServerCredentials();
2811+
creds->SetAuthMetadataProcessor(
2812+
std::make_shared<TokenAuthMetadataProcessor>(auth_token));
2813+
std::cout << "gRPC auth enabled via LOCALAI_GRPC_AUTH_TOKEN" << std::endl;
2814+
} else {
2815+
creds = grpc::InsecureServerCredentials();
2816+
}
2817+
2818+
builder.AddListeningPort(server_address, creds);
27642819
builder.RegisterService(&service);
27652820
builder.SetMaxMessageSize(50 * 1024 * 1024); // 50MB
27662821
builder.SetMaxSendMessageSize(50 * 1024 * 1024); // 50MB
27672822
builder.SetMaxReceiveMessageSize(50 * 1024 * 1024); // 50MB
2823+
27682824
std::unique_ptr<Server> server(builder.BuildAndStart());
27692825
// run the HTTP server in a thread - see comment below
27702826
std::thread t([&]()

backend/go/acestep-cpp/acestepcpp_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,10 @@ func TestLoadModel(t *testing.T) {
106106
defer conn.Close()
107107

108108
client := pb.NewBackendClient(conn)
109-
109+
110110
// Get base directory from main model file for relative paths
111111
mainModelPath := filepath.Join(modelDir, "acestep-5Hz-lm-0.6B-Q8_0.gguf")
112-
112+
113113
resp, err := client.LoadModel(context.Background(), &pb.ModelOptions{
114114
ModelFile: mainModelPath,
115115
ModelPath: modelDir,
@@ -134,7 +134,7 @@ func TestSoundGeneration(t *testing.T) {
134134
if err != nil {
135135
t.Fatal(err)
136136
}
137-
defer os.RemoveAll(tmpDir)
137+
t.Cleanup(func() { os.RemoveAll(tmpDir) })
138138

139139
outputFile := filepath.Join(tmpDir, "output.wav")
140140

backend/go/acestep-cpp/goacestepcpp.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
)
1212

1313
var (
14-
CppLoadModel func(lmModelPath, textEncoderPath, ditModelPath, vaeModelPath string) int
14+
CppLoadModel func(lmModelPath, textEncoderPath, ditModelPath, vaeModelPath string) int
1515
CppGenerateMusic func(caption, lyrics string, bpm int, keyscale, timesignature string, duration, temperature float32, instrumental bool, seed int, dst string, threads int) int
1616
)
1717

@@ -29,18 +29,18 @@ func (a *AceStepCpp) Load(opts *pb.ModelOptions) error {
2929
var textEncoderModel, ditModel, vaeModel string
3030

3131
for _, oo := range opts.Options {
32-
parts := strings.SplitN(oo, ":", 2)
33-
if len(parts) != 2 {
32+
key, value, found := strings.Cut(oo, ":")
33+
if !found {
3434
fmt.Fprintf(os.Stderr, "Unrecognized option: %v\n", oo)
3535
continue
3636
}
37-
switch parts[0] {
37+
switch key {
3838
case "text_encoder_model":
39-
textEncoderModel = parts[1]
39+
textEncoderModel = value
4040
case "dit_model":
41-
ditModel = parts[1]
41+
ditModel = value
4242
case "vae_model":
43-
vaeModel = parts[1]
43+
vaeModel = value
4444
default:
4545
fmt.Fprintf(os.Stderr, "Unrecognized option: %v\n", oo)
4646
}

backend/go/llm/llama/llama.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ type LLM struct {
1818
draftModel *llama.LLama
1919
}
2020

21-
2221
// Free releases GPU resources and frees the llama model
2322
// This should be called when the model is being unloaded to properly release VRAM
2423
func (llm *LLM) Free() error {

0 commit comments

Comments
 (0)