This repository has been archived on 2023-07-17. You can view files and clone it, but cannot push or open issues or pull requests.
pushee/kontr_emails.py

144 lines
4.6 KiB
Python
Raw Normal View History

2019-11-30 12:19:33 +01:00
#!/usr/bin/env python3
2019-11-30 12:46:56 +01:00
import datetime
2019-11-30 17:48:29 +01:00
import functools
import itertools
2019-11-30 12:19:33 +01:00
from mailbox import mbox, mboxMessage
2019-11-30 17:48:29 +01:00
import math
2019-11-30 12:19:33 +01:00
import re
2019-11-30 17:48:29 +01:00
from typing import Dict, List, Tuple
from submission import Submission, print_submissions
2019-11-30 12:19:33 +01:00
class Parser:
    """Extract submissions from a mailbox of evaluation e-mails.

    Each mail body is searched with the regular expressions below to obtain
    the student's identifiers, the submission directory and the awarded
    points. The submissions are then grouped per login and one of them per
    login is flagged ``"REVIEW"``.
    """

    # "<six digits> | <login starting with x>"
    INFO_REGEX = re.compile(r"(\d{6}) \| (x\S*)\s*")
    # group 1: whole directory path, group 2: its last path component
    SUBMISSION_REGEX = re.compile(r"adresář:\s+(\S*\/(\S*))\s*")
    # group 1: the number following the total-points label
    POINTS_REGEX = re.compile(r"\*\scelkový počet bodů\s+((\d|\.)*)\s*")

    # format of the ``deadline`` string accepted by ``__init__``
    DATE_FORMAT = "%Y_%m%d_%H%M%S"
    # added to the (midnight-truncated) deadline when ``correction`` is set
    OFFSET_FOR_CORRECTION = datetime.timedelta(days=8)

    @staticmethod
    def get_match_from_mail(regex: re.Pattern, mail: mboxMessage) -> re.Match:
        """Search the body of ``mail`` with ``regex`` and return the match.

        Raises:
            ValueError: if the payload is not a single string (e.g. a
                multipart mail) or the pattern is not found in it.
        """
        body = mail.get_payload()
        # A multipart message yields a list of sub-messages here; report it
        # as an invalid mail instead of letting ``regex.search`` fail with
        # a TypeError.
        if not isinstance(body, str):
            raise ValueError("invalid mail has been given")

        match = regex.search(body)
        if not match:
            raise ValueError("invalid mail has been given")
        return match

    @staticmethod
    def parse_info(mail: mboxMessage) -> Tuple[str, str]:
        """Return the ``(uco, login)`` pair found in the mail body.

        Raises:
            ValueError: if the mail does not contain the expected line.
        """
        match = Parser.get_match_from_mail(Parser.INFO_REGEX, mail)
        return match.group(1), match.group(2)

    @staticmethod
    def parse_submission(mail: mboxMessage) -> Tuple[str, str]:
        """Return ``(path, submission_id)`` of the submission directory.

        ``submission_id`` is the last component of ``path``.

        Raises:
            ValueError: if the mail does not contain the expected line.
        """
        match = Parser.get_match_from_mail(Parser.SUBMISSION_REGEX, mail)
        return match.group(1), match.group(2)

    @staticmethod
    def parse_points(mail: mboxMessage) -> float:
        """Return the total number of points stated in the mail body.

        Raises:
            ValueError: if the points line is missing or the captured text
                cannot be converted to ``float``.
        """
        match = Parser.get_match_from_mail(Parser.POINTS_REGEX, mail)
        return float(match.group(1))

    def __init__(self, path: str, deadline: str, correction: bool = False) -> None:
        """Open the mailbox and compute the effective deadline.

        Args:
            path: path of the mbox file to read.
            deadline: deadline formatted according to ``DATE_FORMAT``.
            correction: whether corrections are being processed; the flag is
                also forwarded to every created ``Submission``.
        """
        self.box = mbox(path)

        self.deadline = datetime.datetime.strptime(deadline, Parser.DATE_FORMAT)
        self.correction = correction

        if correction:
            # in case of correction pass the date of
            # submitting review to IS for automatic computation
            self.deadline = self.deadline.replace(hour=0, minute=0, second=0)
            self.deadline += Parser.OFFSET_FOR_CORRECTION

    def __get_submissions(self, hw_tag: str) -> Dict[str, List["Submission"]]:
        """Build a ``Submission`` for every mail and group them by login.

        Returns:
            Mapping from login to that login's submissions, ordered by
            ``submitted_at``.
        """
        submissions = []
        for mail in self.box.values():
            uco, login = Parser.parse_info(mail)
            path, submission_id = Parser.parse_submission(mail)
            points = Parser.parse_points(mail)

            submission = Submission(
                uco,
                login,
                path,
                submission_id,
                points,
                hw_tag,
                self.correction,
                mail.get_payload(),
            )
            submission.set_late_tag(self.deadline)
            submissions.append(submission)

        # groupby only merges adjacent items, hence the sort by login first
        submissions.sort(key=lambda e: (e.login, e.submitted_at))
        return {
            login: list(group)
            for login, group in itertools.groupby(submissions, key=lambda s: s.login)
        }

    def __filter(self, all_submissions: Dict[str, List["Submission"]]) -> None:
        """Flag one submission per login with ``"REVIEW"``.

        Only the leading run of submissions with
        ``submitted_before_deadline`` set is considered; the one with the
        most points wins and a tie goes to the later list position. Logins
        whose first submission missed the deadline get no flag.

        NOTE(review): relies on each list being ordered by submission time,
        as ``__get_submissions`` produces it.
        """
        for submissions in all_submissions.values():
            if not submissions or not submissions[0].submitted_before_deadline:
                continue

            best = submissions[0]
            for candidate in submissions[1:]:
                if not candidate.submitted_before_deadline:
                    break
                if candidate.points >= best.points:
                    best = candidate

            best.flag = "REVIEW"

    @staticmethod
    def _choose_for_review(submissions: List["Submission"], index: int) -> bool:
        """Move the ``"REVIEW"`` flag to ``submissions[index]``.

        Returns:
            ``True`` when the flag was moved; ``False`` (leaving every flag
            untouched) when ``index`` is outside the list.
        """
        if not 0 <= index < len(submissions):
            return False

        for i, submission in enumerate(submissions):
            if submission.flag == "REVIEW":
                submission.flag = None
            if i == index:
                submission.flag = "REVIEW"
        return True

    def __correct_errors(self, all_submissions: Dict[str, List["Submission"]]) -> bool:
        """Interactively let the user override the flagged submissions.

        Returns:
            ``True`` when the user declines to make changes (nothing left to
            fix), ``False`` after a round of corrections so that the caller
            can show the result and ask again.
        """
        response = input("Do you wish to fix errors? ").strip()
        if not response or response == "n":
            return True

        print("Choose indices of submissions that are to be reviewed")
        for login, submissions in all_submissions.items():
            response = input(f"==> Choose submission to review for {login}: ").strip()
            if not response or response == "n":
                continue

            # A typo must neither crash the session nor wipe the existing
            # choice, so invalid answers are reported and skipped.
            try:
                index = int(response)
            except ValueError:
                print(f"'{response}' is not a valid index, keeping current choice")
                continue

            if not Parser._choose_for_review(submissions, index):
                print(f"Index {index} is out of range, keeping current choice")

        return False

    def parse(self, hw_tag: str) -> List["Submission"]:
        """Parse the mailbox and return the submissions chosen for review.

        The automatically chosen submissions are printed and the user is
        repeatedly offered the chance to change the choice until they
        decline.

        Args:
            hw_tag: homework tag passed on to every ``Submission``.

        Returns:
            All submissions whose ``flag`` is ``"REVIEW"``, in the iteration
            order of the per-login groups.
        """
        submissions = self.__get_submissions(hw_tag)
        self.__filter(submissions)

        print_submissions(submissions)
        while not self.__correct_errors(submissions):
            print_submissions(submissions)

        return [
            submission
            for submission in itertools.chain.from_iterable(submissions.values())
            if submission.flag == "REVIEW"
        ]
2019-11-30 12:19:33 +01:00