Skip to content
Snippets Groups Projects
import.rs 12.3 KiB
Newer Older
Stephen D's avatar
Stephen D committed
use std::{
    collections::HashMap,
    io::{Cursor, Read},
    sync::Arc,
Stephen D's avatar
Stephen D committed
    time::Duration,
Stephen D's avatar
Stephen D committed
};
Stephen D's avatar
Stephen D committed
use anyhow::{bail, Context};
Stephen D's avatar
Stephen D committed
use commoncrawl_graph::measure_async;
Stephen D's avatar
Stephen D committed
use datadog_statsd::Client;
use flate2::read::GzDecoder;
Stephen D's avatar
Stephen D committed
use futures::{stream, StreamExt, TryStreamExt};
use reqwest::Url;
Stephen D's avatar
Stephen D committed
use serde::Deserialize;
Stephen D's avatar
Stephen D committed
use sqlx::{
    postgres::{PgPoolCopyExt, PgPoolOptions},
    query, Pool, Postgres,
};
use tokio::{sync::mpsc, task::JoinHandle};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Stephen D's avatar
Stephen D committed
    let pool = PgPoolOptions::new()
        .max_connections(48)
        .connect("postgres://localhost/commoncrawl_graph")
        .await?;
Stephen D's avatar
Stephen D committed
    let client = Arc::new(Client::new(
        "127.0.0.1:8125",
        "commoncrawl_graph_import",
        None,
    )?);
Stephen D's avatar
Stephen D committed

Stephen D's avatar
Stephen D committed
    get_dump_paths(&pool).await?;
Stephen D's avatar
Stephen D committed

Stephen D's avatar
Stephen D committed
    while let Some(unprocessed) = query!(
        r#"
WITH unprocessed AS (SELECT id FROM dumps WHERE status = 0 LIMIT 1)
UPDATE dumps d SET status = 1 FROM unprocessed u WHERE d.id = u.id RETURNING d.id, d.name
"#
    )
    .fetch_optional(&pool)
    .await?
    {
        match run(&pool, client.clone(), &unprocessed.name).await {
            Ok(_) => {
                query!("UPDATE dumps SET status = 2 WHERE id = $1", unprocessed.id)
                    .execute(&pool)
                    .await?;
            }
Stephen D's avatar
Stephen D committed

Stephen D's avatar
Stephen D committed
            Err(e) => {
                query!("UPDATE dumps SET status = 3 WHERE id = $1", unprocessed.id)
                    .execute(&pool)
                    .await?;
Stephen D's avatar
Stephen D committed

Stephen D's avatar
Stephen D committed
                return Err(e);
            }
Stephen D's avatar
Stephen D committed
        }
    }
Stephen D's avatar
Stephen D committed

    Ok(())
Stephen D's avatar
Stephen D committed
}
Stephen D's avatar
Stephen D committed

Stephen D's avatar
Stephen D committed
/// Imports a single graph release `name`.
///
/// This happens in two timed phases: first the vertices go into `nodes` (yielding the
/// CommonCrawl-ID → Postgres-ID mapping), then the edges go into `edges` using that mapping.
/// Each phase is wrapped in `measure_async` under the metric names `retrieve_nodes` and
/// `retrieve_edges`.
///
/// NOTE: the COPY-based bulk-load helpers (`truncate_tables`, `populate_tables`) are not
/// invoked from here.
async fn run(pool: &Pool<Postgres>, statsd: Arc<Client>, name: &str) -> anyhow::Result<()> {
    println!("Retrieve nodes");
    let nodes_phase = get_nodes(pool, Arc::clone(&statsd), name);
    let cc_to_pg = measure_async(&statsd, "retrieve_nodes", nodes_phase).await?;

    println!("Retrieve edges");
    let edges_phase = get_edges(pool, Arc::clone(&statsd), name, cc_to_pg);
    measure_async(&statsd, "retrieve_edges", edges_phase).await?;

    Ok(())
}

// Key = CommonCrawl domain ID
// Value = Postgres node ID
async fn get_nodes(
    pool: &Pool<Postgres>,
    statsd: Arc<Client>,
    name: &str,
) -> anyhow::Result<HashMap<i32, i32>> {
    let (tx, mut rx) = mpsc::channel::<String>(1);
    let p = pool.clone();
    let node_inserter: JoinHandle<anyhow::Result<HashMap<_, _>>> = tokio::task::spawn(async move {
        let mut mapping = HashMap::new();

        while let Some(part) = rx.recv().await {
            let chunks = stream::iter(part.lines()).chunks(100_000);

            let mappings: anyhow::Result<Vec<Vec<_>>> = chunks
                .then(|chunk| async {
                    let (ids, names): (Vec<_>, Vec<_>) = chunk
                        .into_iter()
                        .map(|l| {
                            let mut l = l.split_whitespace();

                            let id: i32 = l.next().unwrap().parse().unwrap();
                            let name = l.next().unwrap();

                            (id, name.to_string())
                        })
                        .unzip();

                    query!(
                        r#"
WITH unique_names AS (
    SELECT UNNEST($1::text[]) AS name
)
INSERT INTO nodes(name)
SELECT un.name
FROM unique_names un
WHERE NOT EXISTS (
    SELECT 1
    FROM nodes n
    WHERE n.name = un.name
)
ON CONFLICT DO NOTHING;
"#,
Stephen D's avatar
Stephen D committed
                        &names
                    )
                    .execute(&p)
                    .await?;

                    let map = query!(
                        r#"
SELECT id AS pg_id, cc_id
FROM nodes
INNER JOIN (
    SELECT UNNEST($1::int[]) AS cc_id, UNNEST($2::text[]) AS name
) og ON og.name = nodes.name
"#,
                        &ids,
                        &names
                    )
                    .fetch_all(&p)
                    .await?
                    .into_iter()
                    .map(|r| (r.cc_id.unwrap(), r.pg_id));

                    statsd.count("nodes.inserts", ids.len() as f64, &None);

                    Ok(map.collect::<Vec<_>>())
                })
                .try_collect()
                .await;

            let mappings = mappings?;
            for m in mappings {
                for (k, v) in m {
                    mapping.insert(k, v);
                }
            }

            println!("Done insert");
        }

        Ok(mapping)
    });

    let url = Url::parse(&format!(
        "https://data.commoncrawl.org/projects/hyperlinkgraph/{}/host/{}-host-vertices.paths.gz",
        name, name
    ))?;

    let download_result = download_file_with_indirection(url, tx).await;

    // check if this errors before checking if the download errored
    let mapping = node_inserter.await??;

    download_result?;

    Ok(mapping)
}

async fn get_edges(
    pool: &Pool<Postgres>,
    statsd: Arc<Client>,
    name: &str,
    node_id_mapping: HashMap<i32, i32>,
) -> anyhow::Result<()> {
    let (tx, mut rx) = mpsc::channel::<String>(1);
    let p = pool.clone();
    let node_inserter: JoinHandle<anyhow::Result<_>> = tokio::task::spawn(async move {
        while let Some(part) = rx.recv().await {
            let chunks = stream::iter(part.lines()).chunks(100_000);

            let res: anyhow::Result<()> = chunks
                .then(|chunk| async {
                    let (from_ids, to_ids): (Vec<i32>, Vec<i32>) = chunk
                        .into_iter()
                        .map(|l| {
                            let mut l = l.split_whitespace();

                            let from_id: i32 = l.next().unwrap().parse().unwrap();
                            let to_id: i32 = l.next().unwrap().parse().unwrap();

                            (
                                node_id_mapping.get(&from_id).unwrap(),
                                node_id_mapping.get(&to_id).unwrap(),
                            )
                        })
                        .unzip();

                    query!(
                        r#"
WITH unique_edges AS (
    SELECT UNNEST($1::INT[]) AS from_id, UNNEST($2::INT[]) AS to_id
)
INSERT INTO edges(from_id, to_id)
SELECT ue.from_id, ue.to_id
FROM unique_edges ue
WHERE NOT EXISTS (
    SELECT 1
    FROM edges e
    WHERE e.from_id = ue.from_id
        AND e.to_id = ue.to_id
)
ON CONFLICT DO NOTHING
"#,
                        &from_ids,
                        &to_ids
                    )
                    .execute(&p)
                    .await?;
Stephen D's avatar
Stephen D committed

                    statsd.count("edges.inserts", from_ids.len() as f64, &None);

                    Ok(())
                })
                .try_collect()
                .await;

            res?;

            println!("Done insert");
        }

        Ok(())
    });

    let url = Url::parse(&format!(
Stephen D's avatar
Stephen D committed
        "https://data.commoncrawl.org/projects/hyperlinkgraph/{}/host/{}-host-edges.paths.gz",
Stephen D's avatar
Stephen D committed
        name, name
    ))?;

    let download_result = download_file_with_indirection(url, tx).await;

    // check if this errors before checking if the download errored
    node_inserter.await??;

    download_result?;
Stephen D's avatar
Stephen D committed

Stephen D's avatar
Stephen D committed
/// One entry of CommonCrawl's `graphinfo.json` listing.
///
/// Only the release identifier is deserialized; serde ignores the other fields by default.
#[derive(Debug, Deserialize)]
struct GraphInfo {
    /// Release identifier, stored as `dumps.name`.
    id: String,
}

/// Index-file locations for one graph release.
///
/// Only referenced by the unfinished `get_latest_path`.
#[derive(Debug)]
struct PathInfo {
    /// Database ID of the release.
    id: i32,
    /// URL of the gzip-compressed list of host-vertices files.
    nodes: Url,
    /// URL of the gzip-compressed list of host-edges files.
    edges: Url,
}

Stephen D's avatar
Stephen D committed
/// Registers every hyperlink-graph release listed in CommonCrawl's `graphinfo.json` as a row
/// in `dumps`.
///
/// Releases that are already present are left untouched via `ON CONFLICT DO NOTHING`. This
/// presumably relies on a unique constraint on `dumps.name` (TODO confirm against the schema).
///
/// # Errors
/// Fails on network errors, a non-success HTTP status, unparseable JSON, or a failed insert.
async fn get_dump_paths(pool: &Pool<Postgres>) -> anyhow::Result<()> {
    const GRAPH_JSON_PATH: &str = "https://index.commoncrawl.org/graphinfo.json";

    // Check the status first: otherwise an HTML error page surfaces as a confusing JSON
    // parse error.
    let text = reqwest::get(GRAPH_JSON_PATH)
        .await
        .with_context(|| format!("Failed to request {GRAPH_JSON_PATH}"))?
        .error_for_status()?
        .text()
        .await?;
    let infos: Vec<GraphInfo> = serde_json::from_str(&text)
        .with_context(|| format!("Failed to parse {GRAPH_JSON_PATH}"))?;

    for info in infos {
        query!(
            "INSERT INTO dumps(name) VALUES ($1) ON CONFLICT DO NOTHING",
            info.id
        )
        .execute(pool)
        .await?;
    }

    Ok(())
}

Stephen D's avatar
Stephen D committed
/// Unfinished: meant to look up the newest graph release and return its index-file
/// locations. No caller appears in the visible code.
///
/// It currently does three things:
/// 1. fetches `graphinfo.json`;
/// 2. selects the first listed release (failing with "Missing graphs" if the list is empty);
/// 3. panics via `todo!()`.
///
/// An earlier draft sketched the rest of the logic:
/// - insert the release into `graph_versions` (`ON CONFLICT DO NOTHING`) and read back its
///   `id` and `status`;
/// - return `None` when the status is 2 (finished) or 3 (errored);
/// - otherwise return a [`PathInfo`] pointing at
///   `.../hyperlinkgraph/{release}/host/{release}-host-vertices.paths.gz` and
///   `.../hyperlinkgraph/{release}/host/{release}-host-edges.paths.gz`.
async fn get_latest_path(pool: &Pool<Postgres>) -> anyhow::Result<Option<PathInfo>> {
    const GRAPH_JSON_PATH: &str = "https://index.commoncrawl.org/graphinfo.json";

    let response = reqwest::get(GRAPH_JSON_PATH).await?;
    let body = response.text().await?;
    let graphs: Vec<GraphInfo> = serde_json::from_str(&body)?;
    let _latest = graphs.first().map(|g| &g.id).context("Missing graphs")?;

    // `pool` is reserved for the unfinished `graph_versions` bookkeeping.
    let _ = pool;
    todo!()
}

Stephen D's avatar
Stephen D committed
async fn download_file_with_indirection(path: Url, tx: mpsc::Sender<String>) -> anyhow::Result<()> {
    let paths = reqwest::get(path.clone()).await?.bytes().await?;
    let mut buf = String::new();
    GzDecoder::new(&*paths).read_to_string(&mut buf)?;

    for line in buf.lines() {
        let mut path = path.clone();
        path.set_path(line);

Stephen D's avatar
Stephen D committed
        let mut part = None;
Stephen D's avatar
Stephen D committed
        for i in 0.. {
Stephen D's avatar
Stephen D committed
            match download_one_file(path.clone()).await {
                Ok(x) => {
                    part = Some(x);
                    break;
                }
                Err(e) => {
                    eprintln!("Error downloading {path}: {e} [Attempt {i}]");
Stephen D's avatar
Stephen D committed
                    tokio::time::sleep(Duration::from_secs(2)).await;
Stephen D's avatar
Stephen D committed
                }
Stephen D's avatar
Stephen D committed
            }
        }
Stephen D's avatar
Stephen D committed
        let Some(part) = part else {
            bail!("Could not download {}", path);
        };

        println!("run");
Stephen D's avatar
Stephen D committed

Stephen D's avatar
Stephen D committed
        tx.send(part).await?;
    }

    Ok(())
}

/// Downloads the gzip-compressed file at `path` and returns its decompressed contents as text.
///
/// # Errors
/// Fails in any of these cases:
/// - a network error;
/// - a non-success HTTP status (checked explicitly so an error page isn't fed to the gzip
///   decoder);
/// - the body is not valid gzip-compressed UTF-8.
async fn download_one_file(path: Url) -> anyhow::Result<String> {
    let compressed = reqwest::get(path)
        .await?
        .error_for_status()?
        .bytes()
        .await?;
    let mut buf = String::new();
    GzDecoder::new(&*compressed).read_to_string(&mut buf)?;

    Ok(buf)
}

Stephen D's avatar
Stephen D committed
/// Empties the `nodes_dupe` and `edges_dupe` tables, one statement each, nodes first.
async fn truncate_tables(pool: &Pool<Postgres>) -> anyhow::Result<()> {
    let clear_nodes = query!("TRUNCATE TABLE nodes_dupe");
    clear_nodes.execute(pool).await?;

    let clear_edges = query!("TRUNCATE TABLE edges_dupe");
    clear_edges.execute(pool).await?;

    Ok(())
}
Stephen D's avatar
Stephen D committed
async fn populate_tables(
    pool: &Pool<Postgres>,
    vertices_path: Url,
    edges_path: Url,
) -> anyhow::Result<()> {
Stephen D's avatar
Stephen D committed
    let (tx, mut rx) = mpsc::channel::<String>(1);
    let p = pool.clone();
    let node_inserter: JoinHandle<anyhow::Result<()>> = tokio::task::spawn(async move {
        while let Some(part) = rx.recv().await {
            println!("Start insert of {} bytes", part.bytes().len());

            let mut cp = p
                .copy_in_raw("COPY nodes_dupe FROM STDIN (FORMAT TEXT)")
                .await?;
            cp.read_from(Cursor::new(part)).await?;
            cp.finish().await?;
Stephen D's avatar
Stephen D committed
            println!("Done insert");
Stephen D's avatar
Stephen D committed
        }
Stephen D's avatar
Stephen D committed
        Ok(())
    });

    // nodes
    download_file_with_indirection(vertices_path, tx).await?;

    node_inserter.await??;

    let (tx, mut rx) = mpsc::channel::<String>(1);
    let p = pool.clone();
    let edge_inserter: JoinHandle<anyhow::Result<()>> = tokio::task::spawn(async move {
        while let Some(part) = rx.recv().await {
            println!("Start insert of {} bytes", part.bytes().len());

            let mut cp = p
                .copy_in_raw("COPY edges_dupe FROM STDIN (FORMAT TEXT)")
                .await?;
            cp.read_from(Cursor::new(part)).await.unwrap();
            cp.finish().await?;
Stephen D's avatar
Stephen D committed

            println!("Done insert");
Stephen D's avatar
Stephen D committed
        }
Stephen D's avatar
Stephen D committed

Stephen D's avatar
Stephen D committed
        Ok(())
    });

    // edges
    download_file_with_indirection(edges_path, tx).await?;

    edge_inserter.await??;