#!/home/enigma/.virtualenvs/gpx_clean/bin/python
"""Summarise and chart GPX activity files.

Per-file results (distance, moving time, speed, start date) are cached in an
sqlite database under the XDG config directory.  Activities are assigned to
an exercise type using the speed / distance limits listed under ``"modes"``
in ``track.json``.
"""
import configparser
import contextlib
from collections import defaultdict
import datetime
import io
import json
import os
import os.path
from pathlib import Path
import re
import sqlite3
import sys

import matplotlib.pyplot as plt
import matplotlib.dates as dates
import numpy as np
import gpxpy
from xdg import xdg_config_home

from clean import clean_gpx


class flike():
    """Minimal write-only file-like object that accumulates text in memory."""

    def __init__(self):
        self.internal = ""

    def write(self, string):
        """Append ``string`` to the internal buffer."""
        self.internal += string


CREATE_SQL = """CREATE TABLE IF NOT EXISTS cache(
    filename text PRIMARY KEY,
    distance numeric,
    time numeric,
    speed numeric,
    date timestamp)"""
SELECT_SQL = """SELECT distance, time, speed, date FROM cache WHERE filename = ?"""
SELECT_PERIOD_SQL = """SELECT distance, time, speed, date FROM cache WHERE date >= ? AND date <= ? ORDER BY date ASC"""
INSERT_SQL = """INSERT OR REPLACE INTO cache(filename, distance, time, speed, date) values (?, ?, ?, ?, ?)"""

# When True, get_file_details() ignores cached rows and re-parses every file.
RESCAN = False

# Geometry of the grouped bar chart drawn by bars().
WIDTH = 0.2
SPACE = 0.05


def _cache_path():
    """Return the path of the sqlite cache database."""
    return xdg_config_home() / 'gpx_clean' / 'cache.sqlite3'


def _month_end(first_day):
    """Return the last day of the month that ``first_day`` (a 1st) starts."""
    # 32 days after the 1st is always inside the following month.
    next_month = (first_day + datetime.timedelta(days=32)).replace(day=1)
    return next_month - datetime.timedelta(days=1)


def _month_start_back(months_back):
    """Return the 1st of the month ``months_back`` months before this one."""
    start = datetime.date.today().replace(day=1)
    for _ in range(months_back):
        # Back one day to the previous month, then snap to its 1st.
        start = (start - datetime.timedelta(days=1)).replace(day=1)
    return start


def date_from_str(date_str):
    """Parse a timestamp string read back from the cache into a datetime.

    ISO-format strings are parsed directly.  Anything else falls back to the
    historical parsing (drop the colon from a trailing ``+HH:MM`` offset and
    try the known ``strptime`` layouts).

    Raises:
        ValueError: if no known layout matches.
    """
    try:
        return datetime.datetime.fromisoformat(date_str)
    except ValueError:
        pass

    legacy = date_str[:22] + date_str[-2:]
    for fmt in ('%Y-%m-%d %H:%M:%S%z',
                '%Y-%m-%d %H:%M:%S.%f%z',
                '%Y-%m-%d %H:%M:%S.%f'):
        try:
            return datetime.datetime.strptime(legacy, fmt)
        except ValueError:
            continue
    raise ValueError(f"Unrecognised date in cache: {date_str!r}")


def _fetch_period(start, end):
    """Return cached activities dated from ``start`` to the end of day ``end``.

    Each item is ``(distance, time, speed, date)`` with ``date`` already
    parsed by date_from_str(); rows are ordered by ascending date.
    """
    end_of_day = datetime.datetime(end.year, end.month, end.day, 23, 59, 59)
    with contextlib.closing(sqlite3.connect(_cache_path())) as con:
        # Make sure the table exists so a fresh cache yields no rows
        # instead of an OperationalError.
        con.execute(CREATE_SQL)
        rows = con.execute(SELECT_PERIOD_SQL, (start, end_of_day)).fetchall()
    return [(dist, time, speed, date_from_str(date_str))
            for dist, time, speed, date_str in rows]


def get_file_details(filename):
    """Return ``(distance, time, speed, date)`` for one GPX file.

    The cached row is returned when present (unless RESCAN is set);
    otherwise the file is parsed, cleaned with clean_gpx(), measured and
    stored in the cache.  Distance is moving distance / 1000, time is the
    moving time, and speed is distance per (time / 3600).

    Returns None when the track has no time bounds, spans less than 60
    seconds, or has zero moving time.  Note that ``date`` is the raw cache
    value on a cache hit and the parsed start time on a fresh parse.
    """
    with contextlib.closing(sqlite3.connect(_cache_path())) as con:
        con.execute(CREATE_SQL)
        if not RESCAN:
            row = con.execute(SELECT_SQL, (filename,)).fetchone()
            if row is not None:
                return row[0], row[1], row[2], row[3]

        with open(filename, 'r') as infile:
            gpx = gpxpy.parse(infile)

        start, end = gpx.get_time_bounds()
        if not start or not end:
            return None
        if (end - start).total_seconds() < 60:
            return None

        clean_gpx(gpx)
        md = gpx.get_moving_data()
        distance = md.moving_distance / 1000
        time = md.moving_time
        if time == 0:
            return None
        speed = distance / (time / 3600)

        con.execute(INSERT_SQL, (filename, distance, time, speed, start))
        con.commit()
        return distance, time, speed, start


def reset():
    """Zero the accumulated distance and time of every tracked exercise."""
    for exer in TRACKED:
        exer['distance'] = 0.0
        exer['time'] = 0


def _matches(exer, distance, speed):
    """Return True if the activity is within the limits defined for ``exer``."""
    if 'maxspeed' in exer and speed > exer['maxspeed']:
        return False
    if 'minspeed' in exer and speed < exer['minspeed']:
        return False
    if 'maxdist' in exer and distance > exer['maxdist']:
        return False
    return True


def _classify(distance, speed):
    """Return the first tracked exercise matching the activity, or None."""
    for exer in TRACKED:
        if _matches(exer, distance, speed):
            return exer
    return None


def check_file(filename):
    """Add one GPX file's distance and time to its matching exercise.

    Files for which get_file_details() yields nothing are reported on
    stderr and skipped; files matching no exercise are silently ignored.
    """
    try:
        distance, time, speed, _date = get_file_details(filename)
    except TypeError:
        # get_file_details() returned None, which cannot be unpacked.
        sys.stderr.write(f"Error parsing \"{filename}\"\n")
        return

    exer = _classify(distance, speed)
    if exer is not None:
        # .get() keeps this working even if reset() was never called.
        exer['distance'] = exer.get('distance', 0.0) + distance
        exer['time'] = exer.get('time', 0) + time


def exer_type(distance, time, speed, date):
    """Return the ``describe`` label of the exercise matching an activity.

    ``time`` and ``date`` are accepted for call-site symmetry; only
    ``distance`` and ``speed`` take part in the classification.

    Raises:
        Exception: if no tracked exercise matches.
    """
    exer = _classify(distance, speed)
    if exer is None:
        raise Exception('Could not determine exercise type')
    return exer['describe']


def walk(location):
    """Run check_file() on every ``.gpx`` file below ``location``."""
    for root, _dirs, files in os.walk(location):
        for filename in files:
            if filename.endswith(".gpx"):
                check_file(os.path.join(root, filename))


def make_pretty_td(seconds):
    """Format a duration in seconds as e.g. ``1h, 02m, 03s``.

    Leading zero units are omitted and the result is left-aligned and
    padded to 13 characters.
    """
    string = ""
    minutes, seconds = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    if hours > 0:
        string += f"{hours}h, "
    if hours + minutes > 0:
        string += f"{minutes:02}m, "
    string += f"{seconds:02}s"
    return f"{string:13}"


def summarise(descr=""):
    """Print the accumulated totals of each exercise with >= 60s tracked."""
    for exer in TRACKED:
        duration = exer.get('time', 0)
        if duration < 60:
            continue
        distance = exer.get('distance', 0.0)
        time = make_pretty_td(duration)
        speed = distance / (duration / 3600)
        s = f"{exer['describe']:7} {distance:6.2f} km in {time} ({speed:>5.2f} km/h)"
        if descr:
            s += f" ({descr})"
        print(s)


def plt_line(data, axes, desc, nbins=31):
    """Draw a cumulative step histogram of ``[day, value]`` pairs on ``axes``.

    Nothing is drawn when ``data`` is empty.
    """
    if not data:
        return
    # One bin per day plus an open-ended final bin to catch the last day.
    bins = np.append(range(nbins), [np.inf])
    days = [x[0] for x in data]
    weights = [x[1] for x in data]
    axes.hist(days, bins=bins, weights=weights, cumulative=True,
              histtype='step', label=desc)


def _linedata(mode, start, end):
    """Collect per-exercise ``[day_of_year, value]`` pairs for a period.

    ``value`` is hours when ``mode`` is ``'time'`` and distance otherwise.

    Returns:
        ``(data, lastday)`` where ``data`` maps each exercise label to its
        list of pairs and ``lastday`` is the largest day-of-year seen
        (0 when there are no rows).
    """
    data = {exer['describe']: [] for exer in TRACKED}
    lastday = 0
    for dist, time, speed, date in _fetch_period(start, end):
        number = date.timetuple().tm_yday
        lastday = max(number, lastday)
        value = time / 3600 if mode == 'time' else dist
        data[exer_type(dist, time, speed, date)].append([number, value])
    return data, lastday


def lines(mode='distance', duration='thismonth'):
    """Print an SVG of cumulative totals for this year and the previous one.

    ``mode`` selects ``'time'`` (hours) or distance (km).  ``duration`` is
    accepted but currently unused.
    """
    plt.style.use('dark_background')
    fig = plt.figure(figsize=(12, 6))
    ax = fig.subplots()

    today = datetime.date.today()
    jan_first = today.replace(month=1, day=1)
    prev_dec_last = jan_first - datetime.timedelta(days=1)
    prev_jan_first = prev_dec_last.replace(month=1, day=1)

    data, lastday = _linedata(mode, jan_first,
                              today + datetime.timedelta(days=1))
    for exer in TRACKED:
        plt_line(data[exer['describe']], ax, exer['describe'], nbins=lastday)

    data, lastday = _linedata(mode, prev_jan_first, prev_dec_last)
    for exer in TRACKED:
        plt_line(data[exer['describe']], ax,
                 exer['describe'] + " (prev. year)", nbins=lastday)

    ax.legend(loc='upper left')
    ax2 = ax.twiny()

    # Time hierarchy: Month / day (1/15), then day (8/22)
    # Axis 2: Month
    ax2.xaxis.set_major_locator(dates.MonthLocator())
    ax2.xaxis.set_major_formatter(dates.DateFormatter('%b'))

    # Axis 1: Major (Days 1/15)
    ax.xaxis.set_major_locator(dates.DayLocator(bymonthday=[1, 15]))
    ax.xaxis.set_major_formatter(dates.DateFormatter('%-d'))
    ax.xaxis.set_tick_params(labelsize=8)
    ax.set_xlim(0, 366)

    # Axis 1: Minor (Days 8 / 22)
    ax.xaxis.set_minor_locator(dates.DayLocator(bymonthday=[8, 22]))
    ax.xaxis.set_minor_formatter(dates.DateFormatter('%-d'))
    ax.xaxis.set_tick_params(which="minor", labelsize=5)

    # Move the second (month) axis to the bottom of the chart.
    ax2.set_xlim(ax.get_xlim())
    ax2.spines["bottom"].set_position(("axes", -0.05))  # position of text.
    ax2.spines["bottom"].set_visible(False)  # don't show the axis line
    ax2.xaxis.set_ticks_position("bottom")
    ax2.xaxis.set_label_position("bottom")
    ax2.xaxis.set_tick_params(bottom=False, top=False)  # already have ax's grid
    for label in ax2.xaxis.get_ticklabels():
        label.set_horizontalalignment('left')

    if mode == "time":
        ax.set_title('Tracked time (hours)')
    else:
        ax.set_title('Tracked distance (km)')

    svg = io.StringIO()
    fig.savefig(svg, format="svg")
    print(svg.getvalue())


def plt_bar(data, axes, desc, start):
    """Draw one bar series from the ``label -> value`` mapping ``data``.

    Bars are placed at ``0..len(data)-1`` shifted by ``start``; the bar
    container returned by ``axes.bar`` is passed back to the caller.
    """
    x = np.arange(len(data))
    return axes.bar(x + start, list(data.values()), width=WIDTH,
                    tick_label=list(data.keys()), label=desc)


def bars(mode='distance', num_months=12):
    """Print an SVG bar chart of monthly totals per exercise type.

    Covers the last ``num_months`` calendar months, ending with the current
    one.  ``mode`` selects ``'time'`` (hours); any other value charts
    distance (km).  Every exercise is plotted against the same ordered list
    of months, so bars line up even when an exercise has no activity in
    some month.  Months are keyed by abbreviated name, so totals merge if
    ``num_months`` exceeds 12.
    """
    dists = defaultdict(lambda: defaultdict(int))
    times = defaultdict(lambda: defaultdict(int))
    months = []

    start = _month_start_back(num_months - 1)
    for _ in range(num_months):
        end = _month_end(start)
        month = start.strftime("%b")
        if month not in months:
            months.append(month)
        for dist, time, speed, date in _fetch_period(start, end):
            kind = exer_type(dist, time, speed, date)
            dists[kind][month] += dist
            times[kind][month] += time / 3600
        # ``end`` is a plain date, so the next period starts at midnight.
        start = end + datetime.timedelta(days=1)

    data = times if mode == 'time' else dists

    # Only exercises with a non-zero total get a bar series.
    shown = [exer['describe'] for exer in TRACKED
             if sum(data.get(exer['describe'], {}).values()) != 0]
    total_width = len(shown) * WIDTH + (len(shown) - 1) * SPACE
    offset = -total_width / 2

    plt.style.use('dark_background')
    fig, ax = plt.subplots(figsize=(12, 6))
    for name in shown:
        per_month = {label: data[name].get(label, 0) for label in months}
        plt_bar(per_month, ax, name, offset)
        offset += WIDTH + SPACE

    x = np.arange(len(months))
    ax.set_xticks(x - WIDTH / 2)
    ax.set_xticklabels(months)
    ax.legend()

    if mode == "time":
        ax.set_title('Tracked time (hours)')
    else:
        ax.set_title('Tracked distance (km)')

    svg = io.StringIO()
    fig.savefig(svg, format="svg")
    print(svg.getvalue())


def textout(start=None, end=None, each=False):
    """Print a text summary of the activities between ``start`` and ``end``.

    Args:
        start: first day of the period; defaults to the 1st of this month.
        end: last day of the period (inclusive); defaults to the last day
            of this month.
        each: when True print one line per activity, otherwise print one
            total line per exercise type with at least 60s tracked.
    """
    # Defaults are resolved per call rather than once at import time.
    this_month = datetime.date.today().replace(day=1)
    if start is None:
        start = this_month
    if end is None:
        end = _month_end(this_month)

    dists = defaultdict(int)
    times = defaultdict(int)
    for dist, time, speed, date in _fetch_period(start, end):
        kind = exer_type(dist, time, speed, date)
        if each:
            s = f"{kind:7} on {date.strftime('%d %b, %H:%M')} {dist:6.2f} km in {make_pretty_td(time)} ({speed:>5.2f} km/h)"
            print(s)
        else:
            dists[kind] += dist
            times[kind] += time

    for exer in TRACKED:
        duration = times[exer['describe']]
        if duration < 60:
            continue
        time = make_pretty_td(duration)
        speed = dists[exer['describe']] / (duration / 3600)
        s = f"{exer['describe']:7} {dists[exer['describe']]:6.2f} km in {time} ({speed:>5.2f} km/h)"
        print(s)


OPENTRACKS_RE = re.compile(r'^(\d{4})-(\d{2})-(\d{2})')


def move_files(directory):
    """For each file in the directory (not in subdirectories!) put it
    into a subfolder for year / month (i.e. 2023/03/)

    Empty ``.gpx`` files are deleted with a warning.  Files whose name does
    not start with ``YYYY-MM-DD`` are left where they are.
    """
    root = Path(directory)
    # Snapshot the listing first: entries are renamed/removed while looping.
    with os.scandir(root) as it:
        entries = list(it)

    for entry in entries:
        if not entry.is_file():
            continue
        if entry.stat().st_size == 0 and entry.name.endswith(".gpx"):
            os.remove(entry)
            print(f"WARNING: Removing empty file {entry.name}")
            continue
        if match := OPENTRACKS_RE.search(entry.name):
            target = root / match.group(1) / match.group(2)
            target.mkdir(parents=True, exist_ok=True)
            os.rename(entry, target / entry.name)


try:
    with open(xdg_config_home() / 'gpx_clean' / 'track.json', 'r') as infile:
        everything = json.load(infile)
        TRACKED = everything["modes"]
except FileNotFoundError as e:
    print(f"Could not find any tracking info in {e.filename}")
    raise

CONFIG = configparser.ConfigParser()
CONFIG['main'] = {}
_config_file = xdg_config_home() / 'gpx_clean' / 'config.ini'
# ConfigParser.read() silently skips missing files and returns the list of
# files it managed to read, so check that list instead of catching an error.
if not CONFIG.read(_config_file):
    print("Could not find the config file, please create one and specify a data directory")
    raise FileNotFoundError(str(_config_file))
if 'datadir' not in CONFIG['main']:
    raise Exception("Could not find the data directory in the config file")


if __name__ == "__main__":
    datadir = CONFIG['main']['datadir']

    # Move files that are just in the data dir, not subdirectories
    move_files(datadir)

    args = sys.argv[1:]
    command = args[0] if args else None
    option = args[1] if len(args) > 1 else None

    if command == "--rescan":
        RESCAN = True

    # Scan the whole data dir so the cache is up to date before reporting.
    walk(datadir)

    if command == "bars":
        bars('time' if option == "time" else 'distance')
    elif command == "lines":
        # A third argument (exercise types) is accepted but not used yet.
        lines('time' if option == "time" else 'distance')
    elif command == "monthly":
        NUM_MONTHS = 13
        start = _month_start_back(NUM_MONTHS - 1)
        for _ in range(NUM_MONTHS):
            print(f"Summary for {start.strftime('%b')} {start.year}")
            end = _month_end(start)
            textout(start, end)
            start = end + datetime.timedelta(days=1)
    elif command == "yearly":
        NUM_YEARS = 5
        start = datetime.date.today().replace(month=1, day=1)
        for _ in range(NUM_YEARS - 1):
            # Back one day to the previous year, then snap to its 1 January.
            start = (start - datetime.timedelta(days=1)).replace(month=1, day=1)
        for _ in range(NUM_YEARS):
            print(f"Summary for {start.year}")
            # 367 days after 1 January is always early in the next January.
            end = (start + datetime.timedelta(days=367)).replace(day=1) - datetime.timedelta(days=1)
            textout(start, end)
            start = end + datetime.timedelta(days=1)
    elif command == "month":
        start = datetime.date.today().replace(day=1)
        if option is not None:
            try:
                parsed = datetime.datetime.strptime(option, '%b')
            except ValueError:
                parsed = datetime.datetime.strptime(option, '%B')
            # ``start`` is already on the 1st, so changing the month is safe
            # whatever today's day-of-month is.
            start = start.replace(month=parsed.month)
        end = _month_end(start)
        textout(start, end, each=True)