Skip to content
Snippets Groups Projects
Commit 5d5fa1db authored by Stephen D's avatar Stephen D
Browse files

wip

parent 14b056a8
No related branches found
No related tags found
No related merge requests found
-- Initial schema for the CommonCrawl host graph:
-- nodes: one row per host; edges: one directed host-to-host hyperlink.
CREATE TABLE nodes(
id INT NOT NULL,
name TEXT NOT NULL
);
CREATE TABLE edges(
from_id INT NOT NULL,
to_id INT NOT NULL
);
-- The *_dupe tables mirror nodes/edges; presumably staging copies used while
-- swapping in a freshly imported graph -- TODO confirm against the importer.
CREATE TABLE nodes_dupe(
id INT NOT NULL,
name TEXT NOT NULL
);
CREATE TABLE edges_dupe(
from_id INT NOT NULL,
to_id INT NOT NULL
);
-- One row per CommonCrawl graph release, tracking import progress.
-- status is an int code; the Rust importer writes 2 on success and 3 on
-- failure (0 = default/not yet processed) -- see main()'s update queries.
CREATE TABLE graph_versions (
id SERIAL NOT NULL PRIMARY KEY,
version TEXT NOT NULL UNIQUE,
status INT NOT NULL DEFAULT 0
);
-- Add migration script here
-- Work queue of CommonCrawl dumps to import.
-- status codes as written by main(): 0 = pending, 1 = claimed/in progress,
-- 2 = done, 3 = errored.
CREATE TABLE dumps (
id SERIAL NOT NULL PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
status INT NOT NULL DEFAULT 0
);
-- Later migration: nodes/edges redefined with proper keys and indexes.
-- (A prior migration also creates `nodes`/`edges`; these are separate files.)
CREATE TABLE nodes (
id SERIAL NOT NULL PRIMARY KEY,
name TEXT NOT NULL UNIQUE
);
-- text_pattern_ops lets prefix searches (name LIKE 'foo%') use this index.
CREATE INDEX nodes_text_idx ON nodes(name text_pattern_ops);
CREATE TABLE edges (
from_id INT NOT NULL,
to_id INT NOT NULL,
PRIMARY KEY (from_id, to_id)
);
-- Support traversal both ways: outlinks via from_id, inlinks via to_id.
CREATE INDEX edges_from_id_idx ON edges(from_id);
CREATE INDEX edges_to_id_idx ON edges(to_id);
use std::io::{Cursor, Read};
use std::{
collections::HashMap,
io::{Cursor, Read},
sync::Arc,
};
use anyhow::{bail, Context};
use commoncrawl_graph::measure_async;
use datadog_statsd::Client;
use flate2::read::GzDecoder;
use futures::{stream, StreamExt, TryStreamExt};
use reqwest::Url;
use serde::Deserialize;
use sqlx::{
// NOTE(review): this span is diff-extraction residue. The `async fn main`
// signature only survives inside the hunk header on the next line, and the
// OLD single-version flow (graph_versions / get_latest_path) is interleaved
// with the NEW dump-queue flow below.
......@@ -19,39 +24,63 @@ async fn main() -> anyhow::Result<()> {
.connect("postgres://localhost/commoncrawl_graph")
.await?;
// OLD: plain statsd client + "is there a newer graph?" check.
let client = Client::new("127.0.0.1:8125", "commoncrawl_graph_import", None)?;
let paths = match get_latest_path(&pool).await? {
Some(x) => x,
None => {
eprintln!("Nothing to do - no newer graphs. Aborting.");
client.incr("already_up_to_date", &None);
return Ok(());
}
};
// NEW: Arc-wrapped client so it can be shared with spawned tasks.
let client = Arc::new(Client::new(
"127.0.0.1:8125",
"commoncrawl_graph_import",
None,
)?);
// OLD: run once for the latest graph and record success/failure in
// graph_versions (status 2 = done).
let id = paths.id;
match run(&pool, &client, paths).await {
Ok(x) => {
query!("UPDATE graph_versions SET status = 2 WHERE id = $1", id)
.execute(&pool)
.await?;
get_dump_paths(&pool).await?;
return Ok(x);
}
// NEW: claim-one-pending-dump loop. The CTE flips status 0 -> 1 and
// returns the claimed row. NOTE(review): without FOR UPDATE SKIP LOCKED
// two concurrent workers could race on the same row -- confirm this is
// only ever run as a single worker.
while let Some(unprocessed) = query!(
r#"
WITH unprocessed AS (SELECT id FROM dumps WHERE status = 0 LIMIT 1)
UPDATE dumps d SET status = 1 FROM unprocessed u WHERE d.id = u.id RETURNING d.id, d.name
"#
)
.fetch_optional(&pool)
.await?
{
match run(&pool, client.clone(), &unprocessed.name).await {
Ok(_) => {
// status 2 = finished successfully.
query!("UPDATE dumps SET status = 2 WHERE id = $1", unprocessed.id)
.execute(&pool)
.await?;
}
// OLD error arm (graph_versions) followed by the NEW one (dumps);
// both mark status 3 = errored and abort the process.
Err(e) => {
query!("UPDATE graph_versions SET status = 3 WHERE id = $1", id)
.execute(&pool)
.await?;
Err(e) => {
query!("UPDATE dumps SET status = 3 WHERE id = $1", unprocessed.id)
.execute(&pool)
.await?;
return Err(e);
return Err(e);
}
}
}
Ok(())
}
// NOTE(review): diff-extraction residue -- the OLD signature (&Client +
// PathInfo) and its first statement appear immediately below; the second
// `async fn run` line plus the body under it is the current code.
async fn run(pool: &Pool<Postgres>, client: &Client, paths: PathInfo) -> anyhow::Result<()> {
println!("Truncate tables");
// Imports one CommonCrawl dump: first the vertices file (building the
// CommonCrawl-id -> Postgres-id mapping), then the edges file, which
// consumes that mapping. Each phase is timed to statsd via measure_async.
async fn run(pool: &Pool<Postgres>, statsd: Arc<Client>, name: &str) -> anyhow::Result<()> {
println!("Retrieve nodes");
let node_id_mapping = measure_async(
&statsd,
"retrieve_nodes",
get_nodes(pool, statsd.clone(), name),
)
.await?;
println!("Retrieve edges");
measure_async(
&statsd,
"retrieve_edges",
get_edges(pool, statsd.clone(), name, node_id_mapping),
)
.await?;
// Old truncate/create steps, kept commented out during the wip refactor
// (the stray @@ line inside is a diff hunk header, not code).
/*println!("Truncate tables");
measure_async(client, "truncate_tables", truncate_tables(pool)).await?;
......@@ -74,7 +103,155 @@ async fn run(pool: &Pool<Postgres>, client: &Client, paths: PathInfo) -> anyhow:
println!("Create tables");
measure_async(client, "create_tables", create_tables(pool)).await?;
measure_async(client, "create_tables", create_tables(pool)).await?;*/
Ok(())
}
// Key = CommonCrawl domain ID
// Value = Postgres node ID
//
// Downloads "<name>-host-vertices.paths.gz" for the given dump, bulk-inserts
// every host name into `nodes`, and returns the CommonCrawl-id -> Postgres-id
// mapping needed to translate the edge list.
async fn get_nodes(
pool: &Pool<Postgres>,
statsd: Arc<Client>,
name: &str,
) -> anyhow::Result<HashMap<i32, i32>> {
// Capacity-1 channel: the downloader stalls until the inserter has taken
// the previous part, bounding how much decompressed text is in memory.
let (tx, mut rx) = mpsc::channel::<String>(1);
let p = pool.clone();
let node_inserter: JoinHandle<anyhow::Result<HashMap<_, _>>> = tokio::task::spawn(async move {
let mut mapping = HashMap::new();
while let Some(part) = rx.recv().await {
// Insert each part in 100k-line batches to keep statements bounded.
let chunks = stream::iter(part.lines()).chunks(100_000);
let mappings: anyhow::Result<Vec<Vec<_>>> = chunks
.then(|chunk| async {
// Each line is "<cc_id> <host_name>". The unwraps panic on
// malformed lines -- assumes the upstream file is well-formed.
let (ids, names): (Vec<_>, Vec<_>) = chunk
.into_iter()
.map(|l| {
let mut l = l.split_whitespace();
let id: i32 = l.next().unwrap().parse().unwrap();
let name = l.next().unwrap();
(id, name.to_string())
})
.unzip();
// Bulk insert via UNNEST; ON CONFLICT DO NOTHING tolerates
// names already present (earlier batches or prior runs).
query!(
"INSERT INTO nodes(name) SELECT UNNEST($1::text[]) ON CONFLICT DO NOTHING;",
&names
)
.execute(&p)
.await?;
// Re-select by name to learn the Postgres ids (newly
// assigned or pre-existing), paired with the CC ids.
let map = query!(
r#"
SELECT id AS pg_id, cc_id
FROM nodes
INNER JOIN (
SELECT UNNEST($1::int[]) AS cc_id, UNNEST($2::text[]) AS name
) og ON og.name = nodes.name
"#,
&ids,
&names
)
.fetch_all(&p)
.await?
.into_iter()
.map(|r| (r.cc_id.unwrap(), r.pg_id));
statsd.count("nodes.inserts", ids.len() as f64, &None);
Ok(map.collect::<Vec<_>>())
})
.try_collect()
.await;
let mappings = mappings?;
// Flatten the per-batch pair lists into the single mapping.
for m in mappings {
for (k, v) in m {
mapping.insert(k, v);
}
}
println!("Done insert");
}
Ok(mapping)
});
let url = Url::parse(&format!(
"https://data.commoncrawl.org/projects/hyperlinkgraph/{}/host/{}-host-vertices.paths.gz",
name, name
))?;
let download_result = download_file_with_indirection(url, tx).await;
// check if this errors before checking if the download errored
let mapping = node_inserter.await??;
download_result?;
Ok(mapping)
}
/// Download the edge list for dump `name` and bulk-insert it into `edges`.
///
/// Lines look like `<from_cc_id> <to_cc_id>`; both ids are translated to
/// Postgres node ids through `node_id_mapping` (built by `get_nodes`).
/// Panics if an edge references a vertex id missing from the mapping, which
/// would indicate a truncated or mismatched vertices file.
async fn get_edges(
    pool: &Pool<Postgres>,
    statsd: Arc<Client>,
    name: &str,
    node_id_mapping: HashMap<i32, i32>,
) -> anyhow::Result<()> {
    // Capacity-1 channel: the downloader stalls until the inserter has taken
    // the previous part (backpressure instead of buffering the whole file).
    let (tx, mut rx) = mpsc::channel::<String>(1);

    let p = pool.clone();
    let node_inserter: JoinHandle<anyhow::Result<_>> = tokio::task::spawn(async move {
        while let Some(part) = rx.recv().await {
            // Insert each part in 100k-row batches via UNNEST.
            let chunks = stream::iter(part.lines()).chunks(100_000);
            let res: anyhow::Result<()> = chunks
                .then(|chunk| async {
                    let (from_ids, to_ids): (Vec<i32>, Vec<i32>) = chunk
                        .into_iter()
                        .map(|l| {
                            let mut l = l.split_whitespace();
                            let from_id: i32 = l.next().unwrap().parse().unwrap();
                            let to_id: i32 = l.next().unwrap().parse().unwrap();
                            // Deref the map hits so we unzip into Vec<i32>,
                            // not Vec<&i32> (the annotated element type).
                            (
                                *node_id_mapping.get(&from_id).unwrap(),
                                *node_id_mapping.get(&to_id).unwrap(),
                            )
                        })
                        .unzip();

                    query!("INSERT INTO edges(from_id, to_id) SELECT UNNEST($1::INT[]), UNNEST($2::INT[]) ON CONFLICT DO NOTHING", &from_ids, &to_ids).execute(&p).await?;
                    statsd.count("edges.inserts", from_ids.len() as f64, &None);

                    Ok(())
                })
                .try_collect()
                .await;
            res?;
            println!("Done insert");
        }

        Ok(())
    });

    // Fixed filename: the edges file is "<name>-host-edges.paths.gz"
    // (was "hostedgesvertices", a mashup of the vertices and edges names
    // pointing at a nonexistent object; compare the vertices URL in
    // get_nodes and the previous implementation).
    let url = Url::parse(&format!(
        "https://data.commoncrawl.org/projects/hyperlinkgraph/{}/host/{}-host-edges.paths.gz",
        name, name
    ))?;

    let download_result = download_file_with_indirection(url, tx).await;

    // Join the inserter first so its error (if any) takes precedence over a
    // download error caused by the channel closing.
    node_inserter.await??;
    download_result?;

    Ok(())
}
......@@ -91,6 +268,24 @@ struct PathInfo {
edges: Url,
}
/// Fetch the published CommonCrawl graph list and enqueue each one as a
/// pending row in `dumps`; releases already present are silently skipped.
async fn get_dump_paths(pool: &Pool<Postgres>) -> anyhow::Result<()> {
    const GRAPH_JSON_PATH: &str = "https://index.commoncrawl.org/graphinfo.json";

    let body = reqwest::get(GRAPH_JSON_PATH).await?.text().await?;
    let graphs = serde_json::from_str::<Vec<GraphInfo>>(&body)?;

    for graph in graphs {
        query!(
            "INSERT INTO dumps(name) VALUES ($1) ON CONFLICT DO NOTHING",
            graph.id
        )
        .execute(pool)
        .await?;
    }

    Ok(())
}
// NOTE(review): mid-refactor (wip commit). The live body below only records
// the newest graph version and then panics via `todo!()`; the previous
// implementation is preserved in the block comment. Callers are expected to
// migrate to the dumps queue (get_dump_paths / main's while-let loop).
async fn get_latest_path(pool: &Pool<Postgres>) -> anyhow::Result<Option<PathInfo>> {
const GRAPH_JSON_PATH: &str = "https://index.commoncrawl.org/graphinfo.json";
// NOTE(review): the next line is diff-extraction residue (hunk header),
// not code.
......@@ -98,37 +293,39 @@ async fn get_latest_path(pool: &Pool<Postgres>) -> anyhow::Result<Option<PathInf
let info: Vec<GraphInfo> = serde_json::from_str(&text)?;
// First entry in graphinfo.json is taken as the newest release.
let latest = &info.first().context("Missing graphs")?.id;
// Record the newest version; no-op if it is already known.
query!(
"INSERT INTO graph_versions(version) VALUES ($1)
ON CONFLICT DO NOTHING",
latest
)
.execute(pool)
.await?;
// Old implementation, commented out during the refactor. The duplicated
// statements inside are old/new diff lines interleaved by extraction.
/*query!(
"INSERT INTO graph_versions(version) VALUES ($1)
ON CONFLICT DO NOTHING",
latest
)
.execute(pool)
.await?;
let res = query!(
"SELECT id, status FROM graph_versions WHERE version = $1",
latest
)
.fetch_one(pool)
.await?;
let res = query!(
"SELECT id, status FROM graph_versions WHERE version = $1",
latest
)
.fetch_one(pool)
.await?;
if res.status == 2 || res.status == 3 {
// finished or errored
return Ok(None);
}
if res.status == 2 || res.status == 3 {
// finished or errored
return Ok(None);
}
Ok(Some(PathInfo {
id: res.id,
nodes: Url::parse(&format!(
"https://data.commoncrawl.org/projects/hyperlinkgraph/{}/host/{}-host-vertices.paths.gz",
latest, latest
))?,
edges: Url::parse(&format!(
"https://data.commoncrawl.org/projects/hyperlinkgraph/{}/host/{}-host-edges.paths.gz",
latest, latest
))?,
Ok(Some(PathInfo {
id: res.id,
nodes: Url::parse(&format!(
"https://data.commoncrawl.org/projects/hyperlinkgraph/{}/host/{}-host-vertices.paths.gz",
latest, latest
))?,
edges: Url::parse(&format!(
"https://data.commoncrawl.org/projects/hyperlinkgraph/{}/host/{}-host-edges.paths.gz",
latest, latest
))?,
}))
*/
todo!()
}
async fn download_file_with_indirection(path: Url, tx: mpsc::Sender<String>) -> anyhow::Result<()> {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment