-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
303 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
#!/usr/bin/env python | ||
# encoding: utf-8 | ||
import logging | ||
# now we patch Python code to add color support to logging.StreamHandler | ||
def add_coloring_to_emit_windows(fn): | ||
# add methods we need to the class | ||
def _out_handle(self): | ||
import ctypes | ||
return ctypes.windll.kernel32.GetStdHandle(self.STD_OUTPUT_HANDLE) | ||
out_handle = property(_out_handle) | ||
|
||
def _set_color(self, code): | ||
import ctypes | ||
# Constants from the Windows API | ||
self.STD_OUTPUT_HANDLE = -11 | ||
hdl = ctypes.windll.kernel32.GetStdHandle(self.STD_OUTPUT_HANDLE) | ||
ctypes.windll.kernel32.SetConsoleTextAttribute(hdl, code) | ||
|
||
setattr(logging.StreamHandler, '_set_color', _set_color) | ||
|
||
def new(*args): | ||
FOREGROUND_BLUE = 0x0001 # text color contains blue. | ||
FOREGROUND_GREEN = 0x0002 # text color contains green. | ||
FOREGROUND_RED = 0x0004 # text color contains red. | ||
FOREGROUND_INTENSITY = 0x0008 # text color is intensified. | ||
FOREGROUND_WHITE = FOREGROUND_BLUE|FOREGROUND_GREEN |FOREGROUND_RED | ||
# winbase.h | ||
STD_INPUT_HANDLE = -10 | ||
STD_OUTPUT_HANDLE = -11 | ||
STD_ERROR_HANDLE = -12 | ||
|
||
# wincon.h | ||
FOREGROUND_BLACK = 0x0000 | ||
FOREGROUND_BLUE = 0x0001 | ||
FOREGROUND_GREEN = 0x0002 | ||
FOREGROUND_CYAN = 0x0003 | ||
FOREGROUND_RED = 0x0004 | ||
FOREGROUND_MAGENTA = 0x0005 | ||
FOREGROUND_YELLOW = 0x0006 | ||
FOREGROUND_GREY = 0x0007 | ||
FOREGROUND_INTENSITY = 0x0008 # foreground color is intensified. | ||
|
||
BACKGROUND_BLACK = 0x0000 | ||
BACKGROUND_BLUE = 0x0010 | ||
BACKGROUND_GREEN = 0x0020 | ||
BACKGROUND_CYAN = 0x0030 | ||
BACKGROUND_RED = 0x0040 | ||
BACKGROUND_MAGENTA = 0x0050 | ||
BACKGROUND_YELLOW = 0x0060 | ||
BACKGROUND_GREY = 0x0070 | ||
BACKGROUND_INTENSITY = 0x0080 # background color is intensified. | ||
|
||
levelno = args[1].levelno | ||
if(levelno>=50): | ||
color = BACKGROUND_YELLOW | FOREGROUND_RED | FOREGROUND_INTENSITY | BACKGROUND_INTENSITY | ||
elif(levelno>=40): | ||
color = FOREGROUND_RED | FOREGROUND_INTENSITY | ||
elif(levelno>=30): | ||
color = FOREGROUND_YELLOW | FOREGROUND_INTENSITY | ||
elif(levelno>=20): | ||
color = FOREGROUND_GREEN | ||
elif(levelno>=10): | ||
color = FOREGROUND_MAGENTA | ||
else: | ||
color = FOREGROUND_WHITE | ||
args[0]._set_color(color) | ||
|
||
ret = fn(*args) | ||
args[0]._set_color( FOREGROUND_WHITE ) | ||
#print "after" | ||
return ret | ||
return new | ||
|
||
def add_coloring_to_emit_ansi(fn): | ||
# add methods we need to the class | ||
def new(*args): | ||
levelno = args[1].levelno | ||
if(levelno>=50): | ||
color = '\x1b[31m' # red | ||
elif(levelno>=40): | ||
color = '\x1b[31m' # red | ||
elif(levelno>=30): | ||
color = '\x1b[33m' # yellow | ||
elif(levelno>=20): | ||
color = '\x1b[32m' # green | ||
elif(levelno>=10): | ||
color = '\x1b[35m' # pink | ||
else: | ||
color = '\x1b[0m' # normal | ||
args[1].msg = color + args[1].msg + '\x1b[0m' # normal | ||
#print "after" | ||
return fn(*args) | ||
return new | ||
|
||
import platform | ||
if platform.system()=='Windows': | ||
# Windows does not support ANSI escapes and we are using API calls to set the console color | ||
logging.StreamHandler.emit = add_coloring_to_emit_windows(logging.StreamHandler.emit) | ||
else: | ||
# all non-Windows platforms are supporting ANSI escapes so we use them | ||
logging.StreamHandler.emit = add_coloring_to_emit_ansi(logging.StreamHandler.emit) | ||
#log = logging.getLogger() | ||
#log.addFilter(log_filter()) | ||
#//hdlr = logging.StreamHandler() | ||
#//hdlr.setFormatter(formatter()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
# building on above and http://stackoverflow.com/questions/827371/is-there-a-way-to-list-all-the-available-drive-letters-in-python | ||
import string | ||
from ctypes import windll | ||
import time | ||
import os | ||
import re | ||
import sys | ||
from shutil import copy2 | ||
import logging | ||
import colourer | ||
|
||
import win32api, win32gui, win32con, win32file, struct | ||
|
||
def get_drives(): | ||
drives = [] | ||
bitmask = windll.kernel32.GetLogicalDrives() | ||
for letter in range(ord('A'), ord('Z')): | ||
if bitmask & 1: | ||
drives.append(chr(letter)) | ||
bitmask >>= 1 | ||
return drives | ||
|
||
def detect_pilot_id(drive, filename): | ||
try: | ||
file = open(drive + ':/XCSoarData/' + filename + '.txt', 'r') | ||
id = file.read() | ||
file.close() | ||
return id | ||
except Exception as e: | ||
logging.warn( "Failed to get pilot ID. " + str(e)) | ||
return False | ||
|
||
|
||
def set_pilot_id(drive, filename, id): | ||
try: | ||
file = open(drive + ':/XCSoarData/' + filename + '.txt', 'w') | ||
file.write(id) | ||
file.close() | ||
except Exception as e: | ||
logging.warn( "Failed to set pilot ID. " + str(e)) | ||
return False | ||
|
||
def eject_drive(drive): | ||
FSCTL_LOCK_VOLUME = 0x0090018 | ||
FSCTL_DISMOUNT_VOLUME = 0x00090020 | ||
IOCTL_STORAGE_MEDIA_REMOVAL = 0x002D4804 | ||
IOCTL_STORAGE_EJECT_MEDIA = 0x002D4808 | ||
|
||
|
||
lpFileName = r"\\.\%s:" % drive | ||
|
||
dwDesiredAccess = win32con.GENERIC_READ|win32con.GENERIC_WRITE | ||
dwShareMode = win32con.FILE_SHARE_READ|win32con.FILE_SHARE_WRITE | ||
dwCreationDisposition = win32con.OPEN_EXISTING | ||
|
||
try: | ||
logging.info("Ejecting %s." % drive) | ||
hVolume = win32file.CreateFile(lpFileName, dwDesiredAccess, dwShareMode, None, dwCreationDisposition, 0, None) | ||
|
||
## cnt = 0 | ||
## maxcnt = 5 | ||
## while(cnt < maxcnt): | ||
## try: | ||
## win32file.DeviceIoControl(hVolume, FSCTL_LOCK_VOLUME, None, 0, None) | ||
## print("Lock obtained" % drive) | ||
## break; | ||
## except: | ||
## logging.warn("Lock failed, retrying %s/%s" % (cnt,maxcnt)) | ||
## time.sleep(1) | ||
## cnt += 1 | ||
|
||
win32file.DeviceIoControl(hVolume, FSCTL_DISMOUNT_VOLUME, None, 0, None) | ||
win32file.DeviceIoControl(hVolume, IOCTL_STORAGE_MEDIA_REMOVAL, struct.pack("B", 0), 0, None) | ||
win32file.DeviceIoControl(hVolume, IOCTL_STORAGE_EJECT_MEDIA, None, 0, None) | ||
except: | ||
raise | ||
finally: | ||
win32file.CloseHandle(hVolume) | ||
|
||
if __name__ == '__main__': | ||
|
||
logging.basicConfig(level=logging.DEBUG) | ||
|
||
comp = 'pilot_id' | ||
if (len(sys.argv) >= 3): | ||
comp = ".".join(filter(None, ['pilot_id', sys.argv[2]])) | ||
logging.info("Pilot ID file is %s" % comp) | ||
|
||
out_default = "Z:/nats 2017/Desktop/NATS 2017/Tracks/task 2" | ||
if (len(sys.argv) >= 2 and os.path.isdir(sys.argv[1])): | ||
directory = sys.argv[1] | ||
else: | ||
directory = "" | ||
while (directory == ""): | ||
directory = input("Please enter the output directory. ") | ||
if (directory != "" and not os.path.isdir(directory)): | ||
logging.warn("Directory %s does not exist." % directory) | ||
directory = "" | ||
logging.info("Output directory is %s" % directory) | ||
|
||
before = set(get_drives()) | ||
while (time.sleep(1) or True): | ||
after = set(get_drives()) | ||
drives = after - before | ||
delta = len(drives) | ||
before = after | ||
if (delta): | ||
for drive in drives: | ||
if os.system("cd " + drive + ":") == 0: | ||
newly_mounted = drive | ||
logging.info( "There were %d drives added: %s. Newly mounted drive letter is %s" % (delta, drives, newly_mounted)) | ||
if (not os.path.isdir(drive + ":/XCSoarData/")): | ||
logging.warn("No XCSoarData directory detected") | ||
continue | ||
pilot_id = detect_pilot_id(drive, comp) | ||
if (not pilot_id): | ||
pilot_id = input("Please enter the pilots ID: ") | ||
set_pilot_id(drive, comp, pilot_id) | ||
|
||
kobo_path = drive + ":/XCSoarData/logs/" | ||
files = os.listdir(kobo_path) | ||
date = time.strftime("%Y-%m-%d") | ||
cFiles = [file for file in files if re.search("%s.*\.igc" % date, file)] | ||
files_c = len(cFiles) | ||
index = 0 | ||
if (files_c == 0): | ||
logging.warn("No files matching todays date. Please select manually.") | ||
print("0) Continue manually") | ||
print("1) Eject device") | ||
cnt = 2 | ||
for file in files: | ||
print("%s) %s - %s" % (cnt, file, os.path.getsize(kobo_path + file))) | ||
cnt = cnt + 1 | ||
index = int(input("Select a file ")) | ||
if (index == 1): | ||
eject_drive(drive) | ||
continue | ||
if (index == 0): | ||
continue | ||
index -= 2 | ||
else: | ||
files = cFiles | ||
if (files_c > 1): | ||
logging.warn("Multiple (%s) files matching todays date: " % files_c) | ||
print("0) Continue manually") | ||
print("1) Eject device") | ||
cnt = 2 | ||
for file in files: | ||
print("%s) %s - %s" % (cnt, file, os.path.getsize(kobo_path + file))) | ||
cnt = cnt + 1 | ||
index = int(input("Select a file (largest is most likely: ")) | ||
if (index == 1): | ||
eject_drive(drive) | ||
continue | ||
if (index == 0): | ||
continue | ||
index -= 2 | ||
logging.info("Copied %s to %s.igc" % (files[index], pilot_id)) | ||
copy2(kobo_path + files[index], directory + pilot_id + ".igc") | ||
eject_drive(drive) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from cx_Freeze import setup, Executable | ||
|
||
base = None | ||
|
||
|
||
executables = [Executable("kobo.py", base=base)] | ||
|
||
packages = [ | ||
"win32api", | ||
"win32file", | ||
"win32con", | ||
"win32gui", | ||
"string", | ||
#"ctypes", | ||
"encodings", | ||
"time", | ||
"os", | ||
"re", | ||
"sys", | ||
"shutil", | ||
"logging", | ||
"struct", | ||
|
||
] | ||
options = { | ||
'build_exe': { | ||
'packages':packages, | ||
}, | ||
|
||
} | ||
|
||
setup( | ||
name = "kobo", | ||
options = options, | ||
version = "0.1", | ||
description = 'Extracts igc files from Kobos using XCSoar', | ||
executables = executables | ||
) |