496 lines
18 KiB
Python
Executable file
496 lines
18 KiB
Python
Executable file
#!/home/enigma/.virtualenvs/gpx_clean/bin/python
|
|
|
|
import configparser
|
|
from collections import defaultdict
|
|
import datetime
|
|
import io
|
|
import json
|
|
import os
|
|
import os.path
|
|
from pathlib import Path
|
|
import re
|
|
import sqlite3
|
|
import sys
|
|
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.dates as dates
|
|
import numpy as np
|
|
import gpxpy
|
|
from xdg import xdg_config_home
|
|
|
|
from clean import clean_gpx
|
|
|
|
class flike():
    """Minimal write-only, file-like sink that accumulates text in memory."""

    def __init__(self):
        # Everything handed to write() ends up concatenated here.
        self.internal = ""

    def write(self, string):
        """Append *string* to the text collected so far."""
        self.internal = self.internal + string
|
|
|
|
|
|
|
|
# One cache row per GPX file, keyed by the file's path.
CREATE_SQL = """CREATE TABLE IF NOT EXISTS cache(
    filename text PRIMARY KEY, distance numeric, time numeric, speed numeric, date timestamp)"""

# Look up the cached figures for a single file.
SELECT_SQL = """SELECT distance, time, speed, date FROM cache
    WHERE filename = ?"""

# All cached activities whose date lies in an inclusive range, oldest first.
SELECT_PERIOD_SQL = """SELECT distance, time, speed, date FROM cache
    WHERE date >= ? AND date <= ? ORDER BY date ASC"""

# Insert a row, or overwrite the existing row for the same filename.
INSERT_SQL = """INSERT OR REPLACE INTO cache(filename, distance, time, speed, date)
    values (?, ?, ?, ?, ?)"""

# When True, get_file_details() ignores cached rows and re-parses every file.
RESCAN = False
|
|
|
|
|
|
def adapt_datetime_iso(val):
    """Adapt a datetime.datetime to an ISO 8601-style string for SQLite.

    The value is first converted to the system's local timezone with
    ``astimezone()`` and then rendered as
    ``YYYY-MM-DD HH:MM:SS.ffffff+HH:MM``.

    Args:
        val: The ``datetime.datetime`` to convert.

    Returns:
        The timestamp as text, including microseconds and the UTC offset.
    """
    # isoformat() writes the "+HH:MM" offset on every platform and Python
    # version, whereas the "%:z" strftime directive used previously is only
    # guaranteed from Python 3.12 onwards.
    return val.astimezone().isoformat(sep=" ", timespec="microseconds")
|
|
|
|
|
|
def adapt_date_iso(val):
    """Adapt a datetime.date to its ``YYYY-MM-DD`` text form for SQLite."""
    # date.__format__ delegates to strftime() for a non-empty format spec.
    return f"{val:%Y-%m-%d}"
|
|
|
|
|
|
def datetime_from_str(date_str):
    """Parse a ``YYYY-MM-DD HH:MM:SS[.ffffff][+HH:MM]`` timestamp.

    This is the text form produced by ``adapt_datetime_iso``; values without
    a fraction and/or without a UTC offset are accepted as well.

    Args:
        date_str: The timestamp as ``str`` or ``bytes`` (sqlite3 hands
            ``bytes`` to registered converters).

    Returns:
        A ``datetime.datetime``; timezone-aware when the text carries a UTC
        offset, naive otherwise.

    Raises:
        ValueError: If the text matches none of the supported layouts.
    """
    if isinstance(date_str, bytes):
        date_str = date_str.decode()
    # "%z" itself understands "+HH:MM", so the text is parsed as-is. The
    # previous character slicing that removed the colon also cut into the
    # fractional seconds and silently lost the offset.
    layouts = (
        '%Y-%m-%d %H:%M:%S%z',
        '%Y-%m-%d %H:%M:%S.%f%z',
        '%Y-%m-%d %H:%M:%S.%f',
        '%Y-%m-%d %H:%M:%S',
    )
    for layout in layouts:
        try:
            return datetime.datetime.strptime(date_str, layout)
        except ValueError:
            continue
    raise ValueError(f"Unrecognised timestamp: {date_str!r}")
|
|
|
|
|
|
def date_from_str(date_str):
    """Parse a ``YYYY-MM-DD`` string.

    Args:
        date_str: The date as ``str`` or ``bytes`` (sqlite3 hands ``bytes``
            to registered converters).

    Returns:
        A ``datetime.datetime`` at midnight of that day (not a
        ``datetime.date``).

    Raises:
        ValueError: If the text is not exactly ``YYYY-MM-DD``.
    """
    if isinstance(date_str, bytes):
        date_str = date_str.decode()
    # No diagnostic print here: stdout is also where the SVG charts are
    # written, so stray output would corrupt them.
    return datetime.datetime.strptime(date_str, '%Y-%m-%d')
|
|
|
|
|
|
# Python -> SQLite: store datetimes and dates as text.
sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
sqlite3.register_adapter(datetime.date, adapt_date_iso)

# SQLite -> Python for columns declared "datetime" / "date".
# NOTE(review): none of the sqlite3.connect() calls in this file pass
# detect_types, so these converters are presumably never invoked - confirm.
sqlite3.register_converter("datetime", datetime_from_str)
sqlite3.register_converter("date", date_from_str)
|
|
|
|
|
|
def get_file_details(filename):
    """Return the figures for one GPX file, using the SQLite cache if possible.

    Unless ``RESCAN`` is set, a cached row for *filename* is returned as-is.
    Otherwise the file is parsed, passed through ``clean_gpx``, and the
    resulting figures are written to the cache.

    Args:
        filename: Path of the GPX file; also used as the cache key.

    Returns:
        ``(distance, time, speed, date)`` where distance is the moving
        distance divided by 1000, time is the moving time, and speed is
        distance per (time / 3600). ``date`` is the track's start time when
        freshly parsed, or whatever SQLite returns for the stored value on a
        cache hit (the connection sets no ``detect_types``).
        ``None`` if the track has no time bounds, spans under 60 seconds, or
        has zero moving time.
    """
    con = sqlite3.connect(xdg_config_home() / 'gpx_clean' / 'cache.sqlite3')
    # Always release the connection: this runs once per file and has several
    # early exits.
    try:
        con.execute(CREATE_SQL)
        if not RESCAN:
            cached = con.execute(SELECT_SQL, (filename,)).fetchone()
            if cached is not None:
                return cached[0], cached[1], cached[2], cached[3]

        with open(filename, 'r') as infile:
            gpx = gpxpy.parse(infile)

        start, end = gpx.get_time_bounds()
        if not start or not end:
            return None
        if (end - start).total_seconds() < 60:
            return None

        clean_gpx(gpx)
        md = gpx.get_moving_data()
        distance = md.moving_distance / 1000
        time = md.moving_time
        if time == 0:
            # Avoid dividing by zero below; nothing useful to record.
            return None
        speed = distance / (time / 3600)

        con.execute(INSERT_SQL, (filename, distance, time, speed, start))
        con.commit()
        return distance, time, speed, start
    finally:
        con.close()
|
|
|
|
|
|
def reset():
    """Zero the running distance and time totals of every tracked exercise."""
    for exer in TRACKED:
        exer.update(distance=0.0, time=0)
|
|
|
|
|
|
def check_file(filename):
    """Add one GPX file's figures to the first tracked exercise it fits.

    Files for which ``get_file_details`` yields nothing usable are reported
    on stderr and skipped. Exercises in ``TRACKED`` are tried in order; an
    entry is passed over when the speed is above its 'maxspeed', below its
    'minspeed', or the distance is above its 'maxdist' (each limit is
    optional).
    """
    try:
        distance, time, speed, date = get_file_details(filename)
    except TypeError:
        # Unpacking a None result lands here.
        sys.stderr.write(f"Error parsing \"{filename}\"\n")
        return

    def outside_limits(exer):
        # Too fast, too slow, or too long for this exercise type.
        return (
            ('maxspeed' in exer and speed > exer['maxspeed'])
            or ('minspeed' in exer and speed < exer['minspeed'])
            or ('maxdist' in exer and distance > exer['maxdist'])
        )

    for exer in TRACKED:
        if outside_limits(exer):
            continue
        exer['distance'] += distance
        exer['time'] += time
        # Only the first matching exercise type is credited.
        return
|
|
|
|
|
|
def exer_type(distance, time, speed, date):
    """Return the 'describe' label of the first tracked exercise that fits.

    Exercises in ``TRACKED`` are tried in order; an entry is passed over when
    the speed is above its 'maxspeed', below its 'minspeed', or the distance
    is above its 'maxdist' (each limit is optional). ``time`` and ``date``
    are accepted but not consulted.

    Raises:
        Exception: If no tracked exercise fits.
    """
    for exer in TRACKED:
        too_fast = 'maxspeed' in exer and speed > exer['maxspeed']
        if too_fast:
            continue
        too_slow = 'minspeed' in exer and speed < exer['minspeed']
        if too_slow:
            continue
        too_long = 'maxdist' in exer and distance > exer['maxdist']
        if too_long:
            continue
        return exer['describe']
    raise Exception('Could not determine exercise type')
|
|
|
|
|
|
def walk(location):
    """Run check_file() on every ``.gpx`` file found beneath *location*."""
    for root, _dirs, files in os.walk(location):
        for filename in files:
            if not filename.endswith(".gpx"):
                continue
            check_file(os.path.join(root, filename))
|
|
|
|
|
|
def make_pretty_td(seconds):
    """Format a duration in seconds as e.g. ``1h, 02m, 05s``.

    Hours are shown only when non-zero, and minutes only when hours plus
    minutes is non-zero. The result is left-aligned and padded with spaces to
    at least 13 characters so it lines up in columns.
    """
    hours, remainder = divmod(int(seconds), 3600)
    minutes, secs = divmod(remainder, 60)

    parts = []
    if hours > 0:
        parts.append(f"{hours}h")
    if hours + minutes > 0:
        parts.append(f"{minutes:02}m")
    parts.append(f"{secs:02}s")

    text = ", ".join(parts)
    return f"{text:13}"
|
|
|
|
|
|
def summarise(descr=""):
    """Print one totals line per tracked exercise with at least 60s recorded.

    Args:
        descr: Optional note appended in parentheses to each printed line.
    """
    suffix = f" ({descr})" if descr else ""
    for exer in TRACKED:
        duration = exer['time']
        if duration < 60:
            # Nothing worth reporting for this exercise type.
            continue
        time = make_pretty_td(duration)
        speed = exer['distance'] / (duration / 3600)
        print(
            f"{exer['describe']:7} {exer['distance']:6.2f} km in {time} ({speed:>5.2f} km/h)"
            + suffix
        )
|
|
|
|
|
|
# Bar-chart layout: width of one bar and the gap between neighbouring bars.
WIDTH = 0.2
SPACE = 0.05
|
|
|
|
|
|
def plt_line(data, axes, desc, nbins=31):
    """Draw a cumulative step histogram of *data* on *axes*.

    Args:
        data: Sequence of ``[x, weight]`` pairs; nothing is drawn if empty.
        axes: Object providing a Matplotlib-style ``hist`` method.
        desc: Legend label for the line.
        nbins: Bin edges run 0, 1, ..., nbins - 1, then a final edge at
            infinity.
    """
    if not data:
        return
    edges = np.append(np.arange(nbins), [np.inf])
    xs = [point[0] for point in data]
    ys = [point[1] for point in data]
    axes.hist(
        xs,
        bins=edges,
        weights=ys,
        cumulative=True,
        histtype='step',
        label=desc,
    )
|
|
|
|
def _linedata(mode, start, end):
    """Collect per-exercise ``[day_of_year, value]`` points from the cache.

    Args:
        mode: 'time' yields hours (time / 3600); anything else yields the
            cached distance.
        start: First date of the period (inclusive), passed to SQLite as-is.
        end: Last date of the period; extended to 23:59:59 of that day.

    Returns:
        ``(data, lastday)`` where ``data`` maps each tracked exercise's
        'describe' label to its list of points, and ``lastday`` is the
        largest day-of-year seen (0 if there were no rows).
    """
    data = {exer['describe']: [] for exer in TRACKED}
    end = adapt_datetime_iso(datetime.datetime(end.year, end.month, end.day, 23, 59, 59))

    con = sqlite3.connect(xdg_config_home() / 'gpx_clean' / 'cache.sqlite3')
    try:
        rows = con.execute(SELECT_PERIOD_SQL, (start, end)).fetchall()
    finally:
        # The connection is only needed for this one query.
        con.close()

    lastday = 0
    for dist, time, speed, date_str in rows:
        # Shared parser, so every timestamp layout the cache may hold is
        # handled the same way as elsewhere in the file.
        date = datetime_from_str(date_str)
        number = date.timetuple().tm_yday
        lastday = max(number, lastday)
        value = time / 3600 if mode == 'time' else dist
        data[exer_type(dist, time, speed, date)].append([number, value])
    return data, lastday
|
|
|
|
|
|
def lines(mode='distance', duration='thismonth'):
    """Print an SVG chart of cumulative tracked distance or time to stdout.

    For every tracked exercise two step lines are drawn against day-of-year:
    the current year up to today, and the whole previous calendar year.

    Args:
        mode: 'time' charts hours; any other value charts distance.
        duration: Accepted for compatibility; not used.
    """
    # Take "today" once so all four period boundaries agree even if the call
    # straddles midnight.
    today = datetime.date.today()
    this_year_start = datetime.date(today.year, 1, 1)
    prev_year_start = datetime.date(today.year - 1, 1, 1)
    prev_year_end = datetime.date(today.year - 1, 12, 31)

    plt.style.use('dark_background')
    fig = plt.figure(figsize=(12, 6))
    try:
        ax = fig.subplots()

        data, lastday = _linedata(mode,
                                  this_year_start,
                                  today + datetime.timedelta(days=1))
        for exer in TRACKED:
            plt_line(data[exer['describe']], ax, exer['describe'], nbins=lastday)

        data, lastday = _linedata(mode, prev_year_start, prev_year_end)
        for exer in TRACKED:
            plt_line(data[exer['describe']], ax, exer['describe'] + " (prev. year)", nbins=lastday)

        ax.legend(loc='upper left')

        ax2 = ax.twiny()

        # Time hierarchy: Month / day (1/15), then day (8/22)
        # NOTE(review): the x values are day-of-year numbers, not dates; the
        # date locators below presumably work because Matplotlib reads the
        # numbers as days from its date epoch - confirm.
        # Axis 2: Month
        ax2.xaxis.set_major_locator(dates.MonthLocator())
        ax2.xaxis.set_major_formatter(dates.DateFormatter('%b'))

        # Axis 1: Major (Days 1/15)
        ax.xaxis.set_major_locator(dates.DayLocator(bymonthday=[1, 15]))
        ax.xaxis.set_major_formatter(dates.DateFormatter('%-d'))
        ax.xaxis.set_tick_params(labelsize=8)
        ax.set_xlim(0, 366)

        # Axis 1: Minor (Days 8 / 22)
        ax.xaxis.set_minor_locator(dates.DayLocator(bymonthday=[8, 22]))
        ax.xaxis.set_minor_formatter(dates.DateFormatter('%-d'))
        ax.xaxis.set_tick_params(which="minor", labelsize=5)

        # Move the second (month) axis to the bottom of the chart, just
        # below the day labels of the first axis.
        ax2.set_xlim(ax.get_xlim())
        ax2.spines["bottom"].set_position(("axes", -0.05))  # position of text.
        ax2.spines["bottom"].set_visible(False)  # don't show the axis line
        ax2.xaxis.set_ticks_position("bottom")
        ax2.xaxis.set_label_position("bottom")
        ax2.xaxis.set_tick_params(bottom=False, top=False)  # labels only, no tick marks
        for label in ax2.xaxis.get_ticklabels():
            label.set_horizontalalignment('left')

        if mode == "time":
            ax.set_title('Tracked time (hours)')
        else:
            ax.set_title('Tracked distance (km)')

        svg = io.StringIO()
        fig.savefig(svg, format="svg")
    finally:
        # pyplot keeps every figure alive until it is closed explicitly.
        plt.close(fig)
    print(svg.getvalue())
|
|
|
|
|
|
# Bar-chart layout: width of one bar and the gap between neighbouring bars.
WIDTH = 0.2
SPACE = 0.05


def plt_bar(data, axes, desc, start):
    """Draw one series of bars on *axes* and return what ``axes.bar`` returns.

    Args:
        data: Mapping of tick label -> bar height, in display order.
        axes: Object providing a Matplotlib-style ``bar`` method.
        desc: Legend label for the series.
        start: Offset added to the positions 0, 1, 2, ... of the bars.
    """
    labels = list(data.keys())
    heights = list(data.values())
    positions = np.arange(len(labels)) + start
    return axes.bar(
        positions,
        heights,
        width=WIDTH,
        tick_label=labels,
        label=desc,
    )
|
|
|
|
|
|
def bars(mode='distance', num_months=12):
    """Print an SVG bar chart of per-month totals per exercise type to stdout.

    Args:
        mode: 'distance' charts the cached distance; 'time' charts hours
            (time / 3600).
        num_months: Number of calendar months to show, ending with the
            current one.

    Raises:
        ValueError: If *mode* is neither 'distance' nor 'time'.
    """
    if mode not in ('distance', 'time'):
        raise ValueError(f"Unknown mode {mode!r}; expected 'distance' or 'time'")

    # First day of the oldest month to chart: step back one day into the
    # previous month, then snap to its first day.
    month_start = datetime.date.today().replace(day=1)
    for _ in range(num_months - 1):
        month_start = (month_start - datetime.timedelta(days=1)).replace(day=1)

    dists = defaultdict(lambda: defaultdict(int))
    times = defaultdict(lambda: defaultdict(int))
    months = []  # month labels in chronological order
    con = sqlite3.connect(xdg_config_home() / 'gpx_clean' / 'cache.sqlite3')
    try:
        for _ in range(num_months):
            next_month = (month_start + datetime.timedelta(days=32)).replace(day=1)
            last_day = next_month - datetime.timedelta(days=1)
            month_end = datetime.datetime(last_day.year, last_day.month, last_day.day, 23, 59, 59)

            label = month_start.strftime("%b")
            if label not in months:
                months.append(label)

            for dist, time, speed, date_str in con.execute(SELECT_PERIOD_SQL, (month_start, month_end)):
                # The cache holds full timestamps, so use the timestamp
                # parser rather than the date-only one.
                date = datetime_from_str(date_str)
                month = date.strftime("%b")
                kind = exer_type(dist, time, speed, date)
                dists[kind][month] += dist
                times[kind][month] += time / 3600

            # Advance as a plain date so the next period starts at 00:00 on
            # the 1st instead of 23:59:59.
            month_start = next_month
    finally:
        con.close()

    data = dists if mode == 'distance' else times

    total_width = len(data) * WIDTH + (len(data) - 1) * SPACE
    offset = -total_width / 2

    plt.style.use('dark_background')
    fig, ax = plt.subplots(figsize=(12, 6))
    try:
        for exer in TRACKED:
            name = exer['describe']
            if sum(data[name].values()) == 0:
                continue
            # Give every series a value for every month so that bars of
            # different exercise types line up under the same label.
            aligned = {month: data[name].get(month, 0) for month in months}
            plt_bar(aligned, ax, name, offset)
            offset += WIDTH + SPACE

        x = np.arange(len(months))
        ax.set_xticks(x - WIDTH / 2)
        ax.set_xticklabels(months)
        ax.legend()

        if mode == "time":
            ax.set_title('Tracked time (hours)')
        else:
            ax.set_title('Tracked distance (km)')

        svg = io.StringIO()
        fig.savefig(svg, format="svg")
    finally:
        # pyplot keeps every figure alive until it is closed explicitly.
        plt.close(fig)
    print(svg.getvalue())
|
|
|
|
|
|
def textout(start=None, end=None, each=False):
    """Print cached activities for a period, itemised or as totals.

    Args:
        start: First date of the period. Defaults to the first day of the
            current month.
        end: Last date of the period (extended to 23:59:59). Defaults to the
            last day of the current month.
        each: If true, print one line per activity; otherwise print one
            totals line per tracked exercise with at least 60s recorded.
    """
    # Resolve the defaults at call time; as default-argument expressions
    # they would be frozen at import.
    first_of_month = datetime.date.today().replace(day=1)
    if start is None:
        start = first_of_month
    if end is None:
        end = ((first_of_month + datetime.timedelta(days=32)).replace(day=1)
               - datetime.timedelta(days=1))

    start_s = adapt_datetime_iso(datetime.datetime(start.year, start.month, start.day, 0, 0, 0))
    # End should be end of day
    end_s = adapt_datetime_iso(datetime.datetime(end.year, end.month, end.day, 23, 59, 59))

    con = sqlite3.connect(xdg_config_home() / 'gpx_clean' / 'cache.sqlite3')
    try:
        rows = con.execute(SELECT_PERIOD_SQL, (start_s, end_s)).fetchall()
    finally:
        # The connection is only needed for this one query.
        con.close()

    dists = defaultdict(int)
    times = defaultdict(int)
    for dist, time, speed, date_str in rows:
        date = datetime_from_str(date_str)
        kind = exer_type(dist, time, speed, date)
        if each:
            print(f"{kind:7} on {date.strftime('%d %b, %H:%M')} {dist:6.2f} km in {make_pretty_td(time)} ({speed:>5.2f} km/h)")
        else:
            dists[kind] += dist
            times[kind] += time

    # In itemised mode nothing was accumulated, so this prints nothing.
    for exer in TRACKED:
        duration = times[exer['describe']]
        if duration < 60:
            continue
        time = make_pretty_td(duration)
        speed = dists[exer['describe']] / (duration / 3600)
        print(f"{exer['describe']:7} {dists[exer['describe']]:6.2f} km in {time} ({speed:>5.2f} km/h)")
|
|
|
|
|
|
# File names that start with a YYYY-MM-DD date.
OPENTRACKS_RE = re.compile(r'^(\d{4})-(\d{2})-(\d{2})')


def move_files(directory):
    """Sort dated files directly inside *directory* into year/month folders.

    For each regular file in the directory (not in subdirectories!) whose
    name starts with ``YYYY-MM-DD``, move it into ``YYYY/MM/`` beneath the
    directory (i.e. 2023/03/), creating the folders as needed. Zero-byte
    ``.gpx`` files are deleted instead, with a warning on stdout. Other
    files are left where they are.
    """
    root = Path(directory)
    # Take a snapshot of the listing first: which entries a live scandir()
    # iterator yields is unspecified once files are added to or removed from
    # the directory, and the loop below does both.
    with os.scandir(root) as it:
        entries = [entry for entry in it if entry.is_file()]

    for entry in entries:
        if entry.name.endswith(".gpx") and entry.stat().st_size == 0:
            os.remove(entry)
            print(f"WARNING: Removing empty file {entry.name}")
            continue
        match = OPENTRACKS_RE.search(entry.name)
        if match is None:
            continue
        target_dir = root / match.group(1) / match.group(2)
        target_dir.mkdir(parents=True, exist_ok=True)
        os.rename(entry, target_dir / entry.name)
|
|
|
|
|
|
# Load the tracked exercise definitions (the "modes" list) from track.json.
try:
    with open(xdg_config_home() / 'gpx_clean' / 'track.json', 'r') as infile:
        everything = json.load(infile)
    TRACKED = everything["modes"]
except FileNotFoundError as e:
    # FileNotFoundError carries the offending path in .filename (it has no
    # .msg attribute). Report on stderr, in line with check_file().
    print(f"Could not find any tracking info in {e.filename}", file=sys.stderr)
    raise
|
|
|
|
|
|
# Load config.ini; the [main] section must provide "datadir".
CONFIG = configparser.ConfigParser()
CONFIG['main'] = {}

_CONFIG_PATH = xdg_config_home() / 'gpx_clean' / 'config.ini'
# ConfigParser.read() does not raise for a missing file; it returns the list
# of files it managed to read, so an empty list means the file is absent.
if not CONFIG.read(_CONFIG_PATH):
    print("Could not find the config file, please create one and specify a data directory",
          file=sys.stderr)
    raise FileNotFoundError(f"Config file not found: {_CONFIG_PATH}")

if 'datadir' not in CONFIG['main']:
    raise Exception("Could not find the data directory in the config file")
|
|
|
|
|
|
# Command-line entry point.
#   --rescan            re-parse every GPX file under the data directory
#   bars [time]         SVG bar chart per month (distance unless "time")
#   lines [time]        SVG cumulative chart per year (distance unless "time")
#   monthly             text totals for the last 13 months
#   yearly              text totals for the last 5 years
#   month [Mon|Month]   itemised list for a month of the current year
if __name__ == "__main__":
    # Move files that are just in the data dir, not subdirectories
    move_files(CONFIG['main']['datadir'])
    if len(sys.argv) >= 2:
        command = sys.argv[1]
        # Optional second argument; None when absent so that no branch has
        # to index sys.argv blindly.
        option = sys.argv[2] if len(sys.argv) >= 3 else None

        if command == "--rescan":
            RESCAN = True
            # TODO Do a scan of the data dir, complete
            walk(CONFIG['main']['datadir'])

        if command == "bars":
            bars('time' if option == "time" else 'distance')
        elif command == "lines":
            lines('time' if option == "time" else 'distance')
        elif command == "monthly":
            NUM_MONTHS = 13
            start = datetime.date.today().replace(day=1)
            for _ in range(NUM_MONTHS - 1):
                # Back one day to previous month, then set day to 1
                start = (start - datetime.timedelta(days=1)).replace(day=1)
            # Now ready to print
            for _ in range(NUM_MONTHS):
                print(f"Summary for {start.strftime('%b')} {start.year}")
                end = (start + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1)
                textout(start, end)
                start = end + datetime.timedelta(days=1)
        elif command == "yearly":
            NUM_YEARS = 5
            start = datetime.date.today().replace(day=1).replace(month=1)
            for _ in range(NUM_YEARS - 1):
                # Back one day to previous year, then set to 1 January
                start = (start - datetime.timedelta(days=1)).replace(day=1).replace(month=1)
            # Now ready to print
            for _ in range(NUM_YEARS):
                print(f"Summary for {start.year}")
                # 367 days after 1 January always lands early in the next
                # January; snapping to day 1 and stepping back one day
                # gives 31 December.
                end = (start + datetime.timedelta(days=367)).replace(day=1) - datetime.timedelta(days=1)
                textout(start, end)
                start = end + datetime.timedelta(days=1)
        elif command == "month":
            # Pin the day to 1 BEFORE changing the month: replacing the
            # month first fails when today's day number does not exist in
            # the target month (e.g. the 31st -> February).
            start = datetime.date.today().replace(day=1)
            if option is not None:
                for month_format in ('%b', '%B'):
                    try:
                        month = datetime.datetime.strptime(option, month_format).month
                    except ValueError:
                        continue
                    start = start.replace(month=month)
                    break
                else:
                    sys.exit(f"Unknown month name: {option}")
            end = (start + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1)
            textout(start, end, each=True)
|