Inserting One Billion Rows into SQLite: Python vs Rust
Tasks like this come up all the time in practice: you write a script to process data, say to bulk-load it into or export it out of a database. Python is usually the most convenient tool for the job, but once the data volume grows large (hundreds of millions of rows, for instance), Python becomes unbearably slow. What can you do then? One developer shared his hands-on experience with exactly this problem, comparing Python and Rust programs that insert one billion rows into SQLite, and ultimately getting the Rust version to finish the task in about a minute. We share his process here in the hope that it offers some inspiration; you are welcome to try your own favorite approach on the same task and compare the numbers.
Overview
The task in this case study is to insert one billion rows into a SQLite database. The structure and constraints of the user table are as follows:
```sql
create table IF NOT EXISTS user
(
    id INTEGER not null primary key,
    area CHAR(6),
    age INTEGER not null,
    active INTEGER not null
);
```
The data is generated randomly. The area column holds a six-digit area code (any six digits are valid), age is one of 5, 10, or 15, and active is either 0 or 1.
- Hardware: a 2019 MacBook Pro (2.4 GHz quad-core i5, 8 GB RAM, 256 GB SSD, Big Sur 11.1).
- Robustness is not required: if the process crashes and all data is lost, that is fine; the script can simply be run again.
- The machine's resources may be used to the full: 100% CPU, 8 GB of RAM, and gigabytes of SSD space.
 
Real randomness is not required either; the stdlib's pseudorandom generator is good enough.
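The scripts that follow import shared helpers (get_random_age and friends, plus create_table) from a commons module that is not reproduced in this write-up. A minimal Python sketch of what it could look like, assuming nothing beyond the data rules above:

```python
# commons.py -- hypothetical sketch of the helper module the scripts import.
# Assumes only the rules stated above: area is any six-digit code,
# age is one of 5/10/15, and active is 0 or 1.
import random
import sqlite3


def get_random_age() -> int:
    return random.choice([5, 10, 15])


def get_random_active() -> int:
    return random.choice([0, 1])


def get_random_bool() -> bool:
    return random.random() < 0.5


def get_random_area_code() -> str:
    # zero-padded so the code is always exactly six characters
    return f"{random.randint(0, 999_999):06d}"


def create_table(con: sqlite3.Connection) -> None:
    con.execute(
        "CREATE TABLE IF NOT EXISTS user ("
        "id INTEGER NOT NULL PRIMARY KEY, "
        "area CHAR(6), "
        "age INTEGER NOT NULL, "
        "active INTEGER NOT NULL)"
    )
```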
Python
First, the naive Python version. The Python standard library ships with an sqlite3 module, and the first version was written with it. The code:
```python
import sqlite3
from commons import get_random_age, get_random_active, get_random_bool, get_random_area_code, create_table

DB_NAME = "naive.db"


def faker(con: sqlite3.Connection, count=100_000):
    for _ in range(count):
        age = get_random_age()
        active = get_random_active()
        # switch for area code
        if get_random_bool():
            # random 6 digit number
            area = get_random_area_code()
            con.execute('INSERT INTO user VALUES (NULL,?,?,?)', (area, age, active))
        else:
            con.execute('INSERT INTO user VALUES (NULL,NULL,?,?)', (age, active))
    con.commit()


def main():
    con = sqlite3.connect(DB_NAME, isolation_level=None)
    con.execute('PRAGMA journal_mode = WAL;')
    create_table(con)
    faker(con, count=10_000_000)


if __name__ == '__main__':
    main()
```
This script inserts 10 million rows, one at a time, in a for loop; it took almost 15 minutes to run. That became the baseline for the optimization iterations that follow.
In SQLite, each insert runs as its own atomic transaction by default, and every transaction has to be flushed to disk (an IO operation), so this is slow. The fix is to batch many inserts together; after trying different batch sizes, 100,000 rows per batch turned out to be the sweet spot. This simple change cut the runtime to 10 minutes, a one-third improvement, though it was still painfully slow. The batched version is sketched below.
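A minimal sketch of that batched version, assuming the same hypothetical commons helpers and a 100,000-row batch handed to executemany inside one explicit transaction:

```python
import sqlite3
from commons import (get_random_age, get_random_active, get_random_bool,
                     get_random_area_code, create_table)

DB_NAME = "batched.db"


def faker(con: sqlite3.Connection, count=100_000):
    # Build one batch in memory, then write it with two executemany calls.
    with_area, without_area = [], []
    for _ in range(count):
        age = get_random_age()
        active = get_random_active()
        if get_random_bool():
            with_area.append((get_random_area_code(), age, active))
        else:
            without_area.append((age, active))
    con.execute('BEGIN')
    con.executemany('INSERT INTO user VALUES (NULL,?,?,?)', with_area)
    con.executemany('INSERT INTO user VALUES (NULL,NULL,?,?)', without_area)
    con.commit()


def main():
    con = sqlite3.connect(DB_NAME, isolation_level=None)
    con.execute('PRAGMA journal_mode = WAL;')
    create_table(con)
    for _ in range(100):  # 100 batches x 100,000 rows = 10 million rows
        faker(con, count=100_000)


if __name__ == '__main__':
    main()
```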
Tuning SQLite itself
Beyond code-level changes, for a pure bulk-write workload like this one, tuning the database itself matters just as much. For SQLite, the following settings help:
```sql
PRAGMA journal_mode = OFF;
PRAGMA synchronous = 0;
PRAGMA cache_size = 1000000;
PRAGMA locking_mode = EXCLUSIVE;
PRAGMA temp_store = MEMORY;
```
What each of these does:
First, journal_mode = OFF turns the rollback journal off entirely, disabling SQLite's atomic commit and rollback: if a transaction fails, there is no way to recover. Given this task's explicit no-robustness requirement that is acceptable, but it must never be used in production.
Next, with synchronous = 0, SQLite stops waiting for confirmation that data has actually reached the disk, so a write that "succeeds" may not have been flushed yet. Again, never enable this in production.
cache_size specifies how many memory pages SQLite is allowed to keep cached in RAM. Do not set it too high in production.
In EXCLUSIVE locking mode, the lock held by the SQLite connection is never released.
Setting temp_store to MEMORY makes temporary tables behave like an in-memory database.
Performance after tuning
Adding these SQLite settings to both scripts above and re-running:
```python
def main():
    con = sqlite3.connect(DB_NAME, isolation_level=None)
    con.execute('PRAGMA journal_mode = OFF;')
    con.execute('PRAGMA synchronous = 0;')
    con.execute('PRAGMA cache_size = 1000000;')  # give it a GB
    con.execute('PRAGMA locking_mode = EXCLUSIVE;')
    con.execute('PRAGMA temp_store = MEMORY;')
    create_table(con)
    faker(con, count=100_000_000)
```
With the PRAGMAs applied, the naive version inserted 100 million rows in about 10 minutes, and the batched version in about 8.5 minutes.
The PyPy version
Compared with CPython, PyPy is known to speed up data processing, reputedly by 4x or more. So the next experiment was simply to run the same scripts, completely unmodified, under the PyPy interpreter.
Under PyPy, the batched version inserted 100 million rows in just 2.5 minutes, about 3.5x faster than CPython, so the rumored 4x speedup is not far off at all. Next, to measure how much time the pure loop itself consumes, the SQL statements were removed from the script and it was run again; a sketch of that loop-only script follows.
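A minimal sketch of that loop-only measurement, assuming it is simply the faker loop with every SQL call stripped out:

```python
# Hypothetical loop-only benchmark: same data generation as faker,
# but no SQL at all, so it measures pure interpreter/loop cost.
from commons import (get_random_age, get_random_active, get_random_bool,
                     get_random_area_code)


def faker(count=100_000_000):
    for _ in range(count):
        age = get_random_age()
        active = get_random_active()
        if get_random_bool():
            area = get_random_area_code()
            row = (area, age, active)  # row is built but never written
        else:
            row = (age, active)


if __name__ == '__main__':
    faker()
```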
This loop-only script took 5.5 minutes under CPython and 1.5 minutes under PyPy (the same roughly 3.5x speedup).
Rust
With the Python optimizations exhausted, the next attempt was a Rust version, again with both a naive variant and a batched variant. The naive variant, inserting one row at a time:
```rust
use rusqlite::{params, Connection};

mod common;

fn faker(mut conn: Connection, count: i64) {
    let tx = conn.transaction().unwrap();
    for _ in 0..count {
        let with_area = common::get_random_bool();
        let age = common::get_random_age();
        let is_active = common::get_random_active();
        if with_area {
            let area_code = common::get_random_area_code();
            tx.execute(
                "INSERT INTO user VALUES (NULL, ?, ?, ?)",
                params![area_code, age, is_active],
            )
            .unwrap();
        } else {
            tx.execute(
                "INSERT INTO user VALUES (NULL, NULL, ?, ?)",
                params![age, is_active],
            )
            .unwrap();
        }
    }
    tx.commit().unwrap();
}

fn main() {
    let conn = Connection::open("basic.db").unwrap();
    conn.execute_batch(
        "PRAGMA journal_mode = OFF;
        PRAGMA synchronous = 0;
        PRAGMA cache_size = 1000000;
        PRAGMA locking_mode = EXCLUSIVE;
        PRAGMA temp_store = MEMORY;",
    )
    .expect("PRAGMA");
    conn.execute(
        "CREATE TABLE IF NOT EXISTS user (
            id INTEGER not null primary key,
            area CHAR(6),
            age INTEGER not null,
            active INTEGER not null)",
        [],
    )
    .unwrap();
    faker(conn, 100_000_000)
}
```
This version ran in about 3 minutes. Further experiments followed.
First, swapping rusqlite for sqlx and running asynchronously:
```rust
use std::str::FromStr;

use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};
use sqlx::{ConnectOptions, Connection, Executor, SqliteConnection, Statement};

mod common;

async fn faker(mut conn: SqliteConnection, count: i64) -> Result<(), sqlx::Error> {
    let mut tx = conn.begin().await?;
    let stmt_with_area = tx
        .prepare("INSERT INTO user VALUES (NULL, ?, ?, ?)")
        .await?;
    let stmt = tx
        .prepare("INSERT INTO user VALUES (NULL, NULL, ?, ?)")
        .await?;
    for _ in 0..count {
        let with_area = common::get_random_bool();
        let age = common::get_random_age();
        let is_active = common::get_random_active();
        if with_area {
            let area_code = common::get_random_area_code();
            stmt_with_area
                .query()
                .bind(area_code)
                .bind(age)
                .bind(is_active)
                .execute(&mut tx)
                .await?;
        } else {
            stmt.query()
                .bind(age)
                .bind(is_active)
                .execute(&mut tx)
                .await?;
        }
    }
    tx.commit().await?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let mut conn = SqliteConnectOptions::from_str("basic_async.db")
        .unwrap()
        .create_if_missing(true)
        .journal_mode(SqliteJournalMode::Off)
        .synchronous(SqliteSynchronous::Off)
        .connect()
        .await?;
    conn.execute("PRAGMA cache_size = 1000000;").await?;
    conn.execute("PRAGMA locking_mode = EXCLUSIVE;").await?;
    conn.execute("PRAGMA temp_store = MEMORY;").await?;
    conn.execute(
        "CREATE TABLE IF NOT EXISTS user (
            id INTEGER not null primary key,
            area CHAR(6),
            age INTEGER not null,
            active INTEGER not null);",
    )
    .await?;
    faker(conn, 100_000_000).await?;
    Ok(())
}
```
This version took about 14 minutes: performance actually regressed, ending up worse than the Python version. A plausible explanation is that SQLite is a synchronous, in-process library, so the async machinery adds per-statement overhead while offering no concurrency to win back.
Switching back from raw SQL strings to prepared statements, still inserting rows one at a time inside the loop but reusing the prepared statements, brought the runtime down to about one minute.
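That prepared-statement version is not listed here; a minimal sketch of its faker with rusqlite, assuming the same common helpers and the same main/PRAGMA setup as the naive version, might look like this:

```rust
use rusqlite::{params, Connection};

mod common;

// Hypothetical sketch: one big transaction, with both INSERT statements
// prepared once and reused on every loop iteration.
fn faker(conn: &mut Connection, count: i64) {
    let tx = conn.transaction().unwrap();
    {
        let mut stmt_with_area = tx
            .prepare_cached("INSERT INTO user VALUES (NULL, ?, ?, ?)")
            .unwrap();
        let mut stmt = tx
            .prepare_cached("INSERT INTO user VALUES (NULL, NULL, ?, ?)")
            .unwrap();
        for _ in 0..count {
            let age = common::get_random_age();
            let is_active = common::get_random_active();
            if common::get_random_bool() {
                let area_code = common::get_random_area_code();
                stmt_with_area
                    .execute(params![area_code, age, is_active])
                    .unwrap();
            } else {
                stmt.execute(params![age, is_active]).unwrap();
            }
        }
    } // the cached statements drop here, releasing their borrow of tx
    tx.commit().unwrap();
}
```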
Then, keeping the prepared statements but grouping the inserts into batches of 50 rows per statement, 100 million rows went in within 34.3 seconds:
```rust
use rusqlite::{Connection, ToSql, Transaction};

mod common;

fn faker_wrapper(mut conn: Connection, count: i64) {
    let tx = conn.transaction().unwrap();
    faker(&tx, count);
    tx.commit().unwrap();
}

fn faker(tx: &Transaction, count: i64) {
    // that is, we will batch 50 inserts of rows at once
    let min_batch_size: i64 = 50;
    if count < min_batch_size {
        panic!("count cant be less than min batch size");
    }
    // build the "(NULL, ?, ?, ?), ..." placeholder strings once,
    // then shadow each String with a &str view of itself
    let mut with_area_params = " (NULL, ?, ?, ?),".repeat(min_batch_size as usize);
    with_area_params.pop();
    let with_area_params = with_area_params.as_str();
    let mut without_area_params = " (NULL, NULL, ?, ?),".repeat(min_batch_size as usize);
    without_area_params.pop();
    let without_area_params = without_area_params.as_str();
    let st1 = format!("INSERT INTO user VALUES {}", with_area_params);
    let st2 = format!("INSERT INTO user VALUES {}", without_area_params);
    let mut stmt_with_area = tx.prepare_cached(st1.as_str()).unwrap();
    let mut stmt = tx.prepare_cached(st2.as_str()).unwrap();
    for _ in 0..(count / min_batch_size) {
        let with_area = common::get_random_bool();
        let age = common::get_random_age();
        let is_active = common::get_random_active();
        let mut param_values: Vec<&dyn ToSql> = Vec::new();
        if with_area {
            // lets prepare the batch
            let mut vector = Vec::<(String, i8, i8)>::new();
            for _ in 0..min_batch_size {
                let area_code = common::get_random_area_code();
                vector.push((area_code, age, is_active));
            }
            for batch in vector.iter() {
                param_values.push(&batch.0 as &dyn ToSql);
                param_values.push(&batch.1 as &dyn ToSql);
                param_values.push(&batch.2 as &dyn ToSql);
            }
            stmt_with_area.execute(&*param_values).unwrap();
        } else {
            // lets prepare the batch
            let mut vector = Vec::<(i8, i8)>::new();
            for _ in 0..min_batch_size {
                vector.push((age, is_active));
            }
            for batch in vector.iter() {
                param_values.push(&batch.0 as &dyn ToSql);
                param_values.push(&batch.1 as &dyn ToSql);
            }
            stmt.execute(&*param_values).unwrap();
        }
    }
}

fn main() {
    let conn = Connection::open("basic_batched.db").unwrap();
    conn.execute_batch(
        "PRAGMA journal_mode = OFF;
        PRAGMA synchronous = 0;
        PRAGMA cache_size = 1000000;
        PRAGMA locking_mode = EXCLUSIVE;
        PRAGMA temp_store = MEMORY;",
    )
    .expect("PRAGMA");
    conn.execute(
        "CREATE TABLE IF NOT EXISTS user (
            id INTEGER not null primary key,
            area CHAR(6),
            age INTEGER not null,
            active INTEGER not null)",
        [],
    )
    .unwrap();
    faker_wrapper(conn, 100_000_000)
}
```
A threaded version was built as well: one writer thread receives batches from a channel, while four other threads push data into that channel.
```rust
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;

use rusqlite::{Connection, ToSql};

mod common;

static MIN_BATCH_SIZE: i64 = 50;

enum ParamValues {
    WithArea(Vec<(String, i8, i8)>),
    WithoutArea(Vec<(i8, i8)>),
}

fn consumer(rx: Receiver<ParamValues>) {
    let mut conn = Connection::open("threaded_batched.db").unwrap();
    conn.execute_batch(
        "PRAGMA journal_mode = OFF;
        PRAGMA synchronous = 0;
        PRAGMA cache_size = 1000000;
        PRAGMA locking_mode = EXCLUSIVE;
        PRAGMA temp_store = MEMORY;",
    )
    .expect("PRAGMA");
    conn.execute(
        "CREATE TABLE IF NOT EXISTS user (
            id INTEGER not null primary key,
            area CHAR(6),
            age INTEGER not null,
            active INTEGER not null)",
        [],
    )
    .unwrap();
    let tx = conn.transaction().unwrap();
    {
        // build the 50-row placeholder strings once, as in the batched version
        let mut with_area_params = " (NULL, ?, ?, ?),".repeat(MIN_BATCH_SIZE as usize);
        with_area_params.pop();
        let with_area_params = with_area_params.as_str();
        let mut without_area_params = " (NULL, NULL, ?, ?),".repeat(MIN_BATCH_SIZE as usize);
        without_area_params.pop();
        let without_area_params = without_area_params.as_str();
        let st1 = format!("INSERT INTO user VALUES {}", with_area_params);
        let st2 = format!("INSERT INTO user VALUES {}", without_area_params);
        let mut stmt_with_area = tx.prepare_cached(st1.as_str()).unwrap();
        let mut stmt_without_area = tx.prepare_cached(st2.as_str()).unwrap();
        // keep writing until every producer has dropped its Sender
        for param_values in rx {
            let mut row_values: Vec<&dyn ToSql> = Vec::new();
            match param_values {
                ParamValues::WithArea(values) => {
                    for batch in values.iter() {
                        row_values.push(&batch.0 as &dyn ToSql);
                        row_values.push(&batch.1 as &dyn ToSql);
                        row_values.push(&batch.2 as &dyn ToSql);
                    }
                    stmt_with_area.execute(&*row_values).unwrap();
                }
                ParamValues::WithoutArea(values) => {
                    for batch in values.iter() {
                        row_values.push(&batch.0 as &dyn ToSql);
                        row_values.push(&batch.1 as &dyn ToSql);
                    }
                    stmt_without_area.execute(&*row_values).unwrap();
                }
            }
        }
    }
    tx.commit().unwrap();
}

fn producer(tx: Sender<ParamValues>, count: i64) {
    if count < MIN_BATCH_SIZE {
        panic!("count cant be less than min batch size");
    }
    for _ in 0..(count / MIN_BATCH_SIZE) {
        let with_area = common::get_random_bool();
        let age = common::get_random_age();
        let is_active = common::get_random_active();
        if with_area {
            // build the batch, then hand it to the writer thread
            let mut vector = Vec::<(String, i8, i8)>::new();
            for _ in 0..MIN_BATCH_SIZE {
                let area_code = common::get_random_area_code();
                vector.push((area_code, age, is_active));
            }
            tx.send(ParamValues::WithArea(vector)).unwrap();
        } else {
            let mut vector = Vec::<(i8, i8)>::new();
            for _ in 0..MIN_BATCH_SIZE {
                vector.push((age, is_active));
            }
            tx.send(ParamValues::WithoutArea(vector)).unwrap();
        }
    }
}

fn main() {
    // channel between the producer threads and the single writer thread
    let (tx, rx): (Sender<ParamValues>, Receiver<ParamValues>) = mpsc::channel();
    // launch the consumer; it sets up the DB and owns the only connection
    let consumer_handle = thread::spawn(move || consumer(rx));
    let cpu_count = num_cpus::get();
    let total_rows = 100_000_000;
    let each_producer_count = (total_rows / cpu_count) as i64;
    let mut handles = Vec::with_capacity(cpu_count);
    for _ in 0..cpu_count {
        let thread_tx = tx.clone();
        handles.push(thread::spawn(move || producer(thread_tx, each_producer_count)));
    }
    for t in handles {
        t.join().unwrap();
    }
    // drop the last Sender so the consumer's channel loop can end
    drop(tx);
    // wait till consumer is exited
    consumer_handle.join().unwrap();
}
```
 
This was the best-performing version of all, at about 32.37 seconds.
Benchmark comparison (collecting the timings reported above):

| Version | Rows | Time |
| --- | --- | --- |
| Python, naive | 10M | ~15 min |
| Python, batched | 10M | ~10 min |
| Python, naive + PRAGMAs | 100M | ~10 min |
| Python, batched + PRAGMAs | 100M | ~8.5 min |
| PyPy, batched + PRAGMAs | 100M | ~2.5 min |
| Rust, naive (rusqlite) | 100M | ~3 min |
| Rust, async (sqlx) | 100M | ~14 min |
| Rust, prepared statements | 100M | ~1 min |
| Rust, prepared + 50-row batches | 100M | 34.3 s |
| Rust, threaded + batched | 100M | 32.37 s |
Summary
Across the experiments in this case study, the overall takeaways are:
- Tuning SQLite with PRAGMA statements can significantly improve insert performance.
- Prepared statements improve performance.
- Batching inserts improves performance.
- PyPy really is close to 4x faster than CPython for this kind of workload.
- Threads and async do not automatically improve performance.