Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions edtf/parser/grammar.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pyparsing
from edtf.appsettings import DEBUG_PYPARSING
from edtf.util import remapparams

pyparsing.ParserElement.enablePackrat()

Expand Down Expand Up @@ -343,6 +344,7 @@ def f(toks):
)


@remapparams(parseAll="parse_all")
def parse_edtf(
input_string: str,
parse_all: bool = True,
Expand Down
75 changes: 75 additions & 0 deletions edtf/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from time import struct_time

from edtf import convert
from edtf.parser.edtf_exceptions import EDTFParseException
from edtf.parser.grammar import parse_edtf
from edtf.util import remapparams


def test_dt_to_struct_time_for_datetime():
Expand Down Expand Up @@ -107,3 +110,75 @@ def test_roll_negative_time_fields():
assert convert._roll_negative_time_fields(
year, month, day, hour, minute, second
) == (-102, 5, 24, 21, 41, 47)


def test_remapparams():
@remapparams(parseAll="parse_all")
def parser(s, parse_all=True):
pass

assert parser.__name__ == "parser" # noqa: S101
parser("foo")
# this should not warn
parser("foo", parse_all=False)
# this should warn, but only once
for _ in 1, 2:
parser("foo", parseAll=False)
try:
parser("foo", parseAll=False, parse_all=True)
except ValueError:
pass
else:
raise AssertionError("expected ValueError because of duplicated parameters")

try:

@remapparams()
def no_remappings():
pass
except ValueError:
pass
else:
raise AssertionError(
"expected ValueError from @remapparams() because no remappings"
)
try:

@remapparams(p1="p2", p2="p3")
def no_remappings():
pass
except ValueError:
pass
else:
raise AssertionError(
"expected ValueError from @remapparams() because p1 remaps to another remapped parameter"
)


def test_remapparams_parse_edtf():
edtf_s = "2005-09-24T10:00:00" # ISO8601 example from the EDTF spec
dat = parse_edtf(edtf_s) # implicit parse_all=True
assert dat.isoformat() == edtf_s
assert parse_edtf(edtf_s, parse_all=True).isoformat() == edtf_s
assert parse_edtf(edtf_s, parseAll=True).isoformat() == edtf_s
assert parse_edtf(f"{edtf_s} SNORT", parse_all=False).isoformat() == edtf_s
assert parse_edtf(f"{edtf_s} SNORT", parseAll=False).isoformat() == edtf_s
# make sure parse_all=True fails the SNORT parse
try:
parse_edtf(f"{edtf_s} SNORT")
except EDTFParseException:
pass
else:
raise AssertionError("expected EDTFParseException")
try:
parse_edtf(f"{edtf_s} SNORT", parse_all=True)
except EDTFParseException:
pass
else:
raise AssertionError("expected EDTFParseException")
try:
parse_edtf(f"{edtf_s} SNORT", parseAll=True)
except EDTFParseException:
pass
else:
raise AssertionError("expected EDTFParseException")
67 changes: 67 additions & 0 deletions edtf/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env python3

"""
Assorted utility functions.
"""

from functools import update_wrapper
from logging import warning
from traceback import extract_stack


def remapparams(**remap):
"""
Remap the specified named parameters.

Example to support an obsolete `parseAll` parameter:

@remapparams(parseAll='parse_all')
def parse(s, parse_all=True):

"""
if not remap:
raise ValueError("no parameters specified for remapping")
for old, new in remap.items():
if new in remap:
raise ValueError(f"{old}={new!r}: {new!r} is also remapped")

def remapparams_decorator(func):
"""The decorator to apply the remappings."""
# a record of callers whose parameters were remapped
remapped_callers = set()

def remapparams_wrapper(*a, **kw):
remappings = {}
for param, value in list(kw.items()):
try:
remapped = remap[param]
except KeyError:
continue
if remapped in kw:
raise ValueError(
f"remap {param}= to {remapped}=: this is already present in the keyword arguments"
)
del kw[param]
kw[remapped] = value
remappings[param] = remapped
if remappings:
caller_frame = extract_stack(limit=2)[-2]
caller_key = caller_frame.filename, caller_frame.lineno
if caller_key not in remapped_callers:
warning(
"call of %s.%s() from %s:%d: remapped the following obsolete parameters: %s",
func.__module__,
func.__name__,
caller_frame.filename,
caller_frame.lineno,
", ".join(
sorted(f"{old}->{new}" for old, new in remappings.items())
),
)
remapped_callers.add(caller_key)
return func(*a, **kw)

update_wrapper(remapparams_wrapper, func)
return remapparams_wrapper

return remapparams_decorator
Loading