-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdemo.py
70 lines (57 loc) · 1.85 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import psycopg2
import pandas as pd
import numpy as np
from postgres_binary_parser.schema import Schema, num, cat, dt, delta, bool_, id_
from postgres_binary_parser.psql_binary_datastore import PsqlBinaryDataStore
from postgres_binary_parser.sql_generator import SchemaSqlGenerator
from random import choice
import io
test_schema = Schema('test', [
num('a', int=True),
cat('b'),
num('c'),
num('d'),
num('e'),
num('f'),
dt('g'),
delta('h'),
bool_('i')
])
print('using test schema:')
print(test_schema)
test_df = pd.DataFrame(index=list(range(1000)))
test_df['a'] = 88
s = 'abcdefghijklmnopqrs'
test_df['b'] = [''.join(choice(s) for n in range(5)) for m in range(1000)]
test_df['c'] = 7
test_df.loc[np.unique(np.random.randint(0, 1000, size=100)), 'c'] = np.nan
test_df['d'] = np.random.randint(0, 10000000, size=1000)
test_df['e'] = np.random.randint(0, 10000000, size=1000)
test_df['f'] = np.random.randint(0, 10000000, size=1000)
test_df['g'] = pd.to_datetime('now')
test_df['h'] = pd.to_timedelta('15 days 5 hours 30 minutes')
test_df['i'] = True
conn = psycopg2.connect(database='dan', user='dan')
cur = conn.cursor()
cur.execute('drop table if exists test')
sql_gen = SchemaSqlGenerator(test_schema)
create_table_sql = sql_gen.create_table()
print(create_table_sql)
cur.execute(create_table_sql)
f = io.BytesIO()
bin_store = PsqlBinaryDataStore(test_schema, f)
bin_store.store(test_df)
f.seek(0)
copy_from_sql = sql_gen.copy_from()
print(copy_from_sql)
cur.copy_expert(copy_from_sql, f)
print('Successfully wrote binary data to Postgres from a DataFrame')
f = io.BytesIO()
copy_to_sql = sql_gen.copy_to()
print(copy_to_sql)
cur.copy_expert(copy_to_sql, f)
f.seek(0)
bin_store = PsqlBinaryDataStore(test_schema, f)
decoded_df = bin_store.load()
print('Successfully loaded binary from Postgres and parsed it into a DataFrame')
conn.commit()