Skip to content
Snippets Groups Projects
Commit cdbe8ef8 authored by Stephen D
Browse files

Merge branch 'failure_handling' into 'master'

Add failure handling - fixes #7

Closes #7

See merge request !7
parents 06d930a3 183ea0ed
No related branches found
No related tags found
1 merge request: !7 Add failure handling - fixes #7
......@@ -21,7 +21,6 @@ before_script:
test:
stage: test
script:
- cp ci.toml config.toml
- cargo fmt -- --check
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test
......
database_url="postgres://postgres:postgres@postgres/sparkplug"
image_directory="public"
......@@ -3,6 +3,7 @@ image_directory = "/var/sparkplug/public"
key = "changeme"
server_name = "http://localhost/content/"
port = 3030
max_retries = 3
[[formats]]
name = "original"
......
-- Migration: add per-image failure tracking to the images table.
--   last_failure: timestamp of the most recent failed attempt; nullable with
--                 no default, so existing rows start out as NULL.
--   failures:     count of failed attempts; existing rows are backfilled
--                 with 0 through the column default.
ALTER TABLE images
ADD COLUMN last_failure TIMESTAMPTZ,
ADD COLUMN failures INTEGER NOT NULL DEFAULT 0;
-- Index both new columns; presumably to support the worker query that filters
-- on failures and last_failure -- confirm against the query planner.
CREATE INDEX idx_failure_count ON images (failures);
CREATE INDEX idx_last_failure ON images (last_failure);
......@@ -79,16 +79,19 @@ async fn main() {
loop {
let pool = pool.clone();
let rows = match sqlx::query!("SELECT * FROM images WHERE NOT is_scraped LIMIT 1000")
.fetch_all(&pool)
.await
{
Ok(x) => x,
Err(e) => {
println!("Worker error: {}", e);
return;
}
};
let rows = sqlx::query!(
r#"
SELECT id, original_url FROM images
WHERE NOT is_scraped
AND failures < $1
AND (last_failure IS NULL OR last_failure < (NOW() - INTERVAL '1 day'))
LIMIT 1000
"#,
config.max_retries
)
.fetch_all(&pool)
.await
.expect("Database error");
if rows.is_empty() {
break;
......@@ -96,7 +99,7 @@ async fn main() {
let _: Vec<_> = futures::stream::iter(rows.into_iter().map(|row| (row, pool.clone())).map(
|(row, pool)| async move {
let extension = match process_file(
match process_file(
&row.original_url,
image_directory.to_string(),
row.id.to_string(),
......@@ -104,18 +107,22 @@ async fn main() {
)
.await
{
Some(x) => x,
None => return,
Some(extension) => {
sqlx::query!(
"UPDATE images SET is_scraped=true, extension=$2 WHERE id = $1",
&row.id,
extension
)
.execute(&pool)
.await
.expect("Could not update database");
}
None => {
sqlx::query!("UPDATE images SET failures=failures+1, last_failure=NOW() WHERE id = $1", &row.id)
.execute(&pool).await
.expect("Could not update database");
}
};
sqlx::query!(
"UPDATE images SET is_scraped=true, extension=$2 WHERE id = $1",
&row.id,
extension
)
.execute(&pool)
.await
.expect("Could not update database");
},
))
.buffer_unordered(100)
......
......@@ -9,6 +9,7 @@ struct TomlConfig {
pub key: String,
pub server_name: String,
pub port: u16,
pub max_retries: u16,
pub formats: Vec<Format>,
}
......@@ -27,6 +28,7 @@ pub struct Config {
pub key: String,
pub server_name: Url,
pub port: u16,
pub max_retries: i32,
pub formats: Vec<Format>,
}
......@@ -42,6 +44,7 @@ impl From<TomlConfig> for Config {
.parse()
.expect("Invalid server_name - Not a valid URL"),
port: config.port,
max_retries: config.max_retries as i32,
formats: config.formats,
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment