gpx_clean/gpx_stats.py

499 lines
18 KiB
Python
Executable file

#!/home/enigma/.virtualenvs/gpx_clean/bin/python
import configparser
from collections import defaultdict
import datetime
import io
import json
import os
import os.path
from pathlib import Path
import re
import sqlite3
import sys
import matplotlib.pyplot as plt
import matplotlib.dates as dates
import numpy as np
import gpxpy
from xdg import xdg_config_home
from clean import clean_gpx
class flike():
    """Minimal write-only, file-like text sink that accumulates in memory."""

    def __init__(self):
        # Everything handed to write() ends up concatenated in this string.
        self.internal = ""

    def write(self, string):
        """Append ``string`` to the text collected so far in ``internal``."""
        self.internal = self.internal + string
# Schema of the per-file cache table: one row per GPX file, keyed by filename.
CREATE_SQL = """CREATE TABLE IF NOT EXISTS cache(
filename text PRIMARY KEY, distance numeric, time numeric, speed numeric, date timestamp)"""
# Look up the cached values for a single file.
SELECT_SQL = """SELECT distance, time, speed, date FROM cache
WHERE filename = ?"""
# Fetch every cached row whose date lies in an inclusive range, oldest first.
SELECT_PERIOD_SQL = """SELECT distance, time, speed, date FROM cache
WHERE date >= ? AND date <= ? ORDER BY date ASC"""
# Store a row, replacing any existing row for the same filename.
INSERT_SQL = """INSERT OR REPLACE INTO cache(filename, distance, time, speed, date)
values (?, ?, ?, ?, ?)"""
# When true, get_file_details() skips the cache lookup and re-parses the file.
RESCAN = False
def adapt_datetime_iso(val):
    """Adapt a ``datetime.datetime`` to ISO 8601 text in the system timezone.

    The value is converted to the local timezone with ``astimezone()`` (a
    naive value is treated as local time) and rendered as
    ``YYYY-MM-DD HH:MM:SS.ffffff+HH:MM``.

    ``isoformat`` is used instead of ``strftime`` with ``%:z`` because the
    ``%:z`` directive is only documented from Python 3.12 onwards, whereas
    ``isoformat`` produces the same colon-separated offset on every
    supported version.
    """
    return val.astimezone().isoformat(sep=" ", timespec="microseconds")
def adapt_date_iso(val):
    """Adapt a ``datetime.date`` to its ``YYYY-MM-DD`` text form."""
    iso_day = val.strftime("%Y-%m-%d")
    return iso_day
def datetime_from_str(date_str):
    """Parse a stored timestamp back into a ``datetime.datetime``.

    Accepted layouts are ``YYYY-MM-DD HH:MM:SS`` optionally followed by a
    fractional second and/or a UTC offset (``+HHMM`` or ``+HH:MM``).  Values
    with an offset come back timezone-aware, the others naive.

    ``date_str`` may be ``str`` or ``bytes``; sqlite3 hands converters the raw
    ``bytes`` of the column.

    Raises:
        ValueError: if the text matches none of the known layouts.
    """
    if isinstance(date_str, bytes):
        date_str = date_str.decode()
    # strptime's %z already understands a colon inside the offset, so the
    # string is parsed as-is.  (Slicing the colon out by position breaks as
    # soon as a fractional second shifts the offset further right.)
    layouts = (
        '%Y-%m-%d %H:%M:%S%z',
        '%Y-%m-%d %H:%M:%S.%f%z',
        '%Y-%m-%d %H:%M:%S.%f',
        '%Y-%m-%d %H:%M:%S',
    )
    for layout in layouts:
        try:
            return datetime.datetime.strptime(date_str, layout)
        except ValueError:
            continue
    raise ValueError(f"Unrecognised timestamp: {date_str!r}")
def date_from_str(date_str):
    """Parse the leading ``YYYY-MM-DD`` of ``date_str`` into a datetime.

    Only the part before the first space is parsed, so both a bare date and a
    full ``YYYY-MM-DD HH:MM:SS...`` timestamp (the form stored in the cache's
    date column) are accepted.  ``bytes`` input is decoded first because
    sqlite3 passes converters raw bytes.

    Returns:
        A naive ``datetime.datetime`` at midnight of that day.

    Raises:
        ValueError: if the leading part is not a ``%Y-%m-%d`` date.
    """
    if isinstance(date_str, bytes):
        date_str = date_str.decode()
    day_part = date_str.strip().partition(" ")[0]
    # Deliberately silent: stdout is where the charts are written as SVG,
    # so nothing else may be printed from here.
    return datetime.datetime.strptime(day_part, '%Y-%m-%d')
# Tell sqlite3 how to turn datetime/date query parameters into text.
sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
sqlite3.register_adapter(datetime.date, adapt_date_iso)
# Converters for columns declared as "datetime" / "date".
# NOTE(review): the connections opened in this file do not pass detect_types
# and the cache column is declared "timestamp", so these converters look
# unused here — confirm before relying on them.
sqlite3.register_converter("datetime", datetime_from_str)
sqlite3.register_converter("date", date_from_str)
def get_file_details(filename):
    """Return ``(distance, time, speed, date)`` for one GPX file.

    The cache database is consulted first (unless ``RESCAN`` is set); on a
    miss the file is parsed, cleaned with ``clean_gpx`` and the computed
    values are written back to the cache.

    Returns:
        A 4-tuple, or ``None`` when the track has no time bounds, spans less
        than 60 seconds, or has a moving time of zero.  ``distance`` is the
        moving distance divided by 1000 and ``speed`` is distance per hour of
        moving time (reported elsewhere in this file as km and km/h).  On a
        cache hit ``date`` is whatever SQLite returns for the stored column;
        on a fresh parse it is the track's start time.
    """
    con = sqlite3.connect(xdg_config_home() / 'gpx_clean' / 'cache.sqlite3')
    # The connection is opened once per file, so it must be closed on every
    # exit path or a directory walk leaks one handle per track.
    try:
        con.execute(CREATE_SQL)
        if not RESCAN:
            row = con.execute(SELECT_SQL, (filename,)).fetchone()
            if row is not None:
                return row[0], row[1], row[2], row[3]
        with open(filename, 'r') as infile:
            gpx = gpxpy.parse(infile)
        start, end = gpx.get_time_bounds()
        if not start or not end:
            return None
        if (end - start).total_seconds() < 60:
            return None
        clean_gpx(gpx)
        md = gpx.get_moving_data()
        distance = md.moving_distance / 1000
        time = md.moving_time
        if time == 0:
            # Avoid dividing by zero below; nothing useful to record.
            return None
        speed = distance / (time / 3600)
        con.execute(INSERT_SQL, (filename, distance, time, speed, start))
        con.commit()
        return distance, time, speed, start
    finally:
        con.close()
def reset():
    """Zero the accumulated distance and time of every tracked exercise."""
    for mode in TRACKED:
        mode.update(distance=0.0, time=0)
def check_file(filename):
    """Add one GPX file's distance and time to the first matching exercise.

    The file's details come from ``get_file_details``.  When that returns
    ``None`` a message is written to stderr and the file is skipped.
    Otherwise the entries of ``TRACKED`` are tried in order and the first one
    whose optional ``maxspeed`` / ``minspeed`` / ``maxdist`` limits all admit
    the track has its ``distance`` and ``time`` totals increased.  A track
    that matches no entry is silently ignored.
    """
    details = get_file_details(filename)
    # Test for "no result" explicitly instead of catching the TypeError from
    # unpacking None, which would also hide unrelated TypeErrors.
    if details is None:
        sys.stderr.write(f"Error parsing \"{filename}\"\n")
        return
    distance, time, speed, _date = details
    for exer in TRACKED:
        if 'maxspeed' in exer and speed > exer['maxspeed']:
            # Too fast for this exercise type.
            continue
        if 'minspeed' in exer and speed < exer['minspeed']:
            # Too slow for this exercise type.
            continue
        if 'maxdist' in exer and distance > exer['maxdist']:
            # Too long for this exercise type.
            continue
        # Start from zero when the totals have not been initialised yet.
        exer['distance'] = exer.get('distance', 0.0) + distance
        exer['time'] = exer.get('time', 0) + time
        return
def exer_type(distance, time, speed, date):
    """Return the ``describe`` label of the first exercise matching a track.

    Entries of ``TRACKED`` are tried in order; an entry is rejected when the
    speed exceeds its ``maxspeed``, falls below its ``minspeed`` or the
    distance exceeds its ``maxdist`` (each limit is optional).  ``time`` and
    ``date`` are accepted but not used in the decision.

    Raises:
        Exception: if no entry accepts the track.
    """
    for mode in TRACKED:
        rejected = (
            ('maxspeed' in mode and speed > mode['maxspeed'])      # too fast
            or ('minspeed' in mode and speed < mode['minspeed'])   # too slow
            or ('maxdist' in mode and distance > mode['maxdist'])  # too long
        )
        if not rejected:
            return mode['describe']
    raise Exception('Could not determine exercise type')
def walk(location):
    """Run ``check_file`` on every ``.gpx`` file found beneath ``location``."""
    for folder, _subdirs, names in os.walk(location):
        for name in names:
            if not name.endswith(".gpx"):
                continue
            check_file(os.path.join(folder, name))
def make_pretty_td(seconds):
    """Format a duration in seconds as ``"Hh, MMm, SSs"``, padded to 13 chars.

    The hour part is shown only when positive and the minute part only when
    hours plus minutes is positive; fractional seconds are truncated.
    """
    whole = int(seconds)
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    parts = []
    if hours > 0:
        parts.append(f"{hours}h")
    if hours + minutes > 0:
        parts.append(f"{minutes:02}m")
    parts.append(f"{secs:02}s")
    # Left-align in a 13 character column so report lines line up.
    return ", ".join(parts).ljust(13)
def summarise(descr=""):
    """Print one summary line per tracked exercise with at least 60 s logged.

    Each line shows the exercise label, total distance, total time, average
    speed and pace.  When ``descr`` is non-empty it is appended in brackets.
    """
    for exer in TRACKED:
        duration = exer['time']
        if duration < 60:
            continue
        distance = exer['distance']
        speed = distance / (duration / 3600)
        # Round the pace to whole seconds first and split afterwards, so the
        # seconds field can never come out as "60" (e.g. "5:60").
        pace_seconds = round(3600 / speed)
        pace = f"{pace_seconds // 60}:{pace_seconds % 60:02d}"
        s = f"{exer['describe']:7} {distance:6.2f} km in {make_pretty_td(duration)} ({speed:>5.2f} km/h) ({pace} m/km)"
        if descr:
            s += f" ({descr})"
        print(s)
# Bar-chart layout, in x-axis units: width of one bar and the gap between
# neighbouring bars (read by plt_bar() and bars()).
WIDTH=0.2
SPACE=0.05
def plt_line(data, axes, desc, nbins=31):
    """Draw one cumulative step line on ``axes``.

    ``data`` is a sequence of ``(x, weight)`` pairs.  The pairs are
    histogrammed into the bin edges ``0 .. nbins-1`` plus a final open-ended
    bin, weighted, accumulated and drawn as a step curve labelled ``desc``.
    Nothing is drawn when ``data`` is empty.
    """
    # Unit-wide bins followed by a catch-all bin reaching to infinity.
    edges = np.append(range(nbins), [np.inf])
    if not data:
        return
    positions = [entry[0] for entry in data]
    amounts = [entry[1] for entry in data]
    axes.hist(
        positions,
        bins=edges,
        weights=amounts,
        cumulative=True,
        histtype='step',
        label=desc,
    )
def _linedata(mode, start, end):
    """Collect per-exercise ``[day_of_year, value]`` points for a period.

    Args:
        mode: ``'time'`` to report hours (``time / 3600``); any other value
            reports the stored distance.
        start: first day of the period, passed to the query as given.
        end: last day of the period; extended to 23:59:59 of that day.

    Returns:
        ``(data, lastday)`` where ``data`` maps each tracked exercise's
        ``describe`` label to its list of points and ``lastday`` is the
        largest day-of-year seen (0 when there are no rows).
    """
    data = {exer['describe']: [] for exer in TRACKED}
    end = adapt_datetime_iso(datetime.datetime(end.year, end.month, end.day, 23, 59, 59))
    con = sqlite3.connect(xdg_config_home() / 'gpx_clean' / 'cache.sqlite3')
    # Read everything up front and close the connection straight away so it
    # is not leaked on each call.
    try:
        rows = con.execute(SELECT_PERIOD_SQL, (start, end)).fetchall()
    finally:
        con.close()
    lastday = 0
    for dist, time, speed, date_str in rows:
        try:
            date = datetime.datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S%z')
        except ValueError:
            # Same layout with a fractional second.
            date = datetime.datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S.%f%z')
        number = date.timetuple().tm_yday
        lastday = max(number, lastday)
        value = time / 3600 if mode == 'time' else dist
        data[exer_type(dist, time, speed, date)].append([number, value])
    return data, lastday
def lines(mode='distance', duration='thismonth'):
    """Print an SVG chart of cumulative totals for this year and last year.

    One step line is drawn per tracked exercise for the current year and a
    second one, labelled "(prev. year)", for the previous year.  The finished
    figure is written to stdout as SVG text.

    Args:
        mode: ``"time"`` charts hours; anything else is titled as distance.
        duration: accepted but not used by this function.
    """
    plt.style.use('dark_background')
    fig = plt.figure(figsize=(12,6))
    ax = fig.subplots()
    # Current year: 1 January of this year up to the end of tomorrow.
    data, lastday = _linedata(mode,
                              datetime.date.today().replace(day=1).replace(month=1),
                              datetime.date.today() + datetime.timedelta(days=1),
                              )
    for exer in TRACKED:
        plt_line(data[exer['describe']], ax, exer['describe'], nbins=lastday)
    # Previous year: 1 January to 31 December, both derived by stepping one
    # day back from 1 January of the current year.
    data, lastday = _linedata(mode,
                              (datetime.date.today().replace(day=1).replace(month=1)
                               - datetime.timedelta(days=1)).replace(day=1).replace(month=1),
                              datetime.date.today().replace(day=1).replace(month=1)
                              - datetime.timedelta(days=1),
                              )
    for exer in TRACKED:
        plt_line(data[exer['describe']], ax, exer['describe'] + " (prev. year)", nbins=lastday)
    ax.legend(loc='upper left')
    # Second x axis sharing the same y axis; used only for the month labels.
    ax2 = ax.twiny()
    # Time hierarchy: Month / day (1/15), then day (8/22)
    # NOTE(review): the plotted x values are day-of-year numbers while the
    # locators and formatters below are date based — presumably this relies
    # on how matplotlib maps plain numbers to dates; confirm.
    # Axis 2: Month
    ax2.xaxis.set_major_locator(dates.MonthLocator())
    ax2.xaxis.set_major_formatter(dates.DateFormatter('%b'))
    # Axis 1: Major (Days 1/15)
    ax.xaxis.set_major_locator(dates.DayLocator(bymonthday=[1, 15]))
    ax.xaxis.set_major_formatter(dates.DateFormatter('%-d'))
    ax.xaxis.set_tick_params(labelsize=8)
    #ax.set_xlim(datetime.date.today().replace(day=1).replace(month=1),
    #    (datetime.date.today() + datetime.timedelta(days=365)).replace(day=1,month=1))
    # Fixed range wide enough for every day number of a (leap) year.
    ax.set_xlim(0, 366)
    # Axis 1: Minor (Days 8 / 22)
    ax.xaxis.set_minor_locator(dates.DayLocator(bymonthday=[8, 22]))
    ax.xaxis.set_minor_formatter(dates.DateFormatter('%-d'))
    ax.xaxis.set_tick_params(which="minor", labelsize=5)
    # This is a lot of work just to move the second axis to the bottom
    # of the chart, slightly below the first axis' tick labels.
    ax2.set_xlim(ax.get_xlim())
    ax2.spines["bottom"].set_position(("axes", -0.05)) # position of text.
    ax2.spines["bottom"].set_visible(False) # don't show the axis line
    ax2.xaxis.set_ticks_position("bottom")
    ax2.xaxis.set_label_position("bottom")
    #ax2.xaxis.set_tick_params(grid_visible=False) # already have ax's grid
    ax2.xaxis.set_tick_params(bottom=False, top=False) # hide ax2's tick marks
    for label in ax2.xaxis.get_ticklabels():
        label.set_horizontalalignment('left')
    # ax2.patch.set_visible(False)
    # for sp in ax2.spines.values(): sp.set_visible(False)
    if mode == "time":
        ax.set_title('Tracked time (hours)')
    else:
        ax.set_title('Tracked distance (km)')
    # Render into memory and emit the SVG document on stdout.
    svg = io.StringIO()
    fig.savefig(svg, format="svg")
    print(svg.getvalue())
# Bar width and gap between bars, in x-axis units, for plt_bar() and bars().
# These repeat the values already assigned earlier in the file.
WIDTH=0.2
SPACE=0.05
def plt_bar(data, axes, desc, start):
    """Draw one bar series on ``axes`` and return the created bars.

    ``data`` maps tick labels to bar heights.  Bars are ``WIDTH`` wide and
    placed at ``0, 1, 2, ...`` shifted by ``start``; the series is labelled
    ``desc`` for the legend.
    """
    tick_names = list(data.keys())
    heights = list(data.values())
    positions = np.arange(len(data.keys())) + start
    return axes.bar(
        positions,
        heights,
        width=WIDTH,
        tick_label=tick_names,
        label=desc,
    )
def bars(mode='distance', num_months=12):
    """Print an SVG bar chart of monthly totals per tracked exercise.

    The chart covers the current month and the ``num_months - 1`` months
    before it.  Every exercise with a non-zero total gets one bar per month
    in the window (zero height where it has no data) so that all series
    share the same month positions.

    Args:
        mode: ``"time"`` charts hours; any other value charts distance.
        num_months: number of calendar months to include.
    """
    dists = defaultdict(lambda: defaultdict(int))
    times = defaultdict(lambda: defaultdict(int))
    # Walk back to the first day of the oldest month in the window.
    month_start = datetime.date.today().replace(day=1)
    for _ in range(num_months - 1):
        month_start = (month_start - datetime.timedelta(days=1)).replace(day=1)
    month_labels = []
    con = sqlite3.connect(xdg_config_home() / 'gpx_clean' / 'cache.sqlite3')
    try:
        for _ in range(num_months):
            # Day 1 + 32 days always lands in the following month.
            next_month = (month_start + datetime.timedelta(days=32)).replace(day=1)
            last_day = next_month - datetime.timedelta(days=1)
            period_end = datetime.datetime(last_day.year, last_day.month, last_day.day, 23, 59, 59)
            label = month_start.strftime("%b")
            if label not in month_labels:
                month_labels.append(label)
            rows = con.execute(SELECT_PERIOD_SQL, (month_start, period_end)).fetchall()
            for dist, time, speed, date_str in rows:
                # Only the calendar day matters here; the stored value is a
                # full timestamp, so parse just its leading YYYY-MM-DD.
                date = datetime.datetime.strptime(date_str[:10], '%Y-%m-%d')
                month = date.strftime("%b")
                kind = exer_type(dist, time, speed, date)
                dists[kind][month] += dist
                times[kind][month] += time / 3600
            # Keep the period start a plain date at the first of the month;
            # deriving it from the 23:59:59 end would skip most of day one.
            month_start = next_month
    finally:
        con.close()
    # Anything other than "time" is charted as distance, matching the title.
    data = times if mode == 'time' else dists
    total_width = len(data.keys())*WIDTH + (len(data.keys())-1)*SPACE
    offset = -(total_width)/2
    plt.style.use('dark_background')
    fig, ax = plt.subplots(figsize=(12,6))
    for exer in TRACKED:
        per_month = data[exer['describe']]
        if sum(per_month.values()) == 0:
            continue
        # Give every series the same ordered month keys so bars line up.
        aligned = {name: per_month.get(name, 0) for name in month_labels}
        plt_bar(aligned, ax, exer['describe'], offset)
        offset += WIDTH + SPACE
    x = np.arange(len(month_labels))
    ax.set_xticks(x - WIDTH/2)
    ax.set_xticklabels(month_labels)
    ax.legend()
    if mode == "time":
        ax.set_title('Tracked time (hours)')
    else:
        ax.set_title('Tracked distance (km)')
    # Render into memory and emit the SVG document on stdout.
    svg = io.StringIO()
    fig.savefig(svg, format="svg")
    print(svg.getvalue())
def textout(start=datetime.date.today().replace(day=1), end=(datetime.date.today().replace(day=1) + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1), each=False):
    """Print a text report for the activities between ``start`` and ``end``.

    Args:
        start: first day of the period (from 00:00:00).  The default is the
            first day of the month in which this module was loaded, because
            default values are evaluated once at definition time.
        end: last day of the period (up to 23:59:59).  The default is the
            last day of that same month.
        each: when true, print one line per activity; otherwise print one
            totals line per tracked exercise with at least 60 s logged.
    """
    def pace_of(speed):
        # Round to whole seconds first, then split, so the seconds field can
        # never be rendered as "60" (e.g. "5:60").
        total = round(3600 / speed)
        return f"{total // 60}:{total % 60:02d}"

    def parse_stamp(text):
        # Parse the stored text unmodified; %z accepts "+HH:MM" as well as
        # "+HHMM", so the offset survives even with a fractional second.
        for layout in ('%Y-%m-%d %H:%M:%S%z',
                       '%Y-%m-%d %H:%M:%S.%f%z',
                       '%Y-%m-%d %H:%M:%S.%f'):
            try:
                return datetime.datetime.strptime(text, layout)
            except ValueError:
                continue
        raise ValueError(f"Unrecognised timestamp: {text!r}")

    dists = defaultdict(int)
    times = defaultdict(int)
    start_s = adapt_datetime_iso(datetime.datetime(start.year, start.month, start.day, 0, 0, 0))
    # End should be end of day
    end_s = adapt_datetime_iso(datetime.datetime(end.year, end.month, end.day, 23, 59, 59))
    con = sqlite3.connect(xdg_config_home() / 'gpx_clean' / 'cache.sqlite3')
    # Fetch everything, then release the connection before formatting.
    try:
        rows = con.execute(SELECT_PERIOD_SQL, (start_s, end_s)).fetchall()
    finally:
        con.close()
    for dist, time, speed, date_str in rows:
        date = parse_stamp(date_str)
        if each:
            pace = pace_of(speed)
            s = f"{exer_type(dist, time, speed, date):7} on {date.strftime('%d %b, %H:%M')} {dist:6.2f} km in {make_pretty_td(time)} ({speed:>5.2f} km/h) ({pace} m/km)"
            print(s)
        else:
            kind = exer_type(dist, time, speed, date)
            dists[kind] += dist
            times[kind] += time
    # With each=True the totals stay empty, so nothing is printed below.
    for exer in TRACKED:
        duration = times[exer['describe']]
        if duration < 60:
            continue
        speed = dists[exer['describe']] / (duration / 3600)
        pace = pace_of(speed)
        s = f"{exer['describe']:7} {dists[exer['describe']]:6.2f} km in {make_pretty_td(duration)} ({speed:>5.2f} km/h) ({pace} m/km)"
        print(s)
# File names that begin with a YYYY-MM-DD date.
OPENTRACKS_RE = re.compile(r'^(\d{4})-(\d{2})-(\d{2})')


def move_files(directory):
    """Sort the files directly inside ``directory`` into year/month folders.

    Sub-directories are not descended into.  Empty ``.gpx`` files are deleted
    with a warning.  Every other file whose name starts with a
    ``YYYY-MM-DD`` date is moved to ``<directory>/YYYY/MM/`` (i.e.
    ``2023/03/``), creating the folders as needed; files without such a
    prefix stay where they are.
    """
    root = Path(directory)
    with os.scandir(root) as entries:
        for entry in entries:
            if not entry.is_file():
                continue
            if entry.stat().st_size == 0 and entry.name.endswith(".gpx"):
                os.remove(entry)
                print(f"WARNING: Removing empty file {entry.name}")
                continue
            match = OPENTRACKS_RE.search(entry.name)
            if not match:
                continue
            year_dir = root / Path(match.group(1))
            month_dir = year_dir / Path(match.group(2))
            # Create the year folder first, then the month folder inside it.
            for folder in (year_dir, month_dir):
                if not folder.is_dir():
                    os.mkdir(folder)
            os.rename(entry, month_dir / entry.name)
# Load the tracked exercise definitions: the "modes" list of track.json in
# the gpx_clean configuration directory.
try:
    with open(xdg_config_home() / 'gpx_clean' / 'track.json', 'r') as infile:
        everything = json.load(infile)
    TRACKED = everything["modes"]
except FileNotFoundError as e:
    # FileNotFoundError carries the offending path in .filename (it has no
    # .msg attribute), and the message must be an f-string to show it.
    print(f"Could not find any tracking info in {e.filename}")
    raise
# Load config.ini from the gpx_clean configuration directory; it must define
# "datadir" in its [main] section.
CONFIG = configparser.ConfigParser()
CONFIG['main'] = {}
# ConfigParser.read() silently skips files it cannot open and returns the
# list of files it did parse, so a missing file has to be detected from an
# empty result rather than from an exception.
if not CONFIG.read(xdg_config_home() / 'gpx_clean' / 'config.ini'):
    print("Could not find the config file, please create one and specify a data directory")
    raise FileNotFoundError(str(xdg_config_home() / 'gpx_clean' / 'config.ini'))
if 'datadir' not in CONFIG['main']:
    raise Exception("Could not find the data directory in the config file")
# Command line entry point.
#   --rescan            re-parse every GPX file instead of using the cache
#   bars [time]         SVG bar chart of monthly distance (or time)
#   lines [time]        SVG chart of cumulative distance (or time) per year
#   monthly             text summary for each of the last 13 months
#   yearly              text summary for each of the last 5 years
#   month [NAME]        per-activity listing for a month of the current year
# NOTE(review): the source this was reconstructed from had lost its
# indentation; everything after the argument-count check is assumed to be
# nested inside it — confirm against the original layout.
if __name__ == "__main__":
    # Move files that are just in the data dir, not subdirectories
    move_files(CONFIG['main']['datadir'])
    if len(sys.argv) >= 2:
        if sys.argv[1] == "--rescan":
            RESCAN = True
        # TODO Do a scan of the data dir, complete
        walk(CONFIG['main']['datadir'])
        if sys.argv[1] == "bars":
            # The chart type is optional, exactly as for "lines".
            if len(sys.argv) > 2 and sys.argv[2] == "time":
                bars('time')
            else:
                bars('distance')
        elif sys.argv[1] == "lines":
            if len(sys.argv) > 2 and sys.argv[2] == "time":
                lines('time')
            else:
                lines('distance')
        elif sys.argv[1] == "monthly":
            NUM_MONTHS = 13
            start = datetime.date.today().replace(day=1)
            for _ in range(NUM_MONTHS - 1):
                # Back one day to previous month
                start -= datetime.timedelta(days=1)
                # And set day to 1
                start = start.replace(day=1)
            # Now ready to print
            for _ in range(NUM_MONTHS):
                print(f"Summary for {start.strftime('%b')} {start.year}")
                end = (start + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1)
                textout(start, end)
                start = end + datetime.timedelta(days=1)
        elif sys.argv[1] == "yearly":
            NUM_YEARS = 5
            start = datetime.date.today().replace(day=1).replace(month=1)
            for _ in range(NUM_YEARS - 1):
                # Back one day to previous year
                start -= datetime.timedelta(days=1)
                # And set day to 1
                start = start.replace(day=1).replace(month=1)
            # Now ready to print
            for _ in range(NUM_YEARS):
                print(f"Summary for {start.year}")
                # 1 January + 367 days is always early January of the next
                # year, so this yields 31 December of the current one.
                end = (start + datetime.timedelta(days=367)).replace(day=1) - datetime.timedelta(days=1)
                textout(start, end)
                start = end + datetime.timedelta(days=1)
        elif sys.argv[1] == "month":
            # Move to day 1 before changing the month: replacing the month
            # first fails on the 29th-31st when the target month is shorter.
            start = datetime.date.today().replace(day=1)
            if len(sys.argv) >= 3:
                try:
                    d = datetime.datetime.strptime(sys.argv[2], '%b')
                except ValueError:
                    # Not an abbreviated month name; try the full name.
                    d = datetime.datetime.strptime(sys.argv[2], '%B')
                start = start.replace(month=d.month)
            end = (start + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1)
            textout(start, end, each=True)