Compare commits

..

3 Commits

Author SHA1 Message Date
682503a24a Minor change to CLI help menu
All checks were successful
ydeng/renamebycsv/pipeline/head This commit looks good
2023-04-26 03:12:33 -05:00
4bf334c9d5 Restructured code structure 2023-04-26 03:07:09 -05:00
90a1db4f0c Began work on 0.0.6 2023-04-26 03:06:25 -05:00
3 changed files with 81 additions and 78 deletions

View File

@ -1,88 +1,14 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import csv
import os
import re
from typing import Iterable
import logging import logging
from renamebycsv.renamer import find_all_candidates, rename_by_csv
def find_all_candidates(
    input_dir: str, regex: str, recursive: bool
) -> list[tuple[str, str, re.Match]]:
    """Collect the entries of *input_dir* whose names match *regex*.

    Args:
        input_dir: Directory whose entries are examined.
        regex: Pattern applied with ``re.match`` (anchored at the start of
            the entry name, not the full path).
        recursive: When true, sub-directories are descended into instead of
            being tested against the pattern. When false, a directory is
            tested against the pattern like any other entry.

    Returns:
        A list of ``(path, name, match)`` tuples, one per matching entry.
    """
    results = []
    # os.listdir returns names in arbitrary order; sorting makes the result
    # (and therefore the order of any later renames) reproducible.
    for subitem in sorted(os.listdir(input_dir)):
        subitem_path = os.path.join(input_dir, subitem)
        if recursive and os.path.isdir(subitem_path):
            logging.debug('Checking directory "%s"...', subitem)
            results.extend(find_all_candidates(subitem_path, regex, recursive))
            continue
        # Only entries that are not recursed into need to be matched.
        match = re.match(regex, subitem)
        if not match:
            logging.debug('Ignoring "%s"...', subitem)
            continue
        results.append((subitem_path, subitem, match))
        logging.debug('Collecting "%s"...', subitem)
    return results
def rename(
    csv_path: str,
    candidates: Iterable[tuple[str, str, re.Match]],
    current: str,
    become: str,
    dry: bool,
    extension: str,
    keep_extension: bool,
) -> None:
    """Rename candidate files according to a two-column mapping in a CSV.

    The first non-blank CSV row is treated as the header; the columns named
    *current* and *become* supply the lookup key and the replacement text.
    For each candidate, capture group 1 of its match is looked up and every
    occurrence of the pattern in the (stripped) file name is replaced by the
    mapped value, inserted literally.

    Args:
        csv_path: Path of the CSV file holding the mapping.
        candidates: ``(path, name, match)`` tuples to process.
        current: Header of the column holding the lookup keys.
        become: Header of the column holding the replacement values.
        dry: When true, only log what would happen; rename nothing.
        extension: Extension appended to the new name (a leading dot is
            added when missing); ignored when empty.
        keep_extension: When true, append the original file's extension.

    Raises:
        ValueError: A header is missing from the first row, or the same key
            maps to two different values.
        SystemExit: With code 1 when a target path already exists.
    """
    replacement_dict = {}
    # The csv module requires newline="" so that line endings inside quoted
    # fields are interpreted by the reader rather than by the file object.
    with open(csv_path, "r", newline="") as csv_fd:
        reader = csv.reader(csv_fd)
        current_col_ind = None
        become_col_ind = None
        for row in reader:
            if not row:
                # csv.reader yields an empty list for a blank line.
                continue
            if current_col_ind is None and become_col_ind is None:
                current_col_ind = row.index(current)
                become_col_ind = row.index(become)
                continue
            key = row[current_col_ind]
            value = row[become_col_ind]
            # Repeating a key is fine as long as it maps to the same value.
            if replacement_dict.get(key, value) != value:
                raise ValueError("Duplicate current key.")
            replacement_dict[key] = value

    for subitem_path, subitem, match in candidates:
        key = match.group(1)
        if key not in replacement_dict:
            logging.warning(
                'Group "%s" was not matched to any row in the provided CSV. '
                "Skipping...",
                key,
            )
            continue
        replacement = replacement_dict[key]
        original = subitem_path
        objective = os.path.join(
            os.path.dirname(subitem_path),
            # A callable replacement is inserted verbatim, so backslashes in
            # the CSV value are not parsed as escapes or group references.
            match.re.sub(lambda _: replacement, subitem.strip()),
        )
        if extension:
            objective += ("." if not extension.startswith(".") else "") + extension
        if keep_extension:
            objective += os.path.splitext(subitem_path)[1]
        logging.info(
            'Will rename "%s" to "%s"', original, os.path.basename(objective)
        )
        if os.path.exists(objective):
            logging.error(
                f'Path at "{objective}" exists, not continuing. '
                "Use -f to overwrite instead of stopping."
            )
            # The exit() builtin is injected by the site module and may be
            # absent; raising SystemExit directly is always available.
            raise SystemExit(1)
        if not dry:
            os.rename(original, objective)
    if dry:
        logging.info("No file names were modified.")
def run(args): def run(args):
candidates = find_all_candidates(args.input_dir, args.regex, args.recursive) candidates = find_all_candidates(args.input_dir, args.regex, args.recursive)
rename( rename_by_csv(
args.csv, args.csv,
candidates, candidates,
args.current, args.current,
@ -104,7 +30,7 @@ def main():
metavar="I", metavar="I",
) )
argparser.add_argument( argparser.add_argument(
"regex", "pattern",
help="The regex to apply to each file name. The first capture group is used to " help="The regex to apply to each file name. The first capture group is used to "
"perform the replacement.", "perform the replacement.",
metavar="R", metavar="R",

77
renamebycsv/renamer.py Normal file
View File

@ -0,0 +1,77 @@
import csv
import logging
import os
import re
from typing import Iterable
def find_all_candidates(
    input_dir: str, regex: str, recursive: bool
) -> list[tuple[str, str, re.Match]]:
    """Gather entries below *input_dir* whose file names match *regex*.

    Args:
        input_dir: Directory to list.
        regex: Pattern tested against each entry name with ``re.match``
            (so it is anchored at the start of the name).
        recursive: If true, directories are walked into rather than matched;
            if false, directories are matched by name just like files.

    Returns:
        ``(path, name, match)`` tuples for every matching entry, in sorted
        name order per directory.
    """
    results = []
    # The order returned by os.listdir is arbitrary; sort it so repeated runs
    # collect (and later rename) entries in the same order.
    for subitem in sorted(os.listdir(input_dir)):
        subitem_path = os.path.join(input_dir, subitem)
        if recursive and os.path.isdir(subitem_path):
            logging.debug('Checking directory "%s"...', subitem)
            results.extend(find_all_candidates(subitem_path, regex, recursive))
            continue
        # The pattern is evaluated only for entries that can become results.
        match = re.match(regex, subitem)
        if not match:
            logging.debug('Ignoring "%s"...', subitem)
            continue
        results.append((subitem_path, subitem, match))
        logging.debug('Collecting "%s"...', subitem)
    return results
def rename_by_csv(
    csv_path: str,
    candidates: Iterable[tuple[str, str, re.Match]],
    current: str,
    become: str,
    dry: bool,
    extension: str,
    keep_extension: bool,
) -> None:
    """Rename candidate files using a key-to-name mapping read from a CSV.

    The first non-blank row of the CSV is the header. The column titled
    *current* holds lookup keys and the column titled *become* holds the
    text that replaces the matched part of the file name. Each candidate is
    looked up by capture group 1 of its match; every occurrence of the
    pattern in the stripped file name is replaced with the mapped value,
    which is inserted literally.

    Args:
        csv_path: Path of the CSV file holding the mapping.
        candidates: ``(path, name, match)`` tuples to process.
        current: Header of the column holding the lookup keys.
        become: Header of the column holding the replacement values.
        dry: When true, log the planned renames without performing them.
        extension: Extension appended to the new name (a leading dot is
            added when missing); ignored when empty.
        keep_extension: When true, append the original file's extension.

    Raises:
        ValueError: A header is missing from the first row, or one key maps
            to two different values.
        SystemExit: With code 1 when a target path already exists.
    """
    replacement_dict = {}
    # csv.reader expects the file to be opened with newline="" so that it,
    # not the file object, decides how line endings are handled.
    with open(csv_path, "r", newline="") as csv_fd:
        reader = csv.reader(csv_fd)
        current_col_ind = None
        become_col_ind = None
        for row in reader:
            if not row:
                # Blank lines come back as empty lists; indexing them fails.
                continue
            if current_col_ind is None and become_col_ind is None:
                current_col_ind = row.index(current)
                become_col_ind = row.index(become)
                continue
            key = row[current_col_ind]
            value = row[become_col_ind]
            # A repeated key is only an error when its value differs.
            if replacement_dict.get(key, value) != value:
                raise ValueError("Duplicate current key.")
            replacement_dict[key] = value

    for subitem_path, subitem, match in candidates:
        key = match.group(1)
        if key not in replacement_dict:
            logging.warning(
                'Group "%s" was not matched to any row in the provided CSV. '
                "Skipping...",
                key,
            )
            continue
        replacement = replacement_dict[key]
        original = subitem_path
        objective = os.path.join(
            os.path.dirname(subitem_path),
            # Passing a callable makes re insert the value as-is; a plain
            # string would have its backslashes processed as a template.
            match.re.sub(lambda _: replacement, subitem.strip()),
        )
        if extension:
            objective += ("." if not extension.startswith(".") else "") + extension
        if keep_extension:
            objective += os.path.splitext(subitem_path)[1]
        logging.info(
            'Will rename "%s" to "%s"', original, os.path.basename(objective)
        )
        if os.path.exists(objective):
            logging.error(
                f'Path at "{objective}" exists, not continuing. '
                "Use -f to overwrite instead of stopping."
            )
            # exit() is provided by the site module and is not guaranteed to
            # exist; SystemExit gives the same effect unconditionally.
            raise SystemExit(1)
        if not dry:
            os.rename(original, objective)
    if dry:
        logging.info("No file names were modified.")

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = renamebycsv name = renamebycsv
version = 0.0.5 version = 0.0.6
[options] [options]
packages = renamebycsv packages = renamebycsv