fix_csv: bonus 1

This commit is contained in:
Cacahuete 2021-01-25 23:42:24 +01:00
parent d70505ac1d
commit aa060e4c33
2 changed files with 89 additions and 58 deletions

View file

@ -1,27 +1,28 @@
import logging import logging
from pathlib import Path from pathlib import Path
import csv
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
def normalise(input, output, in_delimiter="|", in_quote='"'):
    """Normalise `input` CSV file into `output` file.

    The input is parsed with the given dialect and rewritten as standard
    comma-delimited CSV.

    Args:
        input: Path of the file to read.
        output: Path of the file to write; created or truncated.
        in_delimiter: Field delimiter used by the input file.
        in_quote: Quote character used by the input file.

    The output always uses `,` as delimiter and `"` as quote character,
    independently of the input dialect. Fields are quoted only when they
    contain a comma, a double quote or a line break, and embedded double
    quotes are doubled by the writer itself.
    """
    # newline="" on both files lets the csv module own line-ending handling,
    # as the csv documentation requires. Opening in "w" mode creates the
    # output file, so no separate touch() is needed.
    with Path(input).open("r", newline="") as in_file, Path(output).open(
        "w", newline=""
    ) as out_file:
        reader = csv.reader(in_file, delimiter=in_delimiter, quotechar=in_quote)
        writer = csv.writer(
            out_file,
            delimiter=",",
            # The output dialect is fixed: reusing the input quote character
            # here would leave literal double quotes unescaped in the output.
            quotechar='"',
            quoting=csv.QUOTE_MINIMAL,
            lineterminator="\n",
        )
        writer.writerows(reader)
if __name__ == "__main__": if __name__ == "__main__":
@ -32,6 +33,14 @@ if __name__ == "__main__":
parser.add_argument("input") parser.add_argument("input")
parser.add_argument("output") parser.add_argument("output")
parser.add_argument("--in-delimiter", dest="in_delimiter", default="|")
parser.add_argument("--in-quote", dest="in_quote", default='"')
args = parser.parse_args() args = parser.parse_args()
normalise(args.input, args.output) normalise(
input=args.input,
output=args.output,
in_delimiter=args.in_delimiter,
in_quote=args.in_quote,
)

View file

@ -17,29 +17,34 @@ class FixCSVTests(unittest.TestCase):
maxDiff = None maxDiff = None
def test_pipe_file_to_csv_file(self):
    """Pipe-delimited rows come out as comma-delimited rows, with no output printed."""
    piped_text = (
        "2012|Lexus|LFA\n"
        "2009|GMC|Yukon XL 1500\n"
        "1965|Ford|Mustang\n"
        "2005|Hyundai|Sonata\n"
        "1995|Mercedes-Benz|C-Class\n"
    )
    wanted_text = (
        "2012,Lexus,LFA\n"
        "2009,GMC,Yukon XL 1500\n"
        "1965,Ford,Mustang\n"
        "2005,Hyundai,Sonata\n"
        "1995,Mercedes-Benz,C-Class\n"
    )
    with make_file(piped_text) as old, make_file("") as new:
        command = f"fix_csv.py {old} {new}"
        printed = run_program(command)
        with open(new) as result_file:
            written_text = result_file.read()
    self.assertEqual(wanted_text, written_text)
    self.assertEqual("", printed)
def test_delimiter_in_output(self): def test_delimiter_in_output(self):
old_contents = dedent(""" old_contents = dedent(
"""
02|Waylon Jennings|Honky Tonk Heroes (Like Me) 02|Waylon Jennings|Honky Tonk Heroes (Like Me)
04|Kris Kristofferson|To Beat The Devil 04|Kris Kristofferson|To Beat The Devil
11|Johnny Cash|Folsom Prison Blues 11|Johnny Cash|Folsom Prison Blues
@ -47,8 +52,10 @@ class FixCSVTests(unittest.TestCase):
21|Hank Williams III|Mississippi Mud 21|Hank Williams III|Mississippi Mud
22|David Allan Coe|Willie, Waylon, And Me 22|David Allan Coe|Willie, Waylon, And Me
24|Bob Dylan|House Of The Risin' Sun 24|Bob Dylan|House Of The Risin' Sun
""").lstrip() """
expected = dedent(""" ).lstrip()
expected = dedent(
"""
02,Waylon Jennings,Honky Tonk Heroes (Like Me) 02,Waylon Jennings,Honky Tonk Heroes (Like Me)
04,Kris Kristofferson,To Beat The Devil 04,Kris Kristofferson,To Beat The Devil
11,Johnny Cash,Folsom Prison Blues 11,Johnny Cash,Folsom Prison Blues
@ -56,21 +63,24 @@ class FixCSVTests(unittest.TestCase):
21,Hank Williams III,Mississippi Mud 21,Hank Williams III,Mississippi Mud
22,David Allan Coe,"Willie, Waylon, And Me" 22,David Allan Coe,"Willie, Waylon, And Me"
24,Bob Dylan,House Of The Risin' Sun 24,Bob Dylan,House Of The Risin' Sun
""").lstrip() """
).lstrip()
with make_file(old_contents) as old, make_file("") as new: with make_file(old_contents) as old, make_file("") as new:
output = run_program(f'fix_csv.py {old} {new}') output = run_program(f"fix_csv.py {old} {new}")
with open(new) as new_file: with open(new) as new_file:
new_contents = new_file.read() new_contents = new_file.read()
self.assertEqual(expected, new_contents) self.assertEqual(expected, new_contents)
self.assertEqual("", output) self.assertEqual("", output)
def test_original_file_is_unchanged(self):
    """Running the program must leave the input file's contents untouched."""
    source_text = "2012|Lexus|LFA\n2009|GMC|Yukon XL 1500\n"
    with make_file(source_text) as old, make_file("") as new:
        run_program(f"fix_csv.py {old} {new}")
        # Re-read the input while the temporary file is still available.
        with open(old) as source_file:
            text_after_run = source_file.read()
    self.assertEqual(source_text, text_after_run)
@ -78,54 +88,63 @@ class FixCSVTests(unittest.TestCase):
def test_call_with_too_many_files(self):
    """A third positional file argument must make the program fail."""
    with make_file("") as old, make_file("") as new:
        command = f"fix_csv.py {old} {new} {old}"
        # BaseException (not Exception) is expected so an argparse-style
        # SystemExit also counts as a failure.
        self.assertRaises(BaseException, run_program, command)
# To test the Bonus part of this exercise, comment out the following line # To test the Bonus part of this exercise, comment out the following line
@unittest.expectedFailure # @unittest.expectedFailure
def test_in_delimiter_and_in_quote(self):
    """--in-delimiter and --in-quote control how the input file is parsed."""
    spaced_text = (
        '2012 Lexus "LFA"\n'
        "2009 GMC 'Yukon XL 1500'\n"
        '1995 "Mercedes-Benz" C-Class\n'
    )
    # Default quote character: double quotes are stripped, single quotes are data.
    wanted_default_quote = (
        "2012,Lexus,LFA\n"
        "2009,GMC,'Yukon,XL,1500'\n"
        "1995,Mercedes-Benz,C-Class\n"
    )
    # Single-quote input: double quotes are data and must be escaped in the output.
    wanted_single_quote = (
        '2012,Lexus,"""LFA"""\n'
        "2009,GMC,Yukon XL 1500\n"
        '1995,"""Mercedes-Benz""",C-Class\n'
    )
    with make_file(spaced_text) as old, make_file("") as new:
        scenarios = (
            (f'fix_csv.py {old} {new} --in-delimiter=" "', wanted_default_quote),
            (
                f"""fix_csv.py --in-delimiter=" " --in-quote="'" {old} {new}""",
                wanted_single_quote,
            ),
        )
        for command, wanted in scenarios:
            run_program(command)
            with open(new) as result_file:
                self.assertEqual(wanted, result_file.read())
# To test the Bonus part of this exercise, comment out the following line # To test the Bonus part of this exercise, comment out the following line
@unittest.expectedFailure # @unittest.expectedFailure
def test_autodetect_input_format(self): def test_autodetect_input_format(self):
contents1 = dedent(""" contents1 = dedent(
"""
'2012' 'Lexus' 'LFA' '2012' 'Lexus' 'LFA'
'2009' 'GMC' 'Yukon XL 1500' '2009' 'GMC' 'Yukon XL 1500'
'1995' 'Mercedes-Benz' 'C-Class' '1995' 'Mercedes-Benz' 'C-Class'
""").lstrip() """
expected1 = dedent(""" ).lstrip()
expected1 = dedent(
"""
2012,Lexus,LFA 2012,Lexus,LFA
2009,GMC,Yukon XL 1500 2009,GMC,Yukon XL 1500
1995,Mercedes-Benz,C-Class 1995,Mercedes-Benz,C-Class
""").lstrip() """
).lstrip()
with make_file(contents1) as old, make_file("") as new: with make_file(contents1) as old, make_file("") as new:
run_program(f'fix_csv.py {old} {new}') run_program(f"fix_csv.py {old} {new}")
with open(new) as new_file: with open(new) as new_file:
self.assertEqual(expected1, new_file.read()) self.assertEqual(expected1, new_file.read())
contents2 = dedent(""" contents2 = dedent(
"""
"02"\t"Waylon Jennings"\t"Honky Tonk Heroes (Like Me)"\t"3:29" "02"\t"Waylon Jennings"\t"Honky Tonk Heroes (Like Me)"\t"3:29"
"04"\t"Kris Kristofferson"\t"To Beat The Devil"\t"4:05" "04"\t"Kris Kristofferson"\t"To Beat The Devil"\t"4:05"
"11"\t"Johnny Cash"\t"Folsom Prison Blues"\t"2:51" "11"\t"Johnny Cash"\t"Folsom Prison Blues"\t"2:51"
@ -133,8 +152,10 @@ class FixCSVTests(unittest.TestCase):
"21"\t"Hank Williams III"\t"Mississippi Mud"\t"3:32" "21"\t"Hank Williams III"\t"Mississippi Mud"\t"3:32"
"22"\t"David Allan Coe"\t"Willie, Waylon, And Me"\t"3:24" "22"\t"David Allan Coe"\t"Willie, Waylon, And Me"\t"3:24"
"24"\t"Bob Dylan"\t"House Of The Risin' Sun"\t"5:20" "24"\t"Bob Dylan"\t"House Of The Risin' Sun"\t"5:20"
""").lstrip() """
expected2 = dedent(""" ).lstrip()
expected2 = dedent(
"""
02,Waylon Jennings,Honky Tonk Heroes (Like Me),3:29 02,Waylon Jennings,Honky Tonk Heroes (Like Me),3:29
04,Kris Kristofferson,To Beat The Devil,4:05 04,Kris Kristofferson,To Beat The Devil,4:05
11,Johnny Cash,Folsom Prison Blues,2:51 11,Johnny Cash,Folsom Prison Blues,2:51
@ -142,9 +163,10 @@ class FixCSVTests(unittest.TestCase):
21,Hank Williams III,Mississippi Mud,3:32 21,Hank Williams III,Mississippi Mud,3:32
22,David Allan Coe,"Willie, Waylon, And Me",3:24 22,David Allan Coe,"Willie, Waylon, And Me",3:24
24,Bob Dylan,House Of The Risin' Sun,5:20 24,Bob Dylan,House Of The Risin' Sun,5:20
""").lstrip() """
).lstrip()
with make_file(contents2) as old, make_file("") as new: with make_file(contents2) as old, make_file("") as new:
run_program(f'fix_csv.py {old} {new}') run_program(f"fix_csv.py {old} {new}")
with open(new) as new_file: with open(new) as new_file:
self.assertEqual(expected2, new_file.read()) self.assertEqual(expected2, new_file.read())
@ -159,25 +181,25 @@ def run_program(arguments="", raises=DummyException):
If raises is specified, ensure the given exception is raised. If raises is specified, ensure the given exception is raised.
""" """
arguments = arguments.replace('\\', '\\\\') arguments = arguments.replace("\\", "\\\\")
path, *args = shlex.split(arguments) path, *args = shlex.split(arguments)
old_args = sys.argv old_args = sys.argv
warnings.simplefilter("ignore", ResourceWarning) warnings.simplefilter("ignore", ResourceWarning)
try: try:
sys.argv = [path] + args sys.argv = [path] + args
try: try:
if '__main__' in sys.modules: if "__main__" in sys.modules:
del sys.modules['__main__'] del sys.modules["__main__"]
with redirect_stdout(StringIO()) as output: with redirect_stdout(StringIO()) as output:
with redirect_stderr(output): with redirect_stderr(output):
SourceFileLoader('__main__', path).load_module() SourceFileLoader("__main__", path).load_module()
except raises: except raises:
return output.getvalue() return output.getvalue()
except SystemExit as e: except SystemExit as e:
if e.args != (0,): if e.args != (0,):
raise SystemExit(output.getvalue()) from e raise SystemExit(output.getvalue()) from e
finally: finally:
sys.modules.pop('__main__', None) sys.modules.pop("__main__", None)
if raises is not DummyException: if raises is not DummyException:
raise AssertionError("{} not raised".format(raises)) raise AssertionError("{} not raised".format(raises))
return output.getvalue() return output.getvalue()
@ -188,7 +210,7 @@ def run_program(arguments="", raises=DummyException):
@contextmanager @contextmanager
def make_file(contents=None): def make_file(contents=None):
"""Context manager providing name of a file containing given contents.""" """Context manager providing name of a file containing given contents."""
with NamedTemporaryFile(mode='wt', encoding='utf-8', delete=False) as f: with NamedTemporaryFile(mode="wt", encoding="utf-8", delete=False) as f:
if contents: if contents:
f.write(contents) f.write(contents)
try: try: