-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathdbeel.py
49 lines (37 loc) · 1.04 KB
/
dbeel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import socket
import msgpack
import struct
import contextlib
@contextlib.contextmanager
def client(host: str = '127.0.0.1', port: int = 10000):
    """Yield a TCP socket connected to ``(host, port)``.

    The defaults reproduce the address that used to be hard-coded here, so
    a bare ``client()`` call behaves exactly as before.

    Args:
        host: Address to connect to.
        port: TCP port to connect to.

    Yields:
        The connected ``socket.socket``. It is closed when the ``with``
        block exits, whether normally or through an exception.

    Raises:
        OSError: Propagated from ``connect`` when the connection cannot be
            established; the socket is still closed in that case.
    """
    # closing() guarantees sock.close() runs even if connect() or the
    # caller's with-body raises.
    with contextlib.closing(socket.socket()) as sock:
        sock.connect((host, port))
        yield sock
def _db_send(s, p):
    """Send payload ``p`` over socket ``s`` and return the decoded reply.

    The request is written as a 2-byte little-endian unsigned length
    followed by the msgpack encoding of ``p``. The reply is read until
    ``recv`` returns an empty bytes object (the peer closed its side) and
    is then decoded as a single msgpack value.

    Args:
        s: A connected socket-like object providing ``sendall`` and ``recv``.
        p: Any object ``msgpack.dumps`` can serialize.

    Returns:
        The object produced by ``msgpack.loads`` on the full reply.

    Raises:
        struct.error: If the encoded payload is longer than 65535 bytes,
            which the ``<H`` length header cannot represent.
    """
    body = msgpack.dumps(p)
    header = struct.pack("<H", len(body))
    # sendall() keeps writing until every byte is out; plain send() may
    # transmit only a prefix and would silently truncate the request.
    s.sendall(header + body)
    # Collect chunks and join once instead of repeatedly growing a bytes
    # object.
    chunks = []
    while chunk := s.recv(1024 * 64):
        chunks.append(chunk)
    return msgpack.loads(b''.join(chunks))
def _db_request(**kwargs):
    """Open a fresh connection, send ``kwargs`` as one request, return the reply.

    The keyword arguments are passed as a dict to ``_db_send``; the
    connection obtained from ``client()`` is closed before returning.
    """
    with client() as conn:
        reply = _db_send(conn, kwargs)
    return reply
class DB:
    """Namespace of static helpers, one per request type.

    Every method builds a request mapping and forwards it through
    ``_db_request``, returning whatever that call returns.
    """

    @staticmethod
    def create(c):
        """Send a ``create_collection`` request for the collection name ``c``."""
        request = {"type": "create_collection", "name": c}
        return _db_request(**request)

    @staticmethod
    def drop(c):
        """Send a ``drop_collection`` request for the collection name ``c``."""
        request = {"type": "drop_collection", "name": c}
        return _db_request(**request)

    @staticmethod
    def set(c, k, v, w=None):
        """Send a ``set`` request storing value ``v`` under key ``k`` in ``c``.

        ``w`` is forwarded under the ``consistensy`` key (``None`` when not
        given).
        """
        # NOTE(review): the key is spelled "consistensy" — confirm against
        # the server protocol before renaming; kept exactly as is here.
        request = {
            "type": "set",
            "collection": c,
            "key": k,
            "value": v,
            "consistensy": w,
        }
        return _db_request(**request)

    @staticmethod
    def delete(c, k):
        """Send a ``delete`` request for key ``k`` in collection ``c``."""
        request = {"type": "delete", "collection": c, "key": k}
        return _db_request(**request)

    @staticmethod
    def get(c, k):
        """Send a ``get`` request for key ``k`` in collection ``c``."""
        request = {"type": "get", "collection": c, "key": k}
        return _db_request(**request)