feat: parallelise at hole level (#44)

* feat: parallelise at hole level

* fix(ci): move strategy to testbed job

* feat: output json results file

* fix(ci): install jq

* fix(ci): add missing `runs-on`

* fix(ci): add dependency to testbed job

* fix(ci): invalid artifact key name

* fix(ci): add missing i in fastapi key

* feat(ci): make CI run different # of threads per repo

* fix(ci): results.json not in markdown

* feat: round output values

* fix: avoid creating zombie processes

* fix: check on word instead of line

* feat: recreate holes for long CI
This commit is contained in:
Luc Georges 2023-11-17 18:05:45 +01:00 committed by GitHub
parent 3ad64a32df
commit 6c4e0e4176
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 380 additions and 220 deletions

View file

@@ -11,6 +11,22 @@ concurrency:
jobs:
testbed:
strategy:
matrix:
repo:
- { name: simple, key: simple, parallel: 8 }
- { name: mmaitre314/picklescan, key: picklescan, parallel: 8 }
- { name: huggingface/huggingface_hub, key: huggingface_hub, parallel: 8 }
- { name: tiangolo/fastapi, key: fastapi, parallel: 8 }
- { name: encode/starlette, key: starlette, parallel: 8 }
- { name: lancedb/lancedb, key: lancedb, parallel: 2 }
- { name: lancedb/lance, key: lance, parallel: 2 }
- { name: tkaitchuck/constrandom, key: constrandom, parallel: 8 }
- { name: jaemk/cached, key: cached, parallel: 4 }
- { name: smol-rs/async-executor, key: async-executor, parallel: 4 }
- { name: gcanti/io-ts, key: io-ts, parallel: 8 }
- { name: colinhacks/zod, key: zod, parallel: 8 }
- { name: helix-editor/helix, key: helix, parallel: 2 }
runs-on: [self-hosted, intel-cpu, 8-cpu, ci]
container:
image: ubuntu:22.04
@@ -60,17 +76,118 @@ jobs:
run: cargo build -r
- name: Run testbed
run: cargo run --bin testbed -r -- --api-token $API_TOKEN -r `pwd`/crates/testbed/repositories-ci.yaml
run: 'cargo run --bin testbed -r -- --api-token $API_TOKEN -r `pwd`/crates/testbed/repositories-ci.yaml -f ${{ matrix.repo.name }} -p ${{ matrix.repo.parallel }}'
if: github.event_name == 'push' || github.event_name == 'pull_request'
env:
API_TOKEN: ${{ secrets.API_TOKEN }}
- name: Run testbed
run: cargo run --bin testbed -r -- --api-token $API_TOKEN
run: 'cargo run --bin testbed -r -- --api-token $API_TOKEN -f ${{ matrix.repo.name }} -p ${{ matrix.repo.parallel }}'
if: github.event_name == 'workflow_dispatch'
env:
API_TOKEN: ${{ secrets.API_TOKEN }}
- name: Upload artifacts
uses: actions/upload-artifact@v1
with:
name: results-${{ matrix.repo.key }}
path: ./results.json
comment_results:
needs: [testbed]
runs-on: [self-hosted, intel-cpu, 8-cpu, ci]
container:
image: ubuntu:22.04
steps:
- name: Install dependencies
run: |
apt update
apt install -y jq
- uses: actions/download-artifact@v1
with:
name: results-simple
path: results-simple
- uses: actions/download-artifact@v1
with:
name: results-picklescan
path: results-picklescan
- uses: actions/download-artifact@v1
with:
name: results-huggingface_hub
path: results-huggingface_hub
- uses: actions/download-artifact@v1
with:
name: results-fastapi
path: results-fastapi
- uses: actions/download-artifact@v1
with:
name: results-starlette
path: results-starlette
- uses: actions/download-artifact@v1
with:
name: results-lancedb
path: results-lancedb
- uses: actions/download-artifact@v1
with:
name: results-lance
path: results-lance
- uses: actions/download-artifact@v1
with:
name: results-constrandom
path: results-constrandom
- uses: actions/download-artifact@v1
with:
name: results-cached
path: results-cached
- uses: actions/download-artifact@v1
with:
name: results-async-executor
path: results-async-executor
- uses: actions/download-artifact@v1
with:
name: results-io-ts
path: results-io-ts
- uses: actions/download-artifact@v1
with:
name: results-zod
path: results-zod
- uses: actions/download-artifact@v1
with:
name: results-helix
path: results-helix
- name: Display structure of downloaded files
run: ls -R
- name: output to markdown
run: |
cat > results.md <<EOF
| Repository name | Source type | Average hole completion time (s) | Pass percentage |
| :-------------- | :---------- | -------------------------------: | --------------: |
EOF
cat **/results.json | jq -r '"| \(.[0].repo_name) | \(.[0].source_type) | \(.[0].avg_hole_completion_time_ms) | \(.[0].pass_percentage)% |"' >> results.md
cat >> results.md <<EOF
**Note:** The "hole completion time" represents the full process of:
- copying files from the setup cache directory
- replacing the code from the file with a completion from the model
- building the project
- running the tests
EOF
- name: Find Comment
uses: peter-evans/find-comment@v2
id: fc

2
.gitignore vendored
View file

@@ -3,5 +3,5 @@ dist/
target/
.DS_Store
__pycache__/
results.md
results.json
.pytest_cache/

View file

@@ -1,7 +1,6 @@
use ropey::Rope;
use tower_lsp::jsonrpc::Result;
use tower_lsp::lsp_types::Range;
use tracing::info;
use tree_sitter::{InputEdit, Parser, Point, Tree};
use crate::language_id::LanguageId;

View file

@@ -11,9 +11,10 @@ Here is a simplified pseudo code algorithm for testbed:
read the repositories file
read the holes file(s)
for each repository
spawn a thread
setup the repository
for each hole
spawn a thread
setup the repository -- only once for each repository
copy files from the setup cache to a new temp dir
make the hole as specified by the file
generate completions
build the code
@@ -37,7 +38,7 @@ Before running testbed, you will need to generate a holes file for each reposito
### Setup
testbed runs completions for each repository in parallel. It will first create a temporary directory, then copy or download the repository's source files to that location and finally run the setup commands.
testbed runs hole completions in parallel. It will first, and only once per repository, create a temporary directory, then copy or download the repository's source files to that location and finally run the setup commands. Then for each subsequent completion it will copy the content of the "setup directory" to a new temporary directory so that work can be parallelised.
Setup commands are useful to install dependencies.
@@ -60,9 +61,11 @@ build_args: ["-m", "compileall", "-q", "."]
### Runners
testbed supports two test runners at the moment:
testbed supports four test runners:
- cargo
- jest
- pytest
- vitest
To configure your runner, you have the following options:
```yaml

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@@ -12,7 +12,7 @@ use anyhow::anyhow;
use clap::Parser;
use futures_util::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use lang::Language;
use lsp_client::{client::LspClient, msg::RequestId, server::Server};
use lsp_client::{client::LspClient, error::ExtractError, msg::RequestId, server::Server};
use lsp_types::{
DidOpenTextDocumentParams, InitializeParams, TextDocumentIdentifier, TextDocumentItem,
TextDocumentPositionParams,
@@ -25,7 +25,7 @@ use tokio::{
fs::{self, read_to_string, File, OpenOptions},
io::{self, AsyncReadExt, AsyncWriteExt},
process::Command,
sync::{RwLock, Semaphore},
sync::{OnceCell, RwLock, Semaphore},
};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, error, info, info_span, warn, Instrument};
@@ -75,6 +75,10 @@ struct Args {
#[arg(short, long)]
llm_ls_bin_path: Option<String>,
/// Concurrent hole completions number
#[arg(short, long, default_value_t = 8)]
parallel_hole_completions: usize,
/// Path to the local repositories/ directory
#[arg(short = 'R', long)]
repos_dir_path: Option<String>,
@@ -227,6 +231,52 @@ impl HoleCompletionResult {
}
}
/// Per-repository cache of fully set-up working copies.
///
/// Setting up a repository (cloning/copying its sources and running its
/// setup commands) is expensive, so it happens at most once per repository
/// via a `OnceCell`. Each hole completion then works on a cheap copy of the
/// cached directory, which is what makes hole-level parallelism safe.
struct SetupCache {
    /// Maps a repository name to its lazily-initialised setup directory:
    /// the owning `TempDir` guard plus the repository path inside it.
    /// The `TempDir` must stay alive for as long as the path is used.
    cache: HashMap<String, OnceCell<(TempDir, PathBuf)>>,
}

impl SetupCache {
    /// Pre-populates the map with an empty cell for every known repository;
    /// the actual setup work is deferred until first use.
    ///
    /// Takes a slice rather than `&Vec` (clippy `ptr_arg`); existing
    /// `SetupCache::new(&repositories)` call sites coerce transparently.
    fn new(repositories: &[Repository]) -> Self {
        let mut cache = HashMap::new();
        for repo in repositories {
            cache.insert(repo.name(), OnceCell::new());
        }
        Self { cache }
    }

    /// Returns the setup directory for `repo`, performing the one-time
    /// repository setup (source download/copy plus setup commands) on the
    /// first call. Concurrent callers for the same repository all await the
    /// same initialisation through `OnceCell::get_or_try_init`.
    ///
    /// # Errors
    /// Fails if `repo` was not registered in [`SetupCache::new`], or if the
    /// repository setup itself fails.
    async fn get_setup_cache(
        &self,
        repos_dir_path: PathBuf,
        repo: Repository,
    ) -> anyhow::Result<&(TempDir, PathBuf)> {
        self.cache
            .get(&repo.name())
            // `ok_or_else` defers building the error message so the happy
            // path does no formatting work (clippy `or_fun_call`).
            .ok_or_else(|| anyhow!("failed to find setup cache for repo {}", repo.name()))?
            .get_or_try_init(|| async move {
                let (temp_dir, repo_path) = setup_repo_dir(&repos_dir_path, &repo.source).await?;
                if let Some(commands) = &repo.setup_commands {
                    run_setup(commands, &repo.env, &repo_path).await?;
                }
                Ok((temp_dir, repo_path))
            })
            .await
    }

    /// Copies the (lazily initialised) setup directory for `repo` into a
    /// fresh `TempDir` owned by the caller, so one hole completion can
    /// mutate files without affecting other concurrent completions.
    async fn create_cache_copy(
        &self,
        repos_dir_path: PathBuf,
        repo: Repository,
    ) -> anyhow::Result<TempDir> {
        let (_cached_dir, path_in_dir) = self.get_setup_cache(repos_dir_path, repo).await?;
        let temp_dir = TempDir::new()?;
        copy_dir_contents(path_in_dir, temp_dir.path()).await?;
        Ok(temp_dir)
    }
}
async fn get_api_token(args_token: Option<String>) -> anyhow::Result<Option<String>> {
if args_token.is_some() {
Ok(args_token)
@@ -271,6 +321,11 @@ async fn download_repo_from_github(
}
async fn copy_dir_contents(source: &Path, dest: &Path) -> anyhow::Result<()> {
debug!(
"copying files from {} to {}",
source.to_str().unwrap(),
dest.to_str().unwrap()
);
let mut stack = VecDeque::new();
stack.push_back((source.to_path_buf(), dest.to_path_buf()));
while let Some((src, dst)) = stack.pop_back() {
@@ -286,6 +341,9 @@ async fn copy_dir_contents(source: &Path, dest: &Path) -> anyhow::Result<()> {
stack.push_back((src_path, dst_path));
} else if entry_type.is_file() {
fs::copy(&src_path, &dst_path).await?;
} else if entry_type.is_symlink() {
let link_target = fs::read_link(&src_path).await?;
fs::symlink(link_target, dst_path.clone()).await?;
}
}
}
@@ -339,7 +397,11 @@ async fn run_setup(
for (name, value) in &parsed_env {
status_cmd.env(name, value);
}
debug!("running setup command: {} {:?}", command.0, command.1);
debug!(
"running setup command: {} {}",
command.0,
command.1.join(" ")
);
let status = status_cmd
.args(&command.1)
.current_dir(&repo_path)
@@ -385,31 +447,18 @@
#[allow(clippy::too_many_arguments)]
async fn complete_holes(
hole: Hole,
repo: Repository,
client: Arc<LspClient>,
file_cache: Arc<RwLock<HashMap<PathBuf, Rope>>>,
holes_dir_path: PathBuf,
repos_dir_path: PathBuf,
repos_config: RepositoriesConfig,
api_token: Option<String>,
semaphore: Arc<Semaphore>,
) -> anyhow::Result<Vec<HoleCompletionResult>> {
setup_cache: Arc<SetupCache>,
) -> anyhow::Result<HoleCompletionResult> {
let permit = semaphore.acquire_owned().await?;
let span = info_span!("complete_hole", repo_name = repo.name());
async move {
let holes_file_path = holes_dir_path.join(&repo.holes_file);
let mut holes = String::new();
File::open(holes_file_path)
.await?
.read_to_string(&mut holes)
.await?;
let holes: Vec<Hole> = serde_json::from_str(&holes)?;
let ten_percent = if holes.len() >= 10 {
holes.len() / 10
} else {
1
};
info!("running {} hole completions", holes.len());
let RepositoriesConfig {
context_window,
fim,
@@ -420,12 +469,11 @@ async fn complete_holes(
tokens_to_clear,
..
} = repos_config;
let (_temp_dir, repo_path) = setup_repo_dir(&repos_dir_path, &repo.source).await?;
if let Some(commands) = &repo.setup_commands {
run_setup(commands, &repo.env, &repo_path).await?;
}
let mut hole_completions_result = Vec::with_capacity(holes.len());
for (idx, hole) in holes.iter().enumerate() {
async move {
let tmp_dir = setup_cache
.create_cache_copy(repos_dir_path, repo.clone())
.await?;
let repo_path = tmp_dir.path();
let hole_instant = Instant::now();
let file_path = repo_path.join(&hole.file);
let file_path_str = file_path
@@ -447,14 +495,14 @@
file_content
};
let original_content = file_content.clone();
let hole_start = file_content.line_to_char(hole.cursor.line as usize)
+ hole.cursor.character as usize;
let hole_start =
file_content.line_to_char(hole.cursor.line as usize) + hole.cursor.character as usize;
let hole_end = hole_start
+ file_content
.line(hole.cursor.line as usize)
.slice(hole.cursor.character as usize..)
.len_chars()
- 1; // NOTE: -1 to preserve the trailing `\n`
- 1;
file_content.remove(hole_start..hole_end);
let uri = Url::parse(&format!("file:/{file_path_str}"))?;
@@ -485,13 +533,7 @@
tokenizer_config: tokenizer_config.clone(),
})
.await?;
let (_, result): (RequestId, GetCompletionsResult) = match response.extract() {
Ok(res) => res,
Err(err) => {
error!("llm-ls response error: {err}");
continue;
}
};
let (_, result): (RequestId, GetCompletionsResult) = response.extract()?;
file_content.insert(hole_start, &result.completions[0].generated_text);
let mut file = OpenOptions::new()
@@ -508,19 +550,19 @@
&repo.runner_args,
&mut repo.runner_extra_args.clone(),
&repo.env,
&repo_path,
repo_path,
)
.await?
} else {
0f32
};
debug!("{} passed {}%", hole.to_string(), test_percentage * 100f32);
hole_completions_result.push(HoleCompletionResult::new(
let hole_completions_result = HoleCompletionResult::new(
repo.name(),
repo.source.source_type(),
test_percentage,
hole_instant.elapsed().as_millis(),
));
);
let mut file = OpenOptions::new()
.write(true)
.truncate(true)
@@ -528,12 +570,7 @@
.await?;
file.write_all(original_content.to_string().as_bytes())
.await?;
if (idx + 1) % ten_percent == 0 {
info!("completed {}%", (idx + 1) / ten_percent * 10);
}
}
drop(permit);
info!("finished running hole completions");
Ok(hole_completions_result)
}
.instrument(span)
@@ -616,39 +653,57 @@ async fn main() -> anyhow::Result<()> {
let mut passing_tests_percentage = vec![];
let repositories = repos_config.repositories.clone();
let setup_cache = Arc::new(SetupCache::new(&repositories));
let mut handles = FuturesUnordered::new();
// Query the model by batches of 64
let semaphore = Arc::new(Semaphore::new(8));
let semaphore = Arc::new(Semaphore::new(args.parallel_hole_completions));
for repo in repositories {
if filter_repos && !filter_list.contains(&repo.name()) {
continue;
}
let holes_file_path = holes_dir_path.join(&repo.holes_file);
let mut holes = String::new();
File::open(holes_file_path)
.await?
.read_to_string(&mut holes)
.await?;
let holes: Vec<Hole> = serde_json::from_str(&holes)?;
info!("running {} hole completions", holes.len());
for hole in holes {
let repo = repo.clone();
let client = client.clone();
let file_cache = file_cache.clone();
let holes_dir_path = holes_dir_path.clone();
let repos_dir_path = repos_dir_path.clone();
let repos_config = repos_config.clone();
let api_token = api_token.clone();
let semaphore = semaphore.clone();
let setup_cache = setup_cache.clone();
handles.push(tokio::spawn(async move {
complete_holes(
hole,
repo,
client,
file_cache,
holes_dir_path,
repos_dir_path,
repos_config,
api_token,
semaphore,
setup_cache,
)
.await
}));
}
}
while let Some(res) = handles.next().await {
match res {
Ok(Ok(res)) => passing_tests_percentage.extend(res),
Ok(Err(err)) => return Err(err),
Ok(Ok(res)) => passing_tests_percentage.push(res),
Ok(Err(err)) => {
if let Some(extract_err) = err.downcast_ref::<ExtractError>() {
error!("llm-ls response error: {extract_err}");
} else {
return Err(err);
}
}
Err(err) => return Err(err.into()),
}
}
@@ -663,51 +718,33 @@ async fn main() -> anyhow::Result<()> {
})
.or_insert((res.completion_time_ms, res.pass_percentage, 1f32));
}
let mut results_table =
"| Repository name | Source type | Average hole completion time (s) | Pass percentage |\n| :-------------- | :---------- | -------------------------------: | --------------: |\n".to_owned();
let mut total_time = 0;
let mut total_percentage = 0f32;
let mut total_count = 0f32;
for (k, v) in results_map.iter() {
let avg = v.1 / v.2;
let avg_time = v.0 as f32 / v.2;
results_table.push_str(&format!(
"| {} | {} | {} | {}% |\n",
k.0,
k.1,
avg_time / 1_000f32,
avg * 100f32
));
total_percentage += v.1;
total_count += v.2;
total_time += v.0;
}
let total_avg = total_percentage / total_count;
let total_time_avg = total_time as f32 / total_count;
results_table.push_str(&format!(
"| **Total** | -- | {} | {}% |\n\n",
total_time_avg / 1_000f32,
total_avg * 100f32
));
results_table.push_str(
&[
"**Note:** The \"hole completion time\" represents the full process of:",
" - replacing the code from the file with a completion from the model",
" - building the project",
" - running the tests",
]
.join("\n"),
let json_result = results_map
.iter()
.map(|(k, v)| {
let avg_hole_completion_time_ms = v.0 as f32 / v.2 / 1_000f32;
let pass_percentage = v.1 / v.2 * 100f32;
info!(
"{} from {} obtained {:.2}% in {:.3}s",
k.0, k.1, pass_percentage, avg_hole_completion_time_ms
);
info!("llm-ls results:\n{}", results_table);
serde_json::json!({
"repo_name": k.0,
"source_type": k.1,
"avg_hole_completion_time_ms": format!("{:.3}", avg_hole_completion_time_ms),
"pass_percentage": format!("{:.2}", pass_percentage),
})
})
.collect::<Vec<serde_json::Value>>();
OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open("results.md")
.open("results.json")
.await?
.write_all(results_table.as_bytes())
.write_all(serde_json::to_string(&json_result)?.as_bytes())
.await?;
info!("all tests were run, exiting");
client.shutdown().await?;
match Arc::into_inner(client) {
Some(client) => client.exit().await,

View file

@@ -52,6 +52,7 @@ async fn pytest_runner(
.ok_or(anyhow!("failed to take stdout"))?
.read_to_string(&mut stdout)
.await?;
child.wait().await?;
// XXX: the pytest command can still fail even after the compilation check
// the above check should prevent an error, but better safe than sorry
@@ -124,6 +125,7 @@ async fn cargo_runner(
.ok_or(anyhow!("failed to take stdout"))?
.read_to_string(&mut stdout)
.await?;
child.wait().await?;
let lines = stdout.split_terminator('\n');
let mut passed = 0;
let mut failed = 0;
@@ -173,6 +175,7 @@ async fn jest_runner(
.ok_or(anyhow!("failed to take stderr"))?
.read_to_string(&mut stderr)
.await?;
child.wait().await?;
let lines = stderr.split_terminator('\n');
let mut passed = 0f32;
let mut failed = 0f32;
@@ -183,7 +186,7 @@
for word in words {
if word.contains("passed") {
passed = prev.parse::<u32>()? as f32;
} else if line.contains("failed") {
} else if word.contains("failed") {
failed = prev.parse::<u32>()? as f32;
}
prev = word;
@@ -228,6 +231,7 @@ async fn vitest_runner(
.ok_or(anyhow!("failed to take stdout"))?
.read_to_string(&mut stdout)
.await?;
child.wait().await?;
let lines = stdout.split_terminator('\n');
let mut passed = 0f32;
let mut failed = 0f32;
@@ -238,7 +242,7 @@
for word in words {
if word.contains("passed") {
passed = prev.parse::<u32>()? as f32;
} else if line.contains("failed") {
} else if word.contains("failed") {
failed = prev.parse::<u32>()? as f32;
}
prev = word;