#!/usr/bin/env python3
"""Merge a mailing-list thread from an mbox file into one interleaved view.

Each reply is diffed against its parent message to tell quoted ("inherited")
lines from newly written ones.  The new text of every reply is then moved up
into the parent's block list at the position it was anchored to, so that a
topic root ends up holding the whole conversation in reading order.

Origin: threadview.py as added to mehlbrei.git in commit
053b11ed2674c4fe9b948ff853fa1674c5e86e17 ("it sorta works!!!", Jann Horn,
2024-09-01).

NOTE(review): this file was rebuilt from a whitespace-collapsed copy of that
commit diff.  Leading whitespace inside debug strings and the HTML markup in
write_html() could not be recovered and were reconstructed - confirm against
the repository.
"""
import copy
import difflib
import email
import email.policy
import html
import mailbox


def join_whitespace(text):
    """Return text with every whitespace run collapsed to one space and the ends trimmed."""
    return ' '.join(text.split())


class TextBlockRange:
    """A contiguous run of body lines of one message.

    A range is either quoted text matched against the parent message
    (is_quote=True) or text that is new in this message.
    """

    def __init__(self, message, start_pos_child, end_pos_child, pos_parent, is_quote):
        """Create a range over message lines [start_pos_child, end_pos_child).

        message must provide .lines (a Lines object).  pos_parent is the
        anchor in the parent message: for quotes the parent line where the
        quoted text starts, for new text the parent line it should be
        anchored at, or -1 for top-posted text.
        """
        self.message = message
        # index into message body lines for reading text
        self.start_pos_orig = start_pos_child
        # start anchor index
        self.start_pos_child = start_pos_child
        # length in current context (0 after moving up)
        self.length = end_pos_child - start_pos_child
        # original length
        self.length_orig = self.length
        self.pos_parent = pos_parent
        self.is_quote = is_quote

        # Only ranges that were matched against the parent are real quotes.
        # Marking every range here would make the quote-header check in
        # trim_blanks_and_trailing_quote_header() succeed unconditionally.
        if is_quote:
            for line in self.message.lines.lines[self.start_pos_orig:self.start_pos_orig+self.length_orig]:
                line.real_quote = True

    def end_pos_child(self):
        """Return the exclusive end position in the current context."""
        return self.start_pos_child + self.length

    def move_up_after(self, parent_block):
        """Re-anchor this range directly behind parent_block, one level up.

        The range becomes zero-length in the new context; its text is still
        reachable through start_pos_orig/length_orig.
        """
        self.start_pos_child = parent_block.start_pos_child + parent_block.length
        self.pos_parent = parent_block.pos_parent + (parent_block.length if parent_block.is_quote else 0)
        self.length = 0

    def move_up_at_head(self):
        """Re-anchor this range as top-posted text (position -1) one level up."""
        self.start_pos_child = -1
        self.pos_parent = -1
        self.length = 0

    def split(self, split_pos):
        """Cut this range at split_pos; keep the first part, return the second.

        split_pos is a position in the current context and must lie strictly
        inside the range.
        """
        split_off = split_pos - self.start_pos_child
        assert split_off > 0
        assert split_off < self.length
        part2 = copy.copy(self)

        self.length = split_off
        self.length_orig = split_off

        part2.start_pos_orig += split_off
        part2.start_pos_child += split_off
        part2.length -= split_off
        part2.length_orig -= split_off
        if self.is_quote:
            # a quote maps 1:1 onto parent lines, so its anchor moves along
            part2.pos_parent += split_off

        return part2

    def _plain_lines(self):
        """Return the original body lines covered by this range."""
        return self.message.lines.plain_lines[self.start_pos_orig:self.start_pos_orig+self.length_orig]

    def get_text(self):
        """Return the covered lines joined with newlines."""
        return '\n'.join(self._plain_lines())

    def get_formatted(self):
        """Return a debug rendering: sender header plus the depth-indented text."""
        pad = ' '*self.message.depth
        header = pad + '<<< FROM '+self.message.from_hdr+' >>>, '+str(self.length_orig)+' lines\n'
        return header + '\n'.join(pad+line for line in self._plain_lines())

    def meta_origin_str(self):
        """Return 'start-end MSG index FROM sender' describing where the text comes from."""
        # self.message.id ?
        return str(self.start_pos_orig)+'-'+str(self.start_pos_orig+self.length_orig) + ' MSG ' + str(self.message.index) + ' FROM '+self.message.from_hdr

    def dump_meta(self):
        """Print meta_origin_str(), indented by message depth."""
        pad = ' '*self.message.depth
        print(pad + self.meta_origin_str())

    def _trim_trailing_blanks(self):
        """Shrink the range while its last line is empty."""
        lines = self.message.lines.lines
        while self.length_orig != 0 and lines[self.start_pos_orig+self.length_orig-1].empty:
            self.length_orig -= 1

    def trim_blanks_and_trailing_quote_header(self):
        """Drop blank lines at both ends and a trailing 'On ... wrote:' line.

        The trailing line is only dropped when the next non-empty line of the
        message is a real quote, i.e. was matched against the parent.
        """
        lines = self.message.lines.lines

        # first, trim blank lines at start/end...
        while self.length_orig != 0 and lines[self.start_pos_orig].empty:
            self.start_pos_orig += 1
            self.length_orig -= 1
        self._trim_trailing_blanks()

        # Check for usual english quote header text pattern (assuming single line)
        if self.length_orig == 0:
            return
        last_line = self.message.lines.plain_lines[self.start_pos_orig+self.length_orig-1]
        if last_line.lstrip().startswith('On ') and last_line.rstrip().endswith(':'):
            # Check if next block is quote
            check_pos = self.start_pos_orig + self.length_orig
            print('######## LOOKS LIKE QUOTE: <<'+last_line+'>>')
            while check_pos < len(lines):
                if lines[check_pos].real_quote:
                    # is real quote, strip last line
                    self.length_orig -= 1
                    print(' IS QUOTE!')
                    break
                if not lines[check_pos].empty:
                    break
                check_pos += 1

        # trim blank lines before trailing header
        self._trim_trailing_blanks()


class TextBlockRangeset:
    """Ordered list of TextBlockRange objects belonging to one message."""

    def __init__(self):
        self.ranges = []

    def append(self, r):
        """Append r unless it covers no lines."""
        if r.length_orig != 0:
            self.ranges.append(r)

    def import_rangeset(self, other_ranges):
        """Insert ranges of child messages at the positions they are anchored to.

        other_ranges must be sorted by pos_parent.  Own ranges are split
        where an imported range is anchored in their middle.
        """
        print('import_rangeset()')
        print(' own ranges:')
        for r in self.ranges:
            print(' '+str(r.start_pos_child)+'-'+str(r.end_pos_child())+' '+('quote' if r.is_quote else 'new'))
        print(' import ranges:')
        for r in other_ranges:
            print(' '+str(r.pos_parent))

        own_i = 0
        for other_range in other_ranges:
            print('importing element with pos_parent='+str(other_range.pos_parent))
            while own_i != len(self.ranges) and other_range.pos_parent >= self.ranges[own_i].end_pos_child():
                own_i += 1
            if other_range.pos_parent == -1:
                # for -1: copy up after all the other blocks with -1
                other_range.move_up_at_head()
                self.ranges.insert(own_i, other_range)
                continue

            # for pos_parent after the start of the current block, we need to split the current block
            if own_i < len(self.ranges) and other_range.pos_parent > self.ranges[own_i].start_pos_child:
                self.ranges.insert(own_i+1, self.ranges[own_i].split(other_range.pos_parent))
                # insert position
                own_i = own_i + 1
            # else: pos_parent is at/before the start of the current block,
            # just insert at current cursor

            # NOTE(review): own_i == 0 here would wrap around to the last
            # range through index -1.  That looks unreachable because anchors
            # are either -1 or positive - confirm.
            other_range.move_up_after(self.ranges[own_i-1])
            self.ranges.insert(own_i, other_range)

        print(' own ranges after:')
        for r in self.ranges:
            print(' '+str(r.start_pos_child)+'-'+str(r.end_pos_child())+' '+('quote' if r.is_quote else 'new')+' '+r.meta_origin_str())

    def drop_quotes(self):
        """Remove all quote ranges, keeping only new text."""
        self.ranges = [r for r in self.ranges if not r.is_quote]

    def trim_blocks(self):
        """Trim every range and drop the ones that end up empty."""
        for r in self.ranges:
            r.trim_blanks_and_trailing_quote_header()
        self.ranges = [r for r in self.ranges if r.length_orig != 0]

    def get_formatted(self):
        """Return the debug rendering of all ranges, newline-separated."""
        return '\n'.join(r.get_formatted() for r in self.ranges)

    def dump_meta(self):
        """Print the origin line of every range."""
        for r in self.ranges:
            r.dump_meta()


class Line:
    """One body line plus the flags used for quote detection."""

    def __init__(self, text):
        self.text = text
        nowhite = text.replace(' ', '').replace('\t', '')
        nowhite_noquote = nowhite.lstrip('>')
        # starts with '>' once spaces and tabs are ignored
        self.maybe_quote = (len(nowhite)-len(nowhite_noquote) != 0)
        # set by TextBlockRange when the line was matched against the parent
        self.real_quote = False
        # nothing but spaces, tabs and '>' markers
        self.empty = len(nowhite_noquote) == 0

    def as_hashable_inherited_line(self):
        """Return the line with one quote level removed, or None if unquoted.

        None is used (instead of '') because it can never compare equal to a
        parent line.  With '' an unquoted reply line next to a quote could be
        matched against a blank parent line and be swallowed into the quote.
        """
        if not self.maybe_quote:
            return None # treated as junk and never matches a parent line
        stripped = self.text.strip()
        if stripped.startswith('>'):
            stripped = stripped[1:].strip()
        return join_whitespace(stripped)

    def as_hashable_line(self):
        """Return the whitespace-normalized line for matching against replies."""
        return join_whitespace(self.text.strip())


class Lines:
    """The body of a message, split on newline characters."""

    def __init__(self, text):
        self.plain_lines = text.split('\n')
        self.lines = [Line(line) for line in self.plain_lines]

    def as_hashable_list(self):
        """Return the normalized form of every line."""
        return [line.as_hashable_line() for line in self.lines]

    def as_hashable_inherited_list(self):
        """Return every line with one quote level removed (None if unquoted)."""
        return [line.as_hashable_inherited_line() for line in self.lines]

    def diff_to_parent(self, msg, parent):
        """Split this body into quote and new-text ranges relative to parent.

        msg is the message that owns these lines (stored in each range);
        parent is the Lines object of the parent message.  Returns a
        TextBlockRangeset of alternating new/quote ranges; empty ranges are
        left out.
        """
        sm = difflib.SequenceMatcher(
            (lambda line: line is None or line == ''),
            parent.as_hashable_list(),
            self.as_hashable_inherited_list())
        inherited_chunks = sm.get_matching_blocks()

        blocks = TextBlockRangeset()
        child_uninherited_start = 0
        pos_parent = -1
        for inherited_chunk in inherited_chunks:
            # we don't need to repeat this after the loop - the last range is guaranteed to have size 0
            blocks.append(TextBlockRange(msg, child_uninherited_start, inherited_chunk.b, pos_parent, False))
            blocks.append(TextBlockRange(msg, inherited_chunk.b, inherited_chunk.b+inherited_chunk.size, inherited_chunk.a, True))
            child_uninherited_start = inherited_chunk.b + inherited_chunk.size
            pos_parent = inherited_chunk.a + inherited_chunk.size
        return blocks

    def dump(self):
        """Print every line, prefixed with 'Q' when it looks like a quote."""
        for line in self.lines:
            print(('Q' if line.maybe_quote else ' ') + ' ' + line.text)


class ThreadMessage:
    """One message of the thread plus its position in the reply tree."""

    def __init__(self, msg, index):
        """Wrap msg (parsed with email.policy.default); index is its mbox position."""
        self.msg = msg
        self.index = index
        # get_body() returns None when there is no text/plain part; treat
        # such a message as having an empty body instead of crashing.
        body = msg.get_body(('plain',))
        self.lines = Lines(body.get_content() if body is not None else '')
        self.id = msg.get('message-id', None)
        self.from_hdr = msg.get('from', '?')
        self.parent_id = msg.get('in-reply-to', None)
        self.is_patch = msg.get('subject', '').startswith('[PATCH')
        self.parent = None
        self.children = []
        self.blocks = None
        self.depth = None
        self.non_diff_root_descendants = 0

    def is_diff_root(self):
        """Return True for messages shown whole: the thread root and patches."""
        return self.parent is None or self.is_patch

    def dump_tree(self, level, topic_only):
        """Print the reply tree below this message; skip patches if topic_only."""
        print(' '*level + f'{self.id}' + ' ' + ('PATCH' if self.is_patch else ' '))
        for child in self.children:
            if topic_only and child.is_patch:
                continue
            child.dump_tree(level+1, topic_only)

    def init_diff_recursively(self):
        """Build self.blocks for this message and everything below it.

        The new text of replies that are not diff roots is merged into this
        message's blocks.  Raises Exception if the message yields no ranges.
        """
        print('init_diff_recursively')
        if self.is_diff_root():
            self.blocks = TextBlockRangeset()
            self.blocks.append(TextBlockRange(self, 0, len(self.lines.lines), -1, False))
            self.depth = 0
        else:
            self.blocks = self.lines.diff_to_parent(self, self.parent.lines)
            self.depth = self.parent.depth + 1
        if not self.blocks.ranges:
            raise Exception('empty rangeset?')

        all_child_blocks = []
        for child in self.children:
            child.init_diff_recursively()
            if not child.is_diff_root():
                self.non_diff_root_descendants += 1 + child.non_diff_root_descendants
                all_child_blocks.extend(child.blocks.ranges)
        all_child_blocks.sort(key=lambda block: block.pos_parent)
        print(f'importing for {self.id} from {self.from_hdr}')
        self.blocks.import_rangeset(all_child_blocks)
        self.blocks.drop_quotes()

        if self.is_diff_root():
            # only do this for diff roots, at the end
            self.blocks.trim_blocks()


def load_thread(mbox_path):
    """Read mbox_path and link its messages into a reply tree.

    Returns (root_message, all_topic_roots).  Raises Exception when there is
    not exactly one message without In-Reply-To or when a referenced parent
    is missing, and mailbox.NoSuchMailboxError when mbox_path does not exist.
    """
    # create=False: reading a thread must not create an empty mailbox file
    mbox = mailbox.mbox(
        mbox_path,
        factory=lambda x: email.message_from_binary_file(x, policy=email.policy.default),
        create=False)
    root_message = None
    messages_by_id = {}
    all_messages = []
    all_topic_roots = []
    try:
        for message_index, msg in enumerate(mbox):
            tmsg = ThreadMessage(msg, message_index)
            messages_by_id[tmsg.id] = tmsg
            all_messages.append(tmsg)
            if tmsg.parent_id is None:
                if root_message is not None:
                    raise Exception('more than one root message')
                root_message = tmsg
            if tmsg.parent_id is None or tmsg.is_patch:
                all_topic_roots.append(tmsg)
    finally:
        mbox.close()

    if root_message is None:
        raise Exception('no root message')

    for tmsg in all_messages:
        if not tmsg.parent_id:
            continue
        if tmsg.parent_id not in messages_by_id:
            raise Exception('missing intermediate message')
        parent_tmsg = messages_by_id[tmsg.parent_id]
        tmsg.parent = parent_tmsg
        parent_tmsg.children.append(tmsg)

    # note: theoretically we can have loops in the "tree" at this point, but they'd
    # have to be unreachable from the root, so, meh, whatever
    return root_message, all_topic_roots


def write_block_html(outfile, block):
    """Write one text block (sender line plus preformatted text) to outfile."""
    outfile.write('        <div class="block">\n')
    outfile.write('          <div class="from">'+html.escape(block.message.from_hdr, quote=False)+'</div>\n')
    outfile.write('          <pre>'+html.escape(block.get_text(), quote=False)+'</pre>\n')
    outfile.write('        </div>\n')


def write_html(all_topic_roots, out_path='/tmp/lkml-out.html'):
    """Write the topic list and the merged text of every topic to out_path.

    NOTE(review): the element names and attributes below are reconstructed;
    only the text content and the sequence of writes were recoverable.
    """
    # explicit encoding: mail text is not limited to the locale's charset
    with open(out_path, 'wt', encoding='utf-8') as outfile:
        outfile.write('<!DOCTYPE html>\n')
        outfile.write('<html>\n')
        outfile.write('  <head>\n')
        outfile.write('    <title>Mehlbrei</title>\n')
        outfile.write('    <meta charset="utf-8">\n')
        outfile.write('  </head>\n')
        outfile.write('  <body>\n')
        outfile.write('    <div class="topics">\n')
        outfile.write('      <h1>Topics</h1>\n')
        for topic_root in all_topic_roots:
            subject = html.escape(topic_root.msg.get('subject', ''), quote=False)
            outfile.write('      <div class="topic">'+subject+' ('+str(topic_root.non_diff_root_descendants)+' replies)</div>\n')
        outfile.write('    </div>\n')
        outfile.write('    <div class="threads">\n')
        for topic_root in all_topic_roots:
            outfile.write('      <div class="thread">\n')
            # blocks moved up with move_up_at_head() carry position -1
            top_post_blocks = [b for b in topic_root.blocks.ranges if b.start_pos_child < 0]
            thread_post_blocks = [b for b in topic_root.blocks.ranges if b.start_pos_child >= 0]
            if top_post_blocks:
                outfile.write('        <h2>Top-posted messages (sorry this looks like a mess)</h2>\n')
                for block in top_post_blocks:
                    write_block_html(outfile, block)
            outfile.write('        <h2>Actual thread</h2>\n')
            for block in thread_post_blocks:
                write_block_html(outfile, block)
            outfile.write('      </div>\n')
        outfile.write('    </div>\n')
        outfile.write('  </body>\n')
        outfile.write('</html>\n')


def main(mbox_path='t.mbox', out_path='/tmp/lkml-out.html'):
    """Load the thread, print the merged root view and write the HTML report."""
    root_message, all_topic_roots = load_thread(mbox_path)
    root_message.init_diff_recursively()

    print('<<<<<<')
    print(root_message.blocks.get_formatted())
    print('>>>>>>')
    root_message.blocks.dump_meta()

    write_html(all_topic_roots, out_path)


if __name__ == '__main__':
    main()