330 lines
11 KiB
Python
330 lines
11 KiB
Python
|
import datetime
|
||
|
import math
|
||
|
import os
|
||
|
import sqlite3
|
||
|
import sys
|
||
|
import time
|
||
|
|
||
|
|
||
|
UID_CHARACTERS = 16
|
||
|
RECENT_COUNT = 12
|
||
|
|
||
|
STRFTIME = '%Y %m %d %H:%M'
|
||
|
|
||
|
SQL_MEAL_ID = 0
|
||
|
SQL_MEAL_CREATED = 1
|
||
|
SQL_MEAL_HUMAN = 2
|
||
|
|
||
|
SQL_REL_FOOD = 1
|
||
|
|
||
|
SQL_GROUP_FOOD = 0
|
||
|
SQL_GROUP_GROUP = 1
|
||
|
|
||
|
DB_INIT = '''
|
||
|
CREATE TABLE IF NOT EXISTS meals(id TEXT, created INT, human TEXT);
|
||
|
CREATE TABLE IF NOT EXISTS meal_foods(mealid TEXT, food TEXT);
|
||
|
CREATE TABLE IF NOT EXISTS food_groups(food TEXT, foodgroup TEXT);
|
||
|
|
||
|
CREATE INDEX IF NOT EXISTS index_meal_id on meals(id);
|
||
|
CREATE INDEX IF NOT EXISTS index_meal_created on meals(created);
|
||
|
CREATE INDEX IF NOT EXISTS index_food_mealid on meal_foods(mealid);
|
||
|
CREATE INDEX IF NOT EXISTS index_food_food on meal_foods(food);
|
||
|
CREATE INDEX IF NOT EXISTS index_group_food on food_groups(food);
|
||
|
'''.strip()
|
||
|
|
||
|
HELP_TEXT = '''
|
||
|
> meal add pizza, soda : Add a new meal with the foods "pizza" and "soda".
|
||
|
> meal adjust ec2 +10 : Adjust the timestamp of the meal starting with "ec2" by +10 seconds.
|
||
|
> meal adjust ec2 +10*60 : Adjusting timestamps supports math operations.
|
||
|
> meal group water drinks : Add "water" to foodgroup "drinks". Used for organization & reports.
|
||
|
> meal group water : Display the name of the group "water" belongs to.
|
||
|
> meal recent : Display info and foods for recent meals. Default {recent_count}.
|
||
|
> meal recent 4 : Display the last 4 meals.
|
||
|
> meal recent all : Display ALL meals.
|
||
|
> meal remove ec2 : Remove the meal whose ID starts with "ec2".
|
||
|
> meal show ec2 : Display info and foods for the meal whose ID starts with "ec2".
|
||
|
> meal ungroup water : Remove "water" from its foodgroup.
|
||
|
'''.format(recent_count=RECENT_COUNT)
|
||
|
|
||
|
def listget(li, index, fallback=None):
|
||
|
try:
|
||
|
return li[index]
|
||
|
except IndexError:
|
||
|
return fallback
|
||
|
|
||
|
def uid(length=None):
|
||
|
'''
|
||
|
Generate a u-random hex string..
|
||
|
'''
|
||
|
if length is None:
|
||
|
length = UID_CHARACTERS
|
||
|
identifier = ''.join('{:02x}'.format(x) for x in os.urandom(math.ceil(length / 2)))
|
||
|
if len(identifier) > length:
|
||
|
identifier = identifier[:length]
|
||
|
return identifier
|
||
|
|
||
|
|
||
|
class MealDB():
|
||
|
def __init__(self, dbname='C:/Git/else/Meal/meal.db'):
|
||
|
self.dbname = dbname
|
||
|
self._sql = None
|
||
|
self._cur = None
|
||
|
|
||
|
@property
|
||
|
def sql(self):
|
||
|
if self._sql is None:
|
||
|
self._sql = sqlite3.connect(self.dbname)
|
||
|
return self._sql
|
||
|
|
||
|
@property
|
||
|
def cur(self):
|
||
|
if self._cur is None:
|
||
|
self._cur = self.sql.cursor()
|
||
|
statements = DB_INIT.split(';')
|
||
|
for statement in statements:
|
||
|
#print(statement)
|
||
|
self._cur.execute(statement)
|
||
|
return self._cur
|
||
|
|
||
|
def add_meal(self, foods=None):
|
||
|
if foods is None:
|
||
|
raise Exception('Empty meal!')
|
||
|
assert isinstance(foods, (list, tuple))
|
||
|
foods = set(foods)
|
||
|
|
||
|
if ''.join(foods).replace(' ', '') == '':
|
||
|
raise Exception('Empty meal!')
|
||
|
|
||
|
mealid = self.new_uid('meals')
|
||
|
now = datetime.datetime.now()
|
||
|
now_stamp = int(now.timestamp())
|
||
|
now_string = now.strftime(STRFTIME)
|
||
|
|
||
|
self.normalized_query('INSERT INTO meals VALUES(?, ?, ?)', [mealid, now_stamp, now_string])
|
||
|
for food in foods:
|
||
|
self.normalized_query('INSERT INTO meal_foods VALUES(?, ?)', [mealid, food])
|
||
|
self.sql.commit()
|
||
|
foods = ', '.join(foods)
|
||
|
print('Added meal %s at %s with %s' % (mealid, now_string, foods))
|
||
|
return mealid
|
||
|
|
||
|
def adjust_timestamp(self, mealid, adjustment):
|
||
|
'''
|
||
|
Move a certain meal by `adjustment` seconds. This is useful when you need to
|
||
|
report a meal that happened a while ago, rather than the current timestamp.
|
||
|
'''
|
||
|
meal = self.get_meal_by_id(mealid)
|
||
|
mealid = meal[SQL_MEAL_ID]
|
||
|
meal_time = meal[SQL_MEAL_CREATED]
|
||
|
meal_time += adjustment
|
||
|
time_string = datetime.datetime.fromtimestamp(meal_time).strftime(STRFTIME)
|
||
|
self.normalized_query('UPDATE meals SET created=?, human=? WHERE id=?', [meal_time, time_string, mealid])
|
||
|
self.sql.commit()
|
||
|
print('Adjusted %s to %s' % (mealid, time_string))
|
||
|
|
||
|
def normalized_query(self, query, bindings):
|
||
|
nbindings = []
|
||
|
for binding in bindings:
|
||
|
if isinstance(binding, str):
|
||
|
nbindings.append(binding.lower())
|
||
|
continue
|
||
|
nbindings.append(binding)
|
||
|
self.cur.execute(query, nbindings)
|
||
|
|
||
|
def get_foods_for_meal(self, mealid):
|
||
|
meal = self.get_meal_by_id(mealid)
|
||
|
mealid = meal[SQL_MEAL_ID]
|
||
|
self.normalized_query('SELECT food FROM meal_foods WHERE mealid == ?', [mealid])
|
||
|
items = self.cur.fetchall()
|
||
|
items = [item[0] for item in items]
|
||
|
return items
|
||
|
|
||
|
def get_meal_by_id(self, mealid):
|
||
|
if len(mealid) == UID_CHARACTERS:
|
||
|
meal_q = mealid
|
||
|
self.normalized_query('SELECT * FROM meals WHERE id == ?', [meal_q])
|
||
|
else:
|
||
|
meal_q = mealid + '%'
|
||
|
self.normalized_query('SELECT * FROM meals WHERE id LIKE ?', [meal_q])
|
||
|
|
||
|
items = self.cur.fetchall()
|
||
|
if len(items) > 1:
|
||
|
items = [str(item) for item in items]
|
||
|
items = '\n'.join(items)
|
||
|
raise Exception('Found multiple meals for id "%s"\n%s' % (meal_q, items))
|
||
|
if len(items) == 0:
|
||
|
raise Exception('Found no meal for id "%s"' % (meal_q))
|
||
|
|
||
|
meal = items[0]
|
||
|
return meal
|
||
|
|
||
|
def group(self, food, groupname):
|
||
|
'''
|
||
|
Insert `food` into the foodgroup `groupname`. This is used for organization,
|
||
|
normalization, and creating dietary reports.
|
||
|
'''
|
||
|
self.normalized_query('SELECT * FROM food_groups WHERE food == ?', [food])
|
||
|
belongs = self.cur.fetchone()
|
||
|
|
||
|
if groupname is None:
|
||
|
self.normalized_query('SELECT * FROM food_groups where foodgroup == ?', [food])
|
||
|
contains = self.cur.fetchall()
|
||
|
|
||
|
if belongs is not None:
|
||
|
print('"%s" belongs to group "%s".' % (food, belongs[1]))
|
||
|
else:
|
||
|
print('"%s" is not in any group.' % (food))
|
||
|
|
||
|
if contains is not None and len(contains) > 0:
|
||
|
contains = [x[0] for x in contains]
|
||
|
contains = [repr(x) for x in contains]
|
||
|
contains = ', '.join(contains)
|
||
|
print('The "%s" group contains: %s' % (food, contains))
|
||
|
return
|
||
|
|
||
|
if belongs is not None:
|
||
|
raise Exception('"%s" is already in group "%s"' % (f[0], f[1]))
|
||
|
self.normalized_query('INSERT INTO food_groups VALUES(?, ?)', [food, groupname])
|
||
|
self.sql.commit()
|
||
|
print('Added "%s" to group "%s"' % (food, groupname))
|
||
|
|
||
|
def new_uid(self, table):
|
||
|
'''
|
||
|
Create a new UID that is unique to the given table.
|
||
|
'''
|
||
|
result = None
|
||
|
query = 'SELECT * FROM {table} WHERE id == ?'.format(table=table)
|
||
|
while result is None:
|
||
|
i = uid()
|
||
|
# Just gotta be sure, man.
|
||
|
self.normalized_query(query, [i])
|
||
|
if self.cur.fetchone() is None:
|
||
|
result = i
|
||
|
return result
|
||
|
|
||
|
def remove_meal(self, mealid):
|
||
|
meal = self.get_meal_by_id(mealid)
|
||
|
mealid = meal[SQL_MEAL_ID]
|
||
|
self.normalized_query('DELETE FROM meals WHERE id == ?', [mealid])
|
||
|
self.normalized_query('DELETE FROM meal_foods WHERE mealid == ?', [mealid])
|
||
|
self.sql.commit()
|
||
|
print('Removed meal %s' % (mealid))
|
||
|
|
||
|
def show_meal(self, mealid):
|
||
|
'''
|
||
|
Display:
|
||
|
id
|
||
|
timestamp
|
||
|
foods
|
||
|
for the meal with the given ID.
|
||
|
'''
|
||
|
meal = self.get_meal_by_id(mealid)
|
||
|
foods = self.get_foods_for_meal(mealid)
|
||
|
print(meal[SQL_MEAL_ID])
|
||
|
print(meal[SQL_MEAL_HUMAN])
|
||
|
foods = ', '.join(foods)
|
||
|
print(foods)
|
||
|
|
||
|
def show_recent(self, count=RECENT_COUNT):
|
||
|
'''
|
||
|
Display:
|
||
|
id : timestamp : foods
|
||
|
for the `count` most recent meals. If count is "all" or "*", show ALL meals.
|
||
|
'''
|
||
|
if count in ('all', '*'):
|
||
|
self.normalized_query('SELECT * FROM meals ORDER BY created DESC', [])
|
||
|
else:
|
||
|
self.normalized_query('SELECT * FROM meals ORDER BY created DESC LIMIT ?', [count])
|
||
|
meals = self.cur.fetchall()
|
||
|
output = []
|
||
|
for meal in meals:
|
||
|
mealid = meal[SQL_MEAL_ID]
|
||
|
human = meal[SQL_MEAL_HUMAN]
|
||
|
foods = self.get_foods_for_meal(mealid)
|
||
|
foods = ', '.join(foods)
|
||
|
output.append('%s : %s : %s' % (mealid, human, foods))
|
||
|
output = '\n'.join(output)
|
||
|
print(output)
|
||
|
|
||
|
def ungroup(self, food):
|
||
|
'''
|
||
|
Remove `food` from whatever group it is in.
|
||
|
'''
|
||
|
self.normalized_query('SELECT * FROM food_groups WHERE food == ?', [food])
|
||
|
f = self.cur.fetchone()
|
||
|
if f is None:
|
||
|
raise Exception('"%s" is not part of a group' % (food))
|
||
|
groupname = f[1]
|
||
|
self.normalized_query('DELETE FROM food_groups WHERE food == ?', [food])
|
||
|
self.sql.commit()
|
||
|
print('Removed "%s" from group "%s"' % (food, groupname))
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
mealdb = MealDB()
|
||
|
|
||
|
args = sys.argv[1:]
|
||
|
if len(args) == 0:
|
||
|
command = ''
|
||
|
else:
|
||
|
command = args[0].lower()
|
||
|
|
||
|
if command == 'add':
|
||
|
args = args[1:]
|
||
|
|
||
|
elif command == 'adjust':
|
||
|
mealid = args[1]
|
||
|
adjustment = args[2]
|
||
|
adjustment = eval(adjustment)
|
||
|
mealdb.adjust_timestamp(mealid, adjustment)
|
||
|
quit()
|
||
|
|
||
|
elif command == 'group':
|
||
|
food = args[1]
|
||
|
groupname = listget(args, 2, None)
|
||
|
mealdb.group(food, groupname)
|
||
|
quit()
|
||
|
|
||
|
elif command == 'recent':
|
||
|
count = listget(args, 1, RECENT_COUNT)
|
||
|
mealdb.show_recent(count)
|
||
|
quit()
|
||
|
|
||
|
elif command == 'remove':
|
||
|
mealids = args[1]
|
||
|
mealids = mealids.replace(' ', '')
|
||
|
mealids = mealids.split(',')
|
||
|
for mealid in mealids:
|
||
|
mealdb.remove_meal(mealid)
|
||
|
quit()
|
||
|
|
||
|
elif command == 'show':
|
||
|
mealid = args[1]
|
||
|
mealdb.show_meal(mealid)
|
||
|
quit()
|
||
|
|
||
|
elif command == 'ungroup':
|
||
|
food = args[1]
|
||
|
mealdb.ungroup(food)
|
||
|
quit()
|
||
|
|
||
|
else:
|
||
|
print(HELP_TEXT)
|
||
|
quit()
|
||
|
|
||
|
args = ' '.join(args)
|
||
|
|
||
|
if ';' in args:
|
||
|
(args, adjustment) = args.split(';')
|
||
|
adjustment = adjustment.strip()
|
||
|
adjustment = eval(adjustment)
|
||
|
else:
|
||
|
adjustment = 0
|
||
|
|
||
|
args = args.strip()
|
||
|
args = args.split(',')
|
||
|
args = [food.strip() for food in args]
|
||
|
meal = mealdb.add_meal(args)
|
||
|
if adjustment != 0:
|
||
|
mealdb.adjust_timestamp(meal, adjustment)
|
||
|
quit()
|