483 lines
13 KiB
Python
483 lines
13 KiB
Python
'''
|
|
This file provides helper functions used to normalize the arguments that
|
|
go into search queries. Mainly converting the strings given by the user
|
|
into proper data types.
|
|
'''
|
|
from . import constants
|
|
from . import exceptions
|
|
from . import helpers
|
|
from . import objects
|
|
|
|
from voussoirkit import expressionmatch
|
|
from voussoirkit import sqlhelpers
|
|
|
|
|
|
def expand_mmf(tag_musts, tag_mays, tag_forbids):
|
|
def _set(x):
|
|
if x is None:
|
|
return set()
|
|
return set(x)
|
|
|
|
tag_musts = _set(tag_musts)
|
|
tag_mays = _set(tag_mays)
|
|
tag_forbids = _set(tag_forbids)
|
|
|
|
forbids_expanded = set()
|
|
|
|
def _expand_flat(tagset):
|
|
'''
|
|
I am not using tag.walk_children because if the user happens to give us
|
|
two tags in the same lineage, we have the opportunity to bail early,
|
|
which walk_children won't know about. So instead I'm doing the queue
|
|
popping and pushing myself.
|
|
'''
|
|
expanded = set()
|
|
while len(tagset) > 0:
|
|
tag = tagset.pop()
|
|
if tag in forbids_expanded:
|
|
continue
|
|
if tag in expanded:
|
|
continue
|
|
expanded.add(tag)
|
|
tagset.update(tag.get_children())
|
|
return expanded
|
|
|
|
def _expand_nested(tagset):
|
|
expanded = []
|
|
total = set()
|
|
for tag in tagset:
|
|
if tag in total:
|
|
continue
|
|
this_expanded = _expand_flat(set([tag]))
|
|
total.update(this_expanded)
|
|
expanded.append(this_expanded)
|
|
return expanded
|
|
|
|
# forbids must come first so that musts and mays don't waste their time
|
|
# expanding the forbidden subtrees.
|
|
forbids_expanded = _expand_flat(tag_forbids)
|
|
musts_expanded = _expand_nested(tag_musts)
|
|
mays_expanded = _expand_flat(tag_mays)
|
|
|
|
return (musts_expanded, mays_expanded, forbids_expanded)
|
|
|
|
def minmax(key, value, minimums, maximums, warning_bag=None):
|
|
'''
|
|
Dissects a hyphenated range string and inserts the correct k:v pair into
|
|
both minimums and maximums.
|
|
('area', '100-200', {}, {})
|
|
-->
|
|
{'area': 100}, {'area': 200} (MODIFIED IN PLACE)
|
|
'''
|
|
if value is None:
|
|
return
|
|
|
|
if isinstance(value, str):
|
|
value = value.strip()
|
|
|
|
if value == '':
|
|
return
|
|
|
|
if isinstance(value, (int, float)):
|
|
minimums[key] = value
|
|
return
|
|
|
|
try:
|
|
(low, high) = helpers.hyphen_range(value)
|
|
|
|
except ValueError as e:
|
|
if warning_bag:
|
|
warning_bag.add(constants.WARNING_MINMAX_INVALID.format(field=key, value=value))
|
|
return
|
|
else:
|
|
raise
|
|
|
|
except exceptions.OutOfOrder as e:
|
|
if warning_bag:
|
|
warning_bag.add(e.error_message)
|
|
return
|
|
else:
|
|
raise
|
|
|
|
if low is not None:
|
|
minimums[key] = low
|
|
|
|
if high is not None:
|
|
maximums[key] = high
|
|
|
|
def normalize_author(authors, photodb, warning_bag=None):
|
|
'''
|
|
Either:
|
|
- A string, where the usernames are separated by commas
|
|
- An iterable containing
|
|
- Usernames as strings
|
|
- User objects
|
|
|
|
Returns: A set of user IDs.
|
|
'''
|
|
if authors is None:
|
|
authors = []
|
|
|
|
if isinstance(authors, str):
|
|
authors = helpers.comma_space_split(authors)
|
|
|
|
users = set()
|
|
for requested_author in authors:
|
|
if isinstance(requested_author, objects.User):
|
|
if requested_author.photodb == photodb:
|
|
users.add(requested_author)
|
|
else:
|
|
requested_author = requested_author.username
|
|
|
|
try:
|
|
user = photodb.get_user(username=requested_author)
|
|
except exceptions.NoSuchUser as e:
|
|
if warning_bag:
|
|
warning_bag.add(e.error_message)
|
|
else:
|
|
raise
|
|
else:
|
|
users.add(user)
|
|
|
|
return users
|
|
|
|
def normalize_extension(extensions):
|
|
'''
|
|
Either:
|
|
- A string, where extensions are separated by commas or spaces.
|
|
- An iterable containing extensions as strings.
|
|
|
|
Returns: A set of strings with no leading dots.
|
|
'''
|
|
if extensions is None:
|
|
extensions = set()
|
|
|
|
elif isinstance(extensions, str):
|
|
extensions = helpers.comma_space_split(extensions)
|
|
|
|
extensions = [e.lower().strip('.').strip() for e in extensions]
|
|
extensions = set(e for e in extensions if e)
|
|
|
|
return extensions
|
|
|
|
def normalize_filename(filename_terms):
|
|
'''
|
|
Either:
|
|
- A string.
|
|
- An iterable containing search terms as strings.
|
|
|
|
Returns: A string where terms are separated by spaces.
|
|
'''
|
|
if filename_terms is None:
|
|
filename_terms = ''
|
|
|
|
if not isinstance(filename_terms, str):
|
|
filename_terms = ' '.join(filename_terms)
|
|
|
|
filename_terms = filename_terms.strip()
|
|
|
|
return filename_terms
|
|
|
|
def normalize_has_tags(has_tags):
|
|
'''
|
|
See etiquette.helpers.truthystring.
|
|
'''
|
|
return helpers.truthystring(has_tags)
|
|
|
|
def normalize_has_thumbnail(has_thumbnail):
|
|
'''
|
|
See etiquette.helpers.truthystring.
|
|
'''
|
|
return helpers.truthystring(has_thumbnail)
|
|
|
|
def normalize_is_searchhidden(is_searchhidden):
|
|
'''
|
|
See etiquette.helpers.truthystring.
|
|
'''
|
|
return helpers.truthystring(is_searchhidden)
|
|
|
|
def _limit_offset(number, warning_bag):
|
|
if number is None:
|
|
return None
|
|
|
|
try:
|
|
number = normalize_positive_integer(number)
|
|
except ValueError as exc:
|
|
if warning_bag:
|
|
warning_bag.add(exc)
|
|
number = 0
|
|
return number
|
|
|
|
def normalize_limit(limit, warning_bag=None):
|
|
'''
|
|
Either:
|
|
- None to indicate unlimited.
|
|
- A non-negative number as an int, float, or string.
|
|
|
|
Returns: None or a positive integer.
|
|
'''
|
|
return _limit_offset(limit, warning_bag)
|
|
|
|
def normalize_mimetype(mimetype, warning_bag=None):
|
|
'''
|
|
Either:
|
|
- A string, where mimetypes are separated by commas or spaces.
|
|
- An iterable containing mimetypes as strings.
|
|
|
|
Returns: A set of strings.
|
|
'''
|
|
return normalize_extension(mimetype, warning_bag)
|
|
|
|
def normalize_mmf_vs_expression_conflict(
|
|
tag_musts,
|
|
tag_mays,
|
|
tag_forbids,
|
|
tag_expression,
|
|
warning_bag=None,
|
|
):
|
|
'''
|
|
The user cannot provide both mmf sets and tag expression at the same time.
|
|
If both are provided, nullify everything.
|
|
'''
|
|
if (tag_musts or tag_mays or tag_forbids) and tag_expression:
|
|
exc = exceptions.NotExclusive(['tag_musts+mays+forbids', 'tag_expression'])
|
|
if warning_bag:
|
|
warning_bag.add(exc.error_message)
|
|
else:
|
|
raise exc
|
|
conflict = True
|
|
conflict = False
|
|
|
|
if conflict:
|
|
tag_musts = None
|
|
tag_mays = None
|
|
tag_forbids = None
|
|
tag_expression = None
|
|
return (tag_musts, tag_mays, tag_forbids, tag_expression)
|
|
|
|
def normalize_offset(offset, warning_bag=None):
|
|
'''
|
|
Either:
|
|
- None.
|
|
- A non-negative number as an int, float, or string.
|
|
|
|
Returns: None or a positive integer.
|
|
'''
|
|
if offset is None:
|
|
return 0
|
|
return _limit_offset(offset, warning_bag)
|
|
|
|
def normalize_orderby(orderby, warning_bag=None):
|
|
'''
|
|
Either:
|
|
- A string of orderbys separated by commas, where a single orderby consists
|
|
of 'column' or 'column-direction' or 'column direction'.
|
|
- A list of such orderby strings.
|
|
- A list of tuples of (column, direction)
|
|
|
|
With no direction, direction is implied desc.
|
|
|
|
Returns: A list of tuples of (column, direction)
|
|
'''
|
|
if orderby is None:
|
|
orderby = []
|
|
|
|
if isinstance(orderby, str):
|
|
orderby = orderby.replace('-', ' ')
|
|
orderby = orderby.split(',')
|
|
|
|
final_orderby = []
|
|
for requested_order in orderby:
|
|
if isinstance(requested_order, str):
|
|
requested_order = requested_order.strip().lower()
|
|
if not requested_order:
|
|
continue
|
|
split_order = requested_order.split()
|
|
else:
|
|
split_order = tuple(x.strip().lower() for x in requested_order)
|
|
|
|
if len(split_order) == 2:
|
|
(column, direction) = split_order
|
|
|
|
elif len(split_order) == 1:
|
|
column = split_order[0]
|
|
direction = 'desc'
|
|
|
|
else:
|
|
message = constants.WARNING_ORDERBY_INVALID.format(request=requested_order)
|
|
if warning_bag:
|
|
warning_bag.add(message)
|
|
else:
|
|
raise ValueError(message)
|
|
continue
|
|
|
|
if column not in constants.ALLOWED_ORDERBY_COLUMNS:
|
|
message = constants.WARNING_ORDERBY_BADCOL.format(column=column)
|
|
if warning_bag:
|
|
warning_bag.add(message)
|
|
else:
|
|
raise ValueError(message)
|
|
continue
|
|
|
|
if column == 'random':
|
|
column = 'RANDOM()'
|
|
|
|
if direction not in ('asc', 'desc'):
|
|
message = constants.WARNING_ORDERBY_BADDIRECTION.format(
|
|
column=column,
|
|
direction=direction,
|
|
)
|
|
if warning_bag:
|
|
warning_bag.add(message)
|
|
else:
|
|
raise ValueError(message)
|
|
direction = 'desc'
|
|
|
|
requested_order = (column, direction)
|
|
final_orderby.append(requested_order)
|
|
|
|
return final_orderby
|
|
|
|
def normalize_positive_integer(number):
|
|
if number is None:
|
|
number = 0
|
|
|
|
elif isinstance(number, str):
|
|
# Convert to float, then int, just in case they type '-4.5'
|
|
# because int('-4.5') does not work.
|
|
number = float(number)
|
|
|
|
number = int(number)
|
|
|
|
if number < 0:
|
|
raise ValueError(f'{number} must be >= 0.')
|
|
|
|
return number
|
|
|
|
def normalize_tag_expression(expression):
|
|
if not expression:
|
|
return None
|
|
|
|
if not isinstance(expression, str):
|
|
expression = ' '.join(expression)
|
|
|
|
expression = expression.strip()
|
|
|
|
if not expression:
|
|
return None
|
|
|
|
return expression
|
|
|
|
EXIST_FORMAT = '''
|
|
{operator} (
|
|
SELECT 1 FROM photo_tag_rel WHERE photos.id == photo_tag_rel.photoid
|
|
AND tagid IN {tagset}
|
|
)
|
|
'''.strip()
|
|
def photo_tag_rel_exist_clauses(tag_musts, tag_mays, tag_forbids):
|
|
(tag_musts, tag_mays, tag_forbids) = expand_mmf(
|
|
tag_musts,
|
|
tag_mays,
|
|
tag_forbids,
|
|
)
|
|
|
|
clauses = []
|
|
for tag_must_group in tag_musts:
|
|
clauses.append( ('EXISTS', tag_must_group) )
|
|
if tag_mays:
|
|
clauses.append( ('EXISTS', tag_mays) )
|
|
if tag_forbids:
|
|
clauses.append( ('NOT EXISTS', tag_forbids) )
|
|
|
|
clauses = [
|
|
(operator, sqlhelpers.listify(tag.id for tag in tagset))
|
|
for (operator, tagset) in clauses
|
|
]
|
|
clauses = [
|
|
EXIST_FORMAT.format(operator=operator, tagset=tagset)
|
|
for (operator, tagset) in clauses
|
|
]
|
|
return clauses
|
|
|
|
def normalize_tagset(photodb, tags, warning_bag=None):
|
|
if not tags:
|
|
return None
|
|
|
|
if isinstance(tags, str):
|
|
tags = helpers.comma_space_split(tags)
|
|
|
|
tagset = set()
|
|
for tag in tags:
|
|
if isinstance(tag, objects.Tag):
|
|
if tag.photodb == photodb:
|
|
tagset.add(tag)
|
|
continue
|
|
else:
|
|
tag = tag.name
|
|
|
|
tag = tag.strip()
|
|
if tag == '':
|
|
continue
|
|
tag = tag.split('.')[-1]
|
|
|
|
try:
|
|
tag = photodb.get_tag(name=tag)
|
|
except exceptions.NoSuchTag as exc:
|
|
if warning_bag:
|
|
warning_bag.add(exc.error_message)
|
|
continue
|
|
else:
|
|
raise exc
|
|
tagset.add(tag)
|
|
return tagset
|
|
|
|
def tag_expression_tree_builder(
|
|
tag_expression,
|
|
photodb,
|
|
warning_bag=None
|
|
):
|
|
if not tag_expression:
|
|
return None
|
|
try:
|
|
expression_tree = expressionmatch.ExpressionTree.parse(tag_expression)
|
|
except expressionmatch.NoTokens:
|
|
return None
|
|
except Exception as exc:
|
|
warning_bag.add(f'Bad expression "{tag_expression}"')
|
|
return None
|
|
|
|
for node in expression_tree.walk_leaves():
|
|
try:
|
|
node.token = photodb.get_tag(name=node.token).name
|
|
except (exceptions.NoSuchTag) as exc:
|
|
if warning_bag:
|
|
warning_bag.add(exc.error_message)
|
|
node.token = None
|
|
else:
|
|
raise
|
|
|
|
if node.token is None:
|
|
continue
|
|
|
|
expression_tree.prune()
|
|
if expression_tree.token is None:
|
|
return None
|
|
return expression_tree
|
|
|
|
def tag_expression_matcher_builder(frozen_children):
|
|
def match_function(photo_tags, tagname):
|
|
'''
|
|
Used as the `match_function` for the ExpressionTree evaluation.
|
|
|
|
photo_tags:
|
|
The set of tag names owned by the photo in question.
|
|
tagname:
|
|
The tag which the ExpressionTree wants it to have.
|
|
'''
|
|
if not photo_tags:
|
|
return False
|
|
|
|
options = frozen_children[tagname]
|
|
return any(option in photo_tags for option in options)
|
|
|
|
return match_function
|