Skip to content

Commit

Permalink
Remove open/close calls to read/write disk
Browse files Browse the repository at this point in the history
*** http-disk-server:
- Add a fd attached to the server instead of using open/close calls.
- Don't throw in case of failure if we can't open the device on another
  server using a DRBD volume and if EROFS is returned. Just retry later.
- Handle SIGTERM to clean properly the resources.
- If a DRBD volume is used, the HTTP server never runs without a valid
  fd, so only one server is reachable. The other daemons remain blocked
  in the `open` call as long as the volume is not closed.
- Use a prefix before each log to simplify the maintenance.

*** nbd-http-server:
- The timeout of the nbd-client is modified. Should normally be never
  reached now.
- In case of failure during the NBD connection, we try to force detach
  the NBD device. Otherwise we may have resource leaks.
- Use a prefix before each log to simplify the maintenance.

*** nbdkit plugin:
- Use a timeout now during the server selection.

Signed-off-by: Ronan Abhamon <[email protected]>
  • Loading branch information
Wescoeur committed Feb 17, 2023
1 parent f74527d commit 1917911
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 110 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# http-nbd-transfer

Set of tools to transfer NBD requests to a HTTP server:
Set of tools to transfer NBD requests to an HTTP server:

- `http-disk-server` is used to handle HTTP requests and to read/write in a device.
- `nbd-server` is used to create a new NBD on the system, and to communicate with one or many HTTP servers.
Expand Down
167 changes: 107 additions & 60 deletions http-disk-server
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,54 @@ import argparse
import errno
import fcntl
import os
import signal
import socket
import stat
import struct
import subprocess
import sys
import threading
import time
import traceback

BLKGETSIZE64 = 0x80081272
BLKFLSBUF = (0x12 << 8) + 97 # Flush buffer cache.

REQ_LIMIT_SIZE = 1024 * 1024 * 128 # In bytes
REQ_LIMIT_SIZE = 1024 * 1024 * 128 # In bytes.

DRBD_MAJOR = 147

DRBD_OPEN_ATTEMPTS = 100
DRBD_OPEN_SLEEP_INTERVAL = 0.100 # In seconds.
DRBD_OPEN_SLEEP = 1.0 # In seconds.

# ==============================================================================

OUTPUT_PREFIX = ''

SIGTERM_RECEIVED = False

def handle_sigterm(*args):
global SIGTERM_RECEIVED
SIGTERM_RECEIVED = True

# -----------------------------------------------------------------------------

def check_bindable(port):
p = subprocess.Popen(
['lsof', '-i:' + str(port), '-Fp'],
stdout=subprocess.PIPE,
close_fds=True
)

stdout, stderr = p.communicate()
if p.returncode:
return True

pid = stdout[1:].rstrip() # remove 'p' before pid
eprint('Cannot use port {}, already used by: {}.'.format(port, pid))
return False

def eprint(str):
print >> sys.stderr, str
print >> sys.stderr, OUTPUT_PREFIX + str

def is_drbd_device(path):
try:
Expand All @@ -55,12 +83,12 @@ def open_device(dev_path):
def cannot_open(e):
raise Exception('Cannot open device `{}`: `{}`.'.format(dev_path, e))

attempt = 0
is_drbd = None
while True:
while not SIGTERM_RECEIVED:
try:
# Note: Can't use O_DIRECT with DRBD diskless volumes.
return os.open(dev_path, os.O_RDWR)
disk_fd = os.open(dev_path, os.O_RDWR)
eprint('Disk open!')
return disk_fd
except OSError as e:
if e.errno == errno.EAGAIN or e.errno == errno.EINTR:
continue
Expand All @@ -71,11 +99,9 @@ def open_device(dev_path):
is_drbd = is_drbd_device(dev_path)
if not is_drbd:
cannot_open(e)
if attempt >= DRBD_OPEN_ATTEMPTS:
raise Exception('Cannot open DRBD device `{}`: `{}`.'.format(dev_path, e))

attempt += 1
time.sleep(DRBD_OPEN_SLEEP_INTERVAL)
if not SIGTERM_RECEIVED:
time.sleep(DRBD_OPEN_SLEEP)

def close_device(fd):
if not fd:
Expand Down Expand Up @@ -113,7 +139,9 @@ def get_device_size(fd):
raise
return disk_capacity

def MakeRequestHandler(disk, disk_capacity):
# -----------------------------------------------------------------------------

def MakeRequestHandler(disk_fd, is_block_device):
class RequestHandler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
# Note: We cannot use this call in python 2:
Expand All @@ -122,10 +150,13 @@ def MakeRequestHandler(disk, disk_capacity):
#
# The base class of `BaseHTTPRequestHandler` uses an old def:
# "class BaseRequestHandler:"
self.disk = disk
self.disk_capacity = disk_capacity # TODO: Detect capacity update?
self.disk_fd = disk_fd
self.is_block_device = is_block_device
BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

def get_content_size(self):
return get_device_size(self.disk_fd)

# See: https://stackoverflow.com/questions/6063416/python-basehttpserver-how-do-i-catch-trap-broken-pipe-errors#answer-14355079
def finish(self):
try:
Expand All @@ -150,7 +181,7 @@ def MakeRequestHandler(disk, disk_capacity):
values = req_range[1:].lstrip().split('-')
begin = int(values[0])
end = int(values[1])
if begin <= end and end - begin < self.disk_capacity:
if begin <= end and end - begin < self.get_content_size():
return [begin, end - begin + 1]
except Exception:
pass
Expand All @@ -172,7 +203,7 @@ def MakeRequestHandler(disk, disk_capacity):
values = req_range.split('-')
begin = int(values[0])
end = int(values[1].split('/')[0].strip())
if begin <= end and end - begin < self.disk_capacity:
if begin <= end and end - begin < self.get_content_size():
return [begin, end - begin + 1]
except Exception:
pass
Expand All @@ -184,7 +215,7 @@ def MakeRequestHandler(disk, disk_capacity):
def do_HEAD(self):
self.send_response(200)
self.send_header('Accept-Ranges', 'bytes')
self.send_header('Content-Length', str(self.disk_capacity))
self.send_header('Content-Length', str(self.get_content_size()))
self.end_headers()

def do_GET(self):
Expand All @@ -195,22 +226,18 @@ def MakeRequestHandler(disk, disk_capacity):
offset = req_range[0]
size = min(req_range[1], REQ_LIMIT_SIZE)

dev = None
try:
dev = open_device(self.disk)
eprint('GET [{}]: Read {}B at {}.'.format(self.client_address[0], size, offset))
os.lseek(dev, offset, os.SEEK_SET)
chunk = os.read(dev, size)
os.lseek(self.disk_fd, offset, os.SEEK_SET)
chunk = os.read(self.disk_fd, size)
except Exception as e:
eprint('Can\'t do GET: `{}`.'.format(e))
self.send_response(500)
self.end_headers()
return
finally:
close_device(dev)

self.send_response(206)
self.send_header('Content-Range', 'bytes {}-{}/{}'.format(offset, size - 1, self.disk_capacity))
self.send_header('Content-Range', 'bytes {}-{}/{}'.format(offset, size - 1, self.get_content_size()))
self.send_header('Content-Length', str(int(size)))
self.end_headers()

Expand All @@ -224,7 +251,6 @@ def MakeRequestHandler(disk, disk_capacity):
offset = req_range[0]
size = req_range[1]

dev = None
try:
encoding = self.headers.getheader('Transfer-Encoding')
if encoding is not None:
Expand All @@ -246,17 +272,18 @@ def MakeRequestHandler(disk, disk_capacity):
if len(chunk) < size:
raise Exception('Truncated chunk!')

dev = open_device(self.disk)
eprint('PUT [{}]: Write {}B at {}.'.format(self.client_address[0], len(chunk), offset))
os.lseek(dev, offset, os.SEEK_SET)
os.write(dev, chunk)
os.lseek(self.disk_fd, offset, os.SEEK_SET)
os.write(self.disk_fd, chunk)
if self.is_block_device:
fcntl.ioctl(self.disk_fd, BLKFLSBUF)
else:
os.fsync(self.disk_fd)
except Exception as e:
eprint('Can\'t do PUT: `{}`.'.format(e))
self.send_response(500)
self.end_headers()
return
finally:
close_device(dev)

self.send_response(200)
self.end_headers()
Expand All @@ -272,26 +299,60 @@ class HttpDiskServer(HTTPServer):
self.allow_reuse_address = True
HTTPServer.__init__(self, *args, **kwargs)

def run_server(disk, disk_capacity, ip, port):
HandlerClass = MakeRequestHandler(disk, disk_capacity)
httpd = HttpDiskServer((ip or '', port), HandlerClass)
# -----------------------------------------------------------------------------

# Must be placed just after the bind call of HTTPServer to notify
# parent process that would be waiting before launching NBD server(s).
def run_server(disk, ip, port):
# Check if we can use this port.
if not check_bindable(port):
return

# Must be printed to notify parent processes that are waiting
# before starting the NBD server(s).
eprint('Server ready!')

run = True
while run:
# Note: `open_device` must be called after the message (never before!)
# to ensure we don't block if the device is already open somewhere.

httpd = None
httpd_thread = None
disk_fd = None
while True:
if SIGTERM_RECEIVED:
eprint('SIGTERM received. Exiting server...')
break

try:
httpd.serve_forever()
disk_fd = open_device(disk)
is_block_device = stat.S_ISBLK(os.fstat(disk_fd).st_mode)

HandlerClass = MakeRequestHandler(disk_fd, is_block_device)
httpd = HttpDiskServer((ip or '', port), HandlerClass)
httpd_thread = threading.Thread(target=httpd.serve_forever)
httpd_thread.start()

while not SIGTERM_RECEIVED:
signal.pause()
except KeyboardInterrupt:
run = False
eprint('Exiting server...')
break
except Exception as e:
eprint('Unhandled exception: `{}`.'.format(e))
eprint(traceback.format_exc())
eprint('Restarting server...')
time.sleep(1)
finally:
try:
httpd.server_close()
if httpd_thread:
httpd.shutdown()
httpd_thread.join()
httpd_thread = None
if httpd:
httpd.server_close()
httpd = None
except Exception as e:
eprint('Failed to close server: {}.'.format(e))
pass
close_device(disk_fd)


# ==============================================================================

Expand All @@ -312,24 +373,10 @@ def main():
)

args = parser.parse_args()

# Ensure we can open the device and get size.
dev = None
try:
dev = open_device(args.disk)
disk_capacity = get_device_size(dev)
finally:
close_device(dev)

if dev and disk_capacity > 0:
while True:
try:
run_server(args.disk, disk_capacity, args.ip, args.port)
except Exception as e:
eprint('Unhandled exception: `{}`.'.format(e))
eprint(traceback.format_exc())
eprint('Restarting server...')
time.sleep(1)
global OUTPUT_PREFIX
OUTPUT_PREFIX = '[' + os.path.basename(os.path.realpath(args.disk)) + '] '
signal.signal(signal.SIGTERM, handle_sigterm)
run_server(args.disk, args.ip, args.port)

if __name__ == '__main__':
main()
Loading

0 comments on commit 1917911

Please sign in to comment.