-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlastbart.py
executable file
·190 lines (155 loc) · 6.43 KB
/
lastbart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#!/usr/bin/env python3
import cgi
import datetime
import os
import pystache
import re
import sqlite3
import stat
import sys
def urlify_name(name):
name = re.sub(" BART$", "", name)
name = name.lower()
name = re.sub("/", "-", name)
name = re.sub("\\.", "", name)
name = re.sub("[^0-9a-z-]", "-", name)
return name + ".html"
def friendly_time(time_string):
"""
Convert a time in GTFS format, e.g. "25:09:00", to a friendly format like "1:09
AM".
"""
(hour, minute, second) = time_string.split(":")
if (int(hour) / 12) % 2:
am_pm = "PM"
else:
am_pm = "AM"
hour = int(hour) % 12
if hour == 0:
hour = 12
return "%d:%s %s" % (hour, minute, am_pm)
def friendly_service_id(service_id):
"""
Extract the human-readable part of the service_id.
"""
m = re.search(r"(Sunday|Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Weekday)",
service_id)
return service_id[m.start():m.end()]
class StopNotFound(Exception):
def __init__(self, message):
Exception.__init__(self, message)
class Stop(object):
def __init__(self, stop_name, conn):
self.conn = conn
# What a human thinks of as a single stop can be represented as multiple
# stops in the GTFS format, e.g. MCAR and MCAR_S for north and south
# platforms respectively. We merge these by name.
self.all_stop_ids = []
for (stop_id, stop_name_from_db) in get_stops(self.conn):
if stop_name == urlify_name(stop_name_from_db):
self.all_stop_ids.append(stop_id)
self.stop_name = stop_name_from_db
if len(self.all_stop_ids) == 0:
raise StopNotFound(stop_name)
super(Stop, self).__init__()
def stop_name(self):
return self.stop_name
def last_valid_date(self):
# Note: There are some localtime issues here, but given that the resolution
# is a day, and we expect the transit feeds to have up-to-date data
# available before they expire, this shouldn't be an issue.
q = """SELECT end_date FROM calendar ORDER BY end_date DESC limit 1"""
c = self.conn.execute(q)
return c.fetchone()[0]
def fetch_date(self):
dt = datetime.datetime.fromtimestamp(os.stat("google_transit.zip")[8])
return dt.date()
def shortest_stop_id(self):
return sorted(self.all_stop_ids, key=lambda x: len(x))[0]
def service(self):
q = """SELECT date('now', 'localtime') as d,
service_id, sunday, monday, tuesday, wednesday, thursday, friday, saturday
FROM calendar
WHERE calendar.start_date <= d and calendar.end_date >= d"""
for values in self.conn.execute(q):
# Trim the `d` column:
values = values[1:]
service_id = values[0]
# We have a bit for each weekday indicating whether this schedule is
# visible on that day. Zip those bits with a numerical index
day_tuples = zip(range(0, 7), values[1:])
# And remove the ones that are zero, so we just have the days that are
# visible.
visible_days_tuples = filter(lambda x: x[1] != 0, day_tuples)
# We use this to add a set of classes to the table that control
# visibility.
visible_days = map(
lambda x: {
"day": "vd%d" %
x[0]}, visible_days_tuples)
departures = list(self.list_departures(service_id))
if len(departures) > 0:
yield {"service_id": friendly_service_id(service_id),
"visible_days": visible_days,
"departure": departures}
def list_departures(self, service_id):
placeholders = ",".join("?" for i in self.all_stop_ids)
values = self.all_stop_ids + [service_id]
q = """SELECT MAX(departure_time), stop_headsign, service_id
FROM stop_times JOIN trips ON stop_times.trip_id = trips.trip_id
WHERE stop_id in (%s) AND service_id =? AND stop_headsign LIKE '%%'
GROUP BY stop_headsign
ORDER by MAX(departure_time) DESC""" % placeholders
c = self.conn.execute(q, values)
# We will make the `service_id` friendly, but to be extra careful we
# want to make sure that for each departure, each friendly ID is
# unique. Currently, this is true in the data. But it's good to check,
# since if it ever weren't true, we'd have to use the distinct but
# non-friendly IDs.
uniques = set()
for (departure_time, stop_headsign, service_id) in c:
fsid = friendly_service_id(service_id)
key = str(departure_time) + str(stop_headsign) + str(service_id)
if key in uniques:
raise "BUG: Making the `service_id`s friendly caused a collision."
uniques.add(key)
yield {
"friendly_time": friendly_time(departure_time),
"headsign": stop_headsign,
"service_id": fsid
}
def get_stops(conn):
q = """SELECT stop_id, stop_name
FROM stops
WHERE stop_name NOT LIKE '%Enter/Exit%'
ORDER BY stop_name"""
for (stop_id, stop_name) in conn.execute(q):
yield (stop_id, stop_name)
class Index(object):
def __init__(self, conn):
self.conn = conn
super(Index, self).__init__()
def stop(self):
# Record shown stops by name so we only show each stop name once. Sometimes
# the DB has two "stops" with the same name, that actually refer to
# different platforms of the same station.
shown_stops = {}
for stop_id, stop_name in get_stops(self.conn):
if stop_name not in shown_stops:
shown_stops[stop_name] = 1
yield {"stop_name_urlified": urlify_name(stop_name),
"stop_name": re.sub("BART$", "", stop_name)
}
def main(argv):
conn = sqlite3.connect('bart.sqlite')
renderer = pystache.Renderer()
index_html = open("html/index.html", "w")
index_html.write(renderer.render(Index(conn)))
index_html.close()
for (unused, stop_name) in get_stops(conn):
f = open("html/" + urlify_name(stop_name), "w")
stop = Stop(urlify_name(stop_name), conn)
f.write(renderer.render(stop))
f.close()
if __name__ == "__main__":
main(sys.argv)