-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrwords.py
More file actions
503 lines (450 loc) · 20.6 KB
/
rwords.py
File metadata and controls
503 lines (450 loc) · 20.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
#!/usr/bin/env python3
# @file: rwords.py
# @auth: sprax
# @date: 2016-07-25 12:33:48 Mon 25 Jul
# Sprax Lines 2016.07.25 Written with Python 3.5
'''Class and driver script to solve simple substitution cipher from
a corpus and encoded text in separate text files.
Usage: python3 <this_script> [encoded_file [corpus_file [verbosity]]]
Where:
encoded_file contains text (at least mostly English) in which
every lower [upper[ case ASCII letter has been replaced by the
lower [upper] case subsititution-cipher value for that letter;
corpus_file contains (mostly English) text whose word distribution
is not too dissimilar from that of the encoded text; and
verbosity sets the level of trace output.
Two new files are written:
cipher_text.key will contain the discovered forward mapping of letter
to cipher, and
cipher_text.decoded will contain the deciphered contents the
cipher_text file.
'''
import heapq
import re
import sys
from collections import Counter, defaultdict
class SubCipher:
'''Solver to infer a simple substituion cipher based on a large
corpus and small sample of encoded text. Assumes English for
boot-strapping off these four words: I, a, the, and.'''
def __init__(self, cipher_file, corpus_file, verbose):
self.cipher_file = cipher_file
self.corpus_file = corpus_file
self.cipher_lines = read_file_lines(cipher_file)
self.cipher_len_1, self.cipher_words = word_counts_short_and_long(
cipher_file, 1)
self.corpus_len_1, self.corpus_words = word_counts_short_and_long(
corpus_file, 1)
self.cipher_chars = count_chars_from_words(self.cipher_words)
self.corpus_chars = count_chars_from_words(self.corpus_words)
self.forward_map = defaultdict(int)
self.inverse_map = defaultdict(int)
self.inverse_score = 0
self.verbose = verbose
if self.verbose > 1:
print("The dozen most common corpus words and their counts:")
for word, count in self.corpus_words.most_common(12):
print("\t", word, "\t", count)
def solve(self):
'''Given a file of ordinary English sentences encoded using a simple
substitution cipher, and a corpus of English text expected to contain
most of the words in the encoded text, decipher the encoded file.
Uses the SubCipher class.
'''
self.find_a_and_i()
self.find_the_and_and()
self.find_words_from_ciphers()
if self.verbose > 0:
matches, misses, missing_words = self.count_decoded_words_in_corpus()
print("Distinct decoded words found in corpus: {} misses: {}".format(
matches, misses))
if self.verbose > 1:
print("decoded words missing from corpus:", missing_words)
if self.verbose > 2:
self.print_deciphered_words()
print("Score from all matched words using the key below: ",
self.inverse_score, " (total matched letters)")
self.print_forward_map()
self.print_deciphered_lines()
self.write_forward_cipher_key(self.cipher_file + ".key")
self.write_deciphered_text(self.cipher_file + ".decoded")
def assign(self, corp, ciph):
'''Assigns corpus char -> cipher char in the forward cipher map,
and the opposite in the inverse map. Asserts that these character
are not already mapped. TODO: Replace assert with actual error?'''
assert self.forward_map[corp] == 0, (
"Cannot forward assign {} -> {} because already {} -> {}"
.format(corp, ciph, corp, self.forward_map[corp]))
assert self.inverse_map[ciph] == 0, (
"Cannot inverse assign {} -> {} because already {} -> {}"
.format(corp, ciph, self.inverse_map[ciph], ciph))
self.forward_map[corp] = ciph
self.inverse_map[ciph] = corp
if self.verbose > 1:
print("Accept", corp, "->", ciph)
def find_a_and_i(self):
'''Try to find the word "I" as the most common capitalized
single-letter word, and "a" as the most common lowercase
single-letter word. Assuming English, obviously.'''
if self.verbose > 0:
print("Search for the words 'a' and 'I'")
# Peek at these most common corpus words as a sanity-check
corp_ai = self.corpus_len_1.most_common(2)
corpchars = (corp_ai[0][0], corp_ai[1][0])
if corpchars != ('a', 'I') and corpchars != ('I', 'a'):
print("Unexpected most common 1-letter words in corpus: ", corp_ai)
ciph_ai = self.cipher_len_1.most_common(2)
if ciph_ai[0][0].islower():
self.assign('a', ciph_ai[0][0])
self.assign('i', ciph_ai[1][0].lower())
else:
self.assign('a', ciph_ai[1][0])
self.assign('i', ciph_ai[0][0].lower())
def find_the_and_and(self):
'''Try to find the two most common English words: "the" and "and".'''
if self.verbose > 0:
print("Search for the words 'the' and 'and'")
# Peek at the most common three-letter corpus words as a sanity-check
len3words = []
for word, _ in self.corpus_words.most_common(12):
if len(word) == 3:
len3words.append(word)
words = len3words[:2]
if words != ['the', 'and'] and words != ['and', 'the']:
print("Unexpected most common 3-letter words in corpus:\n", len3words)
most_freq_ciphs = self.cipher_chars.most_common(2)
probable_e = most_freq_ciphs[0][0]
alternate_e = most_freq_ciphs[1][0]
found_and = False
found_the = False
for ciph, _ in self.cipher_words.most_common(10):
if len(ciph) == 3:
if not found_the and (ciph[2] == probable_e or ciph[2] == alternate_e):
found_the = True
self.assign('t', ciph[0])
self.assign('h', ciph[1])
self.assign('e', ciph[2])
if not found_and and ciph[0] == self.forward_map['a']:
found_and = True
self.assign('n', ciph[1])
self.assign('d', ciph[2])
def find_words_from_ciphers(self):
'''Find common corpus words comprised mostly of known inverse
cipher chars, and try filling in the missing letters. Trials
are evaluated by scoring how many decoded cipher words then
match corpus words. The highest score wins. (That is, the
decision is immediate, not defered to accumulate multiple
scoring passes or backpropogating votes.'''
num_words = len(self.corpus_words)
corpus = self.corpus_words.most_common(num_words) # Just try them all
# priority = [num_unknown (updated on pop), -count, length]
inverse_pq = []
for ciph, count in self.cipher_words.items():
entry = [self.number_of_unknowns(ciph), -count, len(ciph), ciph]
heapq.heappush(inverse_pq, entry)
# terminate the loop when the sentinel is seen a second time
sentinel = ''
while inverse_pq:
num_unk, neg_count, length, ciph = heapq.heappop(inverse_pq)
if num_unk == 0:
continue
# update (unknowns can become known)
num_unk, idx_unk = self.num_idx_unknown(ciph)
if num_unk == 1:
self.inverse_match_1_unknown(ciph, length, idx_unk, corpus)
elif num_unk > 1:
if ciph == sentinel:
print('Breaking from queue at: ',
ciph, num_unk, -neg_count)
break
elif not sentinel:
# Set the sentinel and give each item still in the queue
# a chance to update its unknowns. Some may change to 1
# and get matched. Quit when the sentinel comes back to
# the front.
sentinel = ciph
print('Repush entry [', ciph, num_unk, -
neg_count, '] to end of the queue')
heapq.heappush(inverse_pq, [1000, 0, length, ciph])
elif self.verbose > 2:
print('\tAlready deciphered: ', num_unk, -
neg_count, ciph, self.decipher_word(ciph))
def inverse_match_1_unknown(self, ciph, length, idx_unknown, corpus):
'''Try to match one cipher word with a single unknown against all
corpus words of same length. Accept the match that maximaly
improves the total score (if there is any such a match).'''
if self.verbose > 3:
print('Trying to match cipher word {} at index {}'.format(
ciph, idx_unknown))
self.inverse_score = self.score_inverse_map()
ciph_char = ciph[idx_unknown]
max_score = 0
max_word = ''
deciphered = self.decipher_word(ciph)
for word, _ in corpus:
if len(word) == length:
# Match inverted ciphers to word chars
for idx in range(length):
if idx == idx_unknown:
continue # skip over the single unknown
if word[idx] != deciphered[idx]:
break # break on the first known mismatch
# all known chars matched, hole excluded
else:
# Compute the total score that would result from accepting
# this mapping
word_char = word[idx_unknown]
# create temporary inverse mapping
self.inverse_map[ciph_char] = word_char
# compute score with this mapping
try_score = self.score_inverse_map()
# delete temporary inverse mapping
self.inverse_map[ciph_char] = 0
if max_score < try_score:
max_score = try_score
max_word = word
if max_score > self.inverse_score:
self.update_mapping_on_better_score(
ciph, idx_unknown, max_word, max_score)
def update_mapping_on_better_score(self, ciph, idx_unknown, max_word, max_score):
'''Update forward and inverse maps and, if verbose > 0, show why.
For now this method assumes inverse-map scoring,
but could be generalized
for forward-mapping, partial-word matches (stemming), and so forth.
This method exists mainly to provide a trace of the solver's progress.'''
ciph_char = ciph[idx_unknown]
max_char = max_word[idx_unknown]
old_forward = self.forward_map[max_char]
count = self.cipher_words[ciph]
# Must delete the previous forward mapping, if it exists
if old_forward != 0:
if self.verbose > 0:
old_word = self.decipher_word(ciph)
print("Delete {} -> {} because {} x '{}' => '{}' gave old score: {}".format(
max_char, self.forward_map[max_char], count,
ciph, old_word, self.inverse_score))
self.forward_map[max_char] = 0
self.inverse_map[old_forward] = 0
if self.verbose > 0:
print("Assign {} -> {} because {} x '{}' => '{}' gives new score {} > {}".format(
max_char, ciph_char, count, ciph, max_word, max_score, self.inverse_score))
self.inverse_score = max_score
self.assign(max_char, ciph_char)
def score_inverse_map(self):
'''
score based on totality of deciphered cipher words matching corpus
words
'''
score_total = 0
for ciph, ciph_count in self.cipher_words.items():
word = self.decipher_word(ciph)
# 0 if not in corpus
word_count = 1 if self.corpus_words[word] else 0
score = word_count * ciph_count * len(ciph)
if self.verbose > 5:
print(" {:9}\t {} => {}".format(score, ciph, word))
score_total += score
return score_total
def count_decoded_words_in_corpus(self):
'''returns three things: the hit and miss counts, and a list of
all the decoded cipher text words missing from the corpus:
(1) hits: the number of distinct encoded words that, decoded with the
current best guess at the cipher key, match some word found in the
corpus, and
(2) misses: the number that do not.
Note that the set of 'words' may include such strings as "t" and
"ll", which result from splitting "don't" and "you'll" on the
apostrophe. No assumption is made of proper grammer or orthography
(3) missing words: a list containing all decoded cipher text
words not found in the corpus, in lexicographically sorted order
'''
num_ciphers = len(self.cipher_len_1) + len(self.cipher_words)
num_matches = 0
missing = []
for cipher in self.cipher_len_1:
deciph = self.decipher_word(cipher)
if self.corpus_len_1[deciph] > 0:
num_matches += 1
else:
missing.append(deciph)
for cipher in self.cipher_words:
deciph = self.decipher_word(cipher)
if self.corpus_words[deciph] > 0:
num_matches += 1
else:
missing.append(deciph)
return num_matches, num_ciphers - num_matches, sorted(missing)
def number_of_unknowns(self, ciph):
'''
returns the number of unknown cipher characters in the string ciph
'''
return sum(map(lambda x: self.inverse_map[x] == 0, ciph))
def num_idx_unknown(self, ciph):
'''returns the number of unknown cipher characters in the string ciph
and the index of the right-most unknown'''
num_unknown = 0
idx_unknown = -1
idx = 0
for fwd in ciph:
inv = self.inverse_map[fwd]
if inv == 0:
num_unknown += 1
idx_unknown = idx
idx += 1
return num_unknown, idx_unknown
def decipher_word(self, encoded_word):
'''Replace contents of encoded_word with inverse mapped chars'''
out = []
for fwd in encoded_word:
inv = self.inverse_map[fwd]
if inv == 0:
out.append('_')
else:
out.append(inv)
return ''.join(out)
def decipher_text(self, line):
'''Decode any string using the current inverse cipher map. Only lower
and upper case ASCII letters will be changed, preserving case.'''
decoded = []
for char in line:
if char >= 'a' and char <= 'z':
inv = self.inverse_map[char]
decoded.append(inv if inv else '_')
elif char >= 'A' and char <= 'Z':
inv = self.inverse_map[char.lower()]
decoded.append(inv.upper() if inv else '_')
else:
decoded.append(char)
return ''.join(decoded)
######## OUTPUT METHODS ########
def print_deciphered_words(self, outfile=sys.stdout):
'''Print all the words from the cipher file as decoded using
the current inverse_map to stdout (default) or a file'''
print('cipher\tcorpus \tcoded\tdecoded')
for ciph in sorted(self.cipher_words.keys(), key=self.cipher_words.get,
reverse=True):
ciph_count = self.cipher_words[ciph]
word = self.decipher_word(ciph)
corp_count = self.corpus_words[word]
print('{}\t{} \t {} => {}'.format(
ciph_count, corp_count, ciph, word), file=outfile)
def print_forward_map(self, outfile=sys.stdout):
'''Print the forward cipher mapping to stdout (default) or a file'''
for word_char in char_range_inclusive('a', 'z'):
ciph_char = self.forward_map[word_char]
print(word_char, "->", ciph_char if ciph_char else ' ', file=outfile)
def print_deciphered_lines(self, outfile=sys.stdout):
'''Print the decoded contents of the original cipher file to the
console
(default) or a file'''
for line in self.cipher_lines:
text = self.decipher_text(line)
if outfile == sys.stdout:
uprint(text) # compensate for non-UTF-terminals
else:
print(text, file=outfile)
def write_forward_cipher_key(self, path):
'''Write the forward cipher mapping into a new file.'''
with open(path, 'w') as out:
self.print_forward_map(out)
out.close()
def write_deciphered_text(self, path):
'''
Write the decoded contents of the original cipher file into a new file
'''
with open(path, 'w') as out:
self.print_deciphered_lines(out)
out.close()
def uprint(*objects, sep=' ', end='\n', outfile=sys.stdout):
'''Prints non-ASCII Unicode (UTF-8) characters in a safe (but possibly
ugly) way even in a Windows command terminal. Unicode-enabled terminals
such as on Mac or KDE have no problem, nor do most IDE's, but calling
Python's built-in print to print such characters (e.g., an em-dash)
from a Windows cmd or Powershell terminal causes errors such as:
UnicodeEncodeError: 'charmap' codec can't encode characters in position
32-33:
character maps to <undefined> '''
enc = outfile.encoding
if enc == 'UTF-8':
print(*objects, sep=sep, end=end, file=outfile)
else:
def enc_dec(obj): return str(obj).encode(
enc, errors='backslashreplace').decode(enc)
print(*map(enc_dec, objects), sep=sep, end=end, file=outfile)
def char_range_inclusive(first, last, step=1):
'''ranges from specified first to last character, inclusive, in
any character set, depending only on ord()'''
for char in range(ord(first), ord(last) + 1, step):
yield chr(char)
def read_file_lines(path):
'''reads a text file into a list of lines'''
lines = []
with open(path, 'r') as text:
for line in text:
lines.append(line.rstrip())
return lines
def count_words(path):
'''
Returns a Counter that has counted all ASCII-only words found in a text
file.
'''
rgx_match = re.compile(r"[A-Za-z]+")
counter = Counter()
with open(path, 'r') as text:
for line in text:
words = re.findall(rgx_match, line.rstrip())
words = [x.lower() if len(x) > 1 else x for x in words]
counter.update(words)
return counter
def word_counts_short_and_long(path, max_short_len):
'''Returns two Counters containing all the ASCII-only words found in a text
file.
The first counter counts only words up to length max_short_len, as-is.
The second counter contains all the longer words, but lowercased.'''
rgx_match = re.compile(r"[A-Za-z]+")
short_counter = Counter()
other_counter = Counter()
with open(path, 'r', encoding="utf8") as text:
for line in text:
short = []
other = []
words = re.findall(rgx_match, line.rstrip())
for word in words:
if len(word) <= max_short_len:
short.append(word)
else:
other.append(word.lower())
short_counter.update(short)
other_counter.update(other)
return short_counter, other_counter
def count_chars_from_words(word_counter):
'''Count chars from all words times their counts'''
char_counter = Counter()
for item in word_counter.items():
for _ in range(item[1]):
char_counter.update(item[0])
return char_counter
def solve_simple_substition_cipher(cipher_file, corpus_file, verbose):
'''Given a file of ordinary English sentences encoded using a simple
substitution cipher, and a corpus of English text expected to contain
most of the words in the encoded text, decipher the encoded file.
Uses the SubCipher class.
'''
subs = SubCipher(cipher_file, corpus_file, verbose)
subs.solve()
def main():
'''Get file names for cipher and corpus texts and call
solve_simple_substition_cipher.'''
# simple, inflexible arg parsing:
argc = len(sys.argv)
if argc > 4:
print(sys.argv[0])
print(__doc__)
exit(0)
# Get the paths to the files (relative or absolute)
cipher_file = sys.argv[1] if argc > 1 else r'cipher.txt'
corpus_file = sys.argv[2] if argc > 2 else r'corpus.txt'
verbose = int(sys.argv[3]) if argc > 3 else 1
solve_simple_substition_cipher(cipher_file, corpus_file, verbose)
if __name__ == '__main__':
main()