Skip to content
Snippets Groups Projects
Commit 4185e32a authored by Stephen D's avatar Stephen D
Browse files

revert to postgres

parent f812a353
No related branches found
No related tags found
1 merge request: !1 Postgres
use std::{env, io::Read};
use std::{env, future::Future, io::Read, str::Lines};
use flate2::read::GzDecoder;
use futures::future::select_all;
use itertools::{Chunk, Itertools};
use reqwest::Url;
use tokio::{
fs::File,
io::{AsyncWriteExt, BufWriter},
sync::mpsc::{channel, Receiver, Sender},
};
use sqlx::{postgres::PgPoolOptions, query, Pool, Postgres};
use tokio::task::JoinError;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
todo!(); // putting this here so I don't run this accidentally
let args: Vec<_> = env::args().collect();
let vertices_path = Url::parse(&args[1])?;
let edges_path = Url::parse(&args[2])?;
// nodes
let (tx, rx) = channel(1);
tokio::task::spawn(async {
download_file_with_indirection(vertices_path, tx)
.await
.unwrap();
});
let pool = PgPoolOptions::new()
.max_connections(48)
.connect("postgres://localhost/commoncrawl_graph")
.await?;
save_nodes(rx).await?;
println!("Create tables");
// edges
let (tx, rx) = channel(1);
tokio::task::spawn(async {
download_file_with_indirection(edges_path, tx)
.await
.unwrap();
});
//create_tables(&pool).await?;
println!("Populate tables");
populate_tables(&pool, vertices_path, edges_path).await?;
println!("Create indexes");
save_edges(rx).await?;
create_indexes(&pool).await?;
Ok(())
}
async fn download_file_with_indirection(path: Url, sender: Sender<String>) -> anyhow::Result<()> {
async fn download_file_with_indirection<
F: Fn(Vec<String>) -> Fut,
Fut: Future<Output = Result<(), JoinError>> + Unpin,
>(
path: Url,
part_handler: F,
) -> anyhow::Result<()> {
let paths = reqwest::get(path.clone()).await?.bytes().await?;
let mut buf = String::new();
GzDecoder::new(&*paths).read_to_string(&mut buf)?;
let mut joins = vec![];
for line in buf.lines() {
let mut path = path.clone();
path.set_path(line);
let part = download_one_file(path).await.unwrap();
sender.send(part).await?;
let chunks: Vec<_> = part
.lines()
.chunks(10_000_000)
.into_iter()
.map(|x| x.map(|y| y.to_string()).collect())
.collect();
for chunk in chunks {
println!("run");
joins.push(part_handler(chunk));
// only have a certain number of joins running at a time
while joins.len() > 0 {
joins = select_all(joins).await.2;
}
}
}
for x in joins {
x.await?;
}
Ok(())
......@@ -65,103 +86,118 @@ async fn download_one_file(path: Url) -> anyhow::Result<String> {
Ok(buf)
}
async fn save_nodes(mut rx: Receiver<String>) -> anyhow::Result<()> {
let mut nodes_file =
BufWriter::with_capacity(10 * 1024 * 1024, File::create("nodes.bin").await?);
let mut cur_offset = 0;
let mut offsets_by_name = vec![];
let mut offsets_by_id = vec![];
while let Some(part) = rx.recv().await {
for line in part.lines() {
let mut iter = line.split('\t');
let id: u32 = iter.next().unwrap().parse().unwrap();
let host = iter.next().unwrap();
let offset = cur_offset;
nodes_file.write_all(&id.to_le_bytes()).await?;
nodes_file.write_all(host.as_bytes()).await?;
nodes_file.write_all(b"\n").await?;
offsets_by_id.push((id, u64::try_from(offset).unwrap()));
offsets_by_name.push(u64::try_from(offset).unwrap());
cur_offset += 4 + host.as_bytes().len() + 1;
}
}
// sort offsets
offsets_by_id.sort_by_key(|(id, _offset)| *id);
// TODO sort offsets_by_name - can't assume it'll be sorted
let offsets_by_name_bytes: Vec<u8> = offsets_by_name
.into_iter()
.flat_map(|x| x.to_le_bytes())
.collect();
async fn create_tables(pool: &Pool<Postgres>) -> anyhow::Result<()> {
query!(
"
CREATE TABLE node_dupe(
id BIGINT NOT NULL PRIMARY KEY,
name TEXT NOT NULL
);
"
)
.execute(pool)
.await?;
query!(
"
CREATE TABLE edge_dupe(
from_id BIGINT NOT NULL,
to_id BIGINT NOT NULL,
PRIMARY KEY (from_id, to_id)
);
"
)
.execute(pool)
.await?;
// save our offsets
File::create("nodes_by_name_index.bin")
.await?
.write_all(&offsets_by_name_bytes)
.await?;
drop(offsets_by_name_bytes); // get some RAM back
Ok(())
}
let offsets_by_id_bytes: Vec<u8> = offsets_by_id
.into_iter()
.flat_map(|(id, offset)| {
// An allocation in here is gross
let mut x = id.to_le_bytes().to_vec(); // TODO no reason to put id here?
x.extend(offset.to_le_bytes());
async fn populate_tables(
pool: &Pool<Postgres>,
vertices_path: Url,
edges_path: Url,
) -> anyhow::Result<()> {
// nodes
download_file_with_indirection(vertices_path, |chunk| {
let p = pool.clone();
tokio::task::spawn(async move {
let (ids, hosts): (Vec<_>, Vec<_>) = chunk
.into_iter()
.map(|line| {
let mut iter = line.split('\t');
let id: i64 = iter.next().unwrap().parse().unwrap();
let host = iter.next().unwrap();
(id, host.to_string())
})
.unzip();
println!("Start insert of {} rows", ids.len());
query!(
"INSERT INTO nodes (id, name) VALUES (UNNEST($1::BIGINT[]), UNNEST($2::TEXT[]))",
&ids,
&hosts
)
.execute(&p)
.await
.unwrap();
x
println!("Done insert");
})
.collect();
})
.await?;
File::create("nodes_by_id_index.bin")
.await?
.write_all(&offsets_by_id_bytes)
// edges
download_file_with_indirection(edges_path, |chunk| {
let p = pool.clone();
tokio::task::spawn(async move {
let (from_ids, to_ids): (Vec<_>, Vec<_>) = chunk.into_iter().map(|line| {
let mut iter = line.split('\t');
let from_id: i64 = iter.next().unwrap().parse().unwrap();
let to_id: i64 = iter.next().unwrap().parse().unwrap();
(from_id, to_id)
})
.unzip();
println!("Start insert of {} rows", from_ids.len());
query!(
"INSERT INTO edges (from_id, to_id) VALUES (UNNEST($1::BIGINT[]), UNNEST($2::BIGINT[]))",
&from_ids,
&to_ids
)
.execute(&p)
.await
.unwrap();
println!("Done insert");
})
})
.await?;
Ok(())
}
async fn save_edges(mut rx: Receiver<String>) -> anyhow::Result<()> {
// (to, from)
let mut edges_to_file =
BufWriter::with_capacity(10 * 1024 * 1024, File::create("edges_to.bin").await?);
// (from, to)
let mut edges_from_file =
BufWriter::with_capacity(10 * 1024 * 1024, File::create("edges_from.bin").await?);
let mut edges = vec![];
while let Some(part) = rx.recv().await {
for line in part.lines() {
let mut iter = line.split('\t');
let from: u32 = iter.next().unwrap().parse().unwrap();
let to: u32 = iter.next().unwrap().parse().unwrap();
edges.push((to, from));
}
}
edges.sort_by_key(|x| x.0);
for (to, from) in &edges {
edges_to_file.write_all(&to.to_le_bytes()).await?;
edges_to_file.write_all(&from.to_le_bytes()).await?;
}
edges_to_file.flush().await?;
async fn create_indexes(pool: &Pool<Postgres>) -> anyhow::Result<()> {
query!("CREATE INDEX nodes_text_idx ON nodes(name, text_pattern_ops);")
.execute(pool)
.await?;
edges.sort_by_key(|x| x.1);
query!("CREATE INDEX edges_from_id_idx ON edges(from_id);")
.execute(pool)
.await?;
for (to, from) in &edges {
edges_from_file.write_all(&from.to_le_bytes()).await?;
edges_from_file.write_all(&to.to_le_bytes()).await?;
}
query!("CREATE INDEX edges_to_id_idx ON edges(to_id);")
.execute(pool)
.await?;
Ok(())
}
/// Placeholder: the body is empty, so awaiting this currently does nothing.
// NOTE(review): presumably meant to swap freshly loaded tables into place — confirm intent.
async fn swap_tables() {}
use commoncrawl_graph::graph::{EdgeReader, NodeReader};
/// Prints every host that has an edge pointing to `com.scd31`.
///
/// Looks the host up by name, fetches all edges whose `to` side is that node,
/// then resolves and prints the name of each edge's `from` node.
///
/// # Errors
/// Fails if the data files cannot be opened or read, or if `com.scd31` is not
/// present in the node index.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut nr = NodeReader::new().await?;
    let mut er = EdgeReader::new().await?;

    // A missing host is a data condition, not a bug: report it instead of panicking.
    let id = nr
        .id_from_name("com.scd31")
        .await?
        .ok_or_else(|| anyhow::anyhow!("host com.scd31 not found in node index"))?;

    for e in er.get_nodes_to(id).await? {
        dbg!(&e);
        let host = nr.name_from_id(e.from).await?;
        println!("{:?}", host);
    }

    Ok(())
}
use std::{cmp::Ordering, io::SeekFrom};
use anyhow::Context;
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, BufReader},
};
/// A single entry of the node table: a numeric id paired with its host name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
    /// Numeric identifier of the node.
    id: u32,
    /// Host name stored alongside the id.
    name: String,
}

impl Node {
    /// Builds a node from its id and host name.
    pub fn new(id: u32, name: String) -> Self {
        Self { id, name }
    }
}
/// Random-access reader over the on-disk node table and its two lookup indexes.
pub struct NodeReader {
    /// `nodes.bin`: each record is a little-endian `u32` id followed by the host
    /// name and a line terminator.
    nodes: File,
    /// `nodes_by_name_index.bin`: one little-endian `u64` offset into `nodes` per entry.
    name_index: File,
    /// `nodes_by_id_index.bin`: 12-byte records; the little-endian `u64` offset into
    /// `nodes` starts 4 bytes into each record (the first 4 bytes are not read here).
    id_index: File,
    /// Number of entries, derived from the size of the name index.
    num_nodes: u64,
}

impl NodeReader {
    /// Opens `nodes.bin`, `nodes_by_name_index.bin` and `nodes_by_id_index.bin`
    /// relative to the current working directory.
    ///
    /// # Errors
    /// Returns an error if any file cannot be opened or the name index's metadata
    /// cannot be read.
    pub async fn new() -> anyhow::Result<Self> {
        let nodes = File::open("nodes.bin").await?;
        let name_index = File::open("nodes_by_name_index.bin").await?;
        let id_index = File::open("nodes_by_id_index.bin").await?;
        // Every name-index entry is a single 8-byte offset.
        let num_nodes = name_index.metadata().await?.len() / 8;

        Ok(Self {
            nodes,
            name_index,
            id_index,
            num_nodes,
        })
    }

    /// Looks up the id of the node whose host name equals `name`.
    ///
    /// Performs a binary search over the name index and returns `Ok(None)` when no
    /// entry matches, including when the index is empty.
    ///
    /// NOTE(review): the search assumes the name index is ordered by host name using
    /// plain `str` comparison — confirm against the code that writes the index.
    ///
    /// # Errors
    /// Returns an error if reading either file fails.
    pub async fn id_from_name(&mut self, name: &str) -> anyhow::Result<Option<u32>> {
        // Half-open interval [lo, hi): never needs `x - 1`, so it cannot underflow on
        // an empty index or when the target sorts before the first entry.
        let mut lo = 0u64;
        let mut hi = self.num_nodes;

        while lo < hi {
            let mid = lo + (hi - lo) / 2;
            let node = self.get_node_from_name_idx(mid).await?;

            match name.cmp(node.name.as_str()) {
                Ordering::Less => hi = mid,
                Ordering::Equal => return Ok(Some(node.id)),
                Ordering::Greater => lo = mid + 1,
            }
        }

        Ok(None)
    }

    /// Looks up the host name of the node with the given `id`.
    ///
    /// Performs a binary search over the id index, comparing against the id stored
    /// in each referenced node record. Returns `Ok(None)` when no record matches,
    /// including when the index is empty.
    ///
    /// NOTE(review): the search assumes the id index is ordered by node id and has
    /// as many entries as the name index — confirm against the writer.
    ///
    /// # Errors
    /// Returns an error if reading either file fails, or if the matching record has
    /// no host line.
    pub async fn name_from_id(&mut self, id: u32) -> anyhow::Result<Option<String>> {
        // Half-open interval [lo, hi): every candidate is examined exactly once and
        // the loop always terminates, even when `id` is absent.
        let mut lo = 0u64;
        let mut hi = self.num_nodes;

        while lo < hi {
            let mid = lo + (hi - lo) / 2;

            // Skip the 4-byte prefix of the 12-byte index record to reach the offset.
            self.id_index.seek(SeekFrom::Start(mid * 12 + 4)).await?;
            let offset = self.id_index.read_u64_le().await?;

            self.nodes.seek(SeekFrom::Start(offset)).await?;
            let host_id = self.nodes.read_u32_le().await?;

            match id.cmp(&host_id) {
                Ordering::Less => hi = mid,
                Ordering::Equal => {
                    // The file cursor now sits right after the id, i.e. at the host
                    // name; only pay for reading the line once the id matched.
                    let host = BufReader::new(&mut self.nodes)
                        .lines()
                        .next_line()
                        .await?
                        .context("Missing line")?;
                    return Ok(Some(host));
                }
                Ordering::Greater => lo = mid + 1,
            }
        }

        Ok(None)
    }

    // TODO: looking up neighbouring name-index entries that match a (possibly
    // wildcard) name was sketched here previously but never implemented.

    /// Reads the node referenced by entry `idx` of the name index.
    ///
    /// # Errors
    /// Returns an error if seeking or reading fails (for example when `idx` is past
    /// the end of the index), or if the record has no host line.
    async fn get_node_from_name_idx(&mut self, idx: u64) -> anyhow::Result<Node> {
        // Resolve the index entry to a byte offset inside the node table.
        self.name_index.seek(SeekFrom::Start(idx * 8)).await?;
        let offset = self.name_index.read_u64_le().await?;

        // A node record is the id immediately followed by the host line.
        self.nodes.seek(SeekFrom::Start(offset)).await?;
        let id = self.nodes.read_u32_le().await?;
        let host = BufReader::new(&mut self.nodes)
            .lines()
            .next_line()
            .await?
            .context("Missing line")?;

        Ok(Node::new(id, host))
    }
}
/// An edge of the graph, expressed as a pair of node ids.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Edge {
    /// Id of the node on the `from` side of the edge.
    pub from: u32,
    /// Id of the node on the `to` side of the edge.
    pub to: u32,
}
/// Random-access reader over the two on-disk edge tables.
///
/// NOTE(review): the lookups binary-search the files, so they assume
/// `edges_from.bin` is ordered by `from` and `edges_to.bin` by `to`, and that both
/// files hold the same number of records — confirm against the writer.
pub struct EdgeReader {
    /// `edges_from.bin`: 8-byte records, little-endian `u32` `from` then `u32` `to`.
    edges_from: File,
    /// `edges_to.bin`: 8-byte records, little-endian `u32` `to` then `u32` `from`.
    edges_to: File,
    /// Number of records, derived from the size of `edges_from.bin`.
    num_edges: u64,
}

impl EdgeReader {
    /// Opens `edges_from.bin` and `edges_to.bin` relative to the current working
    /// directory.
    ///
    /// # Errors
    /// Returns an error if either file cannot be opened or the metadata of
    /// `edges_from.bin` cannot be read.
    pub async fn new() -> anyhow::Result<Self> {
        let edges_from = File::open("edges_from.bin").await?;
        let edges_to = File::open("edges_to.bin").await?;
        // Every edge record occupies 8 bytes (two `u32`s).
        let num_edges = edges_from.metadata().await?.len() / 8;

        Ok(Self {
            edges_from,
            edges_to,
            num_edges,
        })
    }

    /// Returns every edge whose `from` side is `from`.
    ///
    /// Returns an empty vector when no such edge exists, including when the edge
    /// file is empty.
    ///
    /// # Errors
    /// Returns an error if reading `edges_from.bin` fails.
    pub async fn get_nodes_from(&mut self, from: u32) -> anyhow::Result<Vec<Edge>> {
        // Half-open interval [lo, hi): terminates when the key is absent and never
        // computes `0 - 1`.
        let mut lo = 0u64;
        let mut hi = self.num_edges;

        while lo < hi {
            let mid = lo + (hi - lo) / 2;
            let edge = self.get_edge_from_from_idx(mid).await?;

            match from.cmp(&edge.from) {
                Ordering::Less => hi = mid,
                Ordering::Equal => {
                    return self.get_all_consecutive_matching_from(mid, from).await;
                }
                Ordering::Greater => lo = mid + 1,
            }
        }

        Ok(Vec::new())
    }

    /// Returns every edge whose `to` side is `to`.
    ///
    /// Returns an empty vector when no such edge exists, including when the edge
    /// file is empty.
    ///
    /// # Errors
    /// Returns an error if reading `edges_to.bin` fails.
    pub async fn get_nodes_to(&mut self, to: u32) -> anyhow::Result<Vec<Edge>> {
        // Half-open interval [lo, hi): terminates when the key is absent and never
        // computes `0 - 1`.
        let mut lo = 0u64;
        let mut hi = self.num_edges;

        while lo < hi {
            let mid = lo + (hi - lo) / 2;
            let edge = self.get_edge_from_to_idx(mid).await?;

            match to.cmp(&edge.to) {
                Ordering::Less => hi = mid,
                Ordering::Equal => {
                    return self.get_all_consecutive_matching_to(mid, to).await;
                }
                Ordering::Greater => lo = mid + 1,
            }
        }

        Ok(Vec::new())
    }

    /// Collects the run of records in `edges_from.bin` around index `cur` whose
    /// `from` equals `from`.
    ///
    /// Records are gathered walking backwards from `cur` (inclusive) and then
    /// forwards from `cur + 1`, so the result is in that visiting order.
    async fn get_all_consecutive_matching_from(
        &mut self,
        cur: u64,
        from: u32,
    ) -> anyhow::Result<Vec<Edge>> {
        let mut out = vec![];

        // Backwards, stopping at index 0 instead of underflowing.
        for idx in (0..=cur).rev() {
            let edge = self.get_edge_from_from_idx(idx).await?;
            if edge.from != from {
                break;
            }
            out.push(edge);
        }

        // Forwards, stopping at the last record instead of reading past the end.
        for idx in cur + 1..self.num_edges {
            let edge = self.get_edge_from_from_idx(idx).await?;
            if edge.from != from {
                break;
            }
            out.push(edge);
        }

        Ok(out)
    }

    /// Reads record `idx` of `edges_from.bin`, which stores `from` before `to`.
    async fn get_edge_from_from_idx(&mut self, idx: u64) -> anyhow::Result<Edge> {
        self.edges_from.seek(SeekFrom::Start(idx * 8)).await?;
        let from = self.edges_from.read_u32_le().await?;
        let to = self.edges_from.read_u32_le().await?;

        Ok(Edge { from, to })
    }

    /// Collects the run of records in `edges_to.bin` around index `cur` whose `to`
    /// equals `to`.
    ///
    /// Records are gathered walking backwards from `cur` (inclusive) and then
    /// forwards from `cur + 1`, so the result is in that visiting order.
    async fn get_all_consecutive_matching_to(
        &mut self,
        cur: u64,
        to: u32,
    ) -> anyhow::Result<Vec<Edge>> {
        let mut out = vec![];

        // Backwards, stopping at index 0 instead of underflowing.
        for idx in (0..=cur).rev() {
            let edge = self.get_edge_from_to_idx(idx).await?;
            if edge.to != to {
                break;
            }
            out.push(edge);
        }

        // Forwards. This must read the `to`-ordered file: indexes found by searching
        // `edges_to.bin` are meaningless in `edges_from.bin`.
        for idx in cur + 1..self.num_edges {
            let edge = self.get_edge_from_to_idx(idx).await?;
            if edge.to != to {
                break;
            }
            out.push(edge);
        }

        Ok(out)
    }

    /// Reads record `idx` of `edges_to.bin`, which stores `to` before `from`.
    async fn get_edge_from_to_idx(&mut self, idx: u64) -> anyhow::Result<Edge> {
        self.edges_to.seek(SeekFrom::Start(idx * 8)).await?;
        let to = self.edges_to.read_u32_le().await?;
        let from = self.edges_to.read_u32_le().await?;

        Ok(Edge { from, to })
    }
}
pub mod graph;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment