Mercurial > public > finance-parser
comparison process_document/app.py @ 2:ef8a4d95755a
add aws sam project
author | Dennis C. M. <dennis@denniscm.com> |
---|---|
date | Thu, 01 Jun 2023 18:51:18 +0100 |
parents | |
children | 2e5f3664f3e4 |
comparison
equal
deleted
inserted
replaced
1:e23b7617bbc4 | 2:ef8a4d95755a |
---|---|
1 import json | |
2 import boto3 | |
3 from datetime import datetime | |
4 from collections import defaultdict | |
5 | |
# AWS service clients, created once when the module is imported and shared by
# every call made from this module.
# NOTE(review): `s3_client` is not referenced anywhere in the visible code;
# confirm it is still needed before keeping it.
6 s3_client = boto3.client('s3') | |
7 textract_client = boto3.client('textract') | |
8 | |
9 | |
def lambda_handler(event, context):
    """
    Analyze the tables of every S3 object referenced by `event` with Textract.

    For each record the handler builds a `result` mapping and prints it:

    - `format`: the first amount scale reported by `get_format` for a LINE
      block, or `None` when no line names one.
    - `dateRow`: row index of the last cell recognised as a date by `is_date`.
    - `dateColumns`: column index -> year string for every date cell.
    - `data`: for rows below `dateRow`, the cleaned text of column 1 and of
      the date columns. Rows left with a single column are dropped.

    `dateRow`, `dateColumns` and `data` are only present when at least one
    date cell / qualifying row was found.

    NOTE(review): rows and columns are keyed by `RowIndex` / `ColumnIndex`
    only, so cells of different tables in the same document would overwrite
    each other. Confirm single-table input is guaranteed.

    :param event: S3 notification event; `event['Records']` is iterated
    :param context: Lambda context object (unused)
    :return: dict with `statusCode` 200 and a JSON `body` of `{"message": "ok"}`
    """

    # Imported here so the handler does not depend on a module-level import
    from urllib.parse import unquote_plus

    for record in event['Records']:
        metadata = record['s3']
        bucket_name = metadata['bucket']['name']

        # S3 notifications deliver the key URL-encoded (a space arrives as '+'),
        # so it must be decoded before being handed to another AWS API.
        object_key = unquote_plus(metadata['object']['key'])

        doc = textract_client.analyze_document(
            Document={'S3Object': {'Bucket': bucket_name, 'Name': object_key}},
            FeatureTypes=['TABLES']
        )

        # Analyze document
        result = defaultdict(dict)
        blocks = doc['Blocks']

        # Index blocks once so child lookups are O(1) instead of a full scan
        # of `blocks` for every word of every cell.
        blocks_by_id = {block['Id']: block for block in blocks}

        # Get format: first line that names an amount scale wins
        result['format'] = None
        for line in filter_blocks(blocks, 'BlockType', 'LINE'):
            amount_format = get_format(line['Text'])
            if amount_format:
                result['format'] = amount_format
                break

        # Find dates value and position
        data = defaultdict(dict)
        for cell in filter_blocks(blocks, 'BlockType', 'CELL'):
            # Only the first CHILD relationship is used; cells without one
            # (empty cells) carry no text and are skipped.
            child_ids = next(
                (
                    relationship['Ids']
                    for relationship in cell.get('Relationships', [])
                    if relationship['Type'] == 'CHILD'
                ),
                None,
            )
            if not child_ids:
                continue

            # Get `Text` from `CELL` block. Children that have no `Text`
            # (or whose id is unknown) are ignored instead of raising.
            words = [
                blocks_by_id[child_id]['Text']
                for child_id in child_ids
                if 'Text' in blocks_by_id.get(child_id, {})
            ]
            cell_text = '_'.join(words)

            # Verify if `Text` could be a valid date
            date_string = is_date(cell_text)
            if date_string:
                cell_text = date_string
                result['dateRow'] = cell['RowIndex']
                result['dateColumns'][cell['ColumnIndex']] = date_string

            data[cell['RowIndex']][cell['ColumnIndex']] = clean(cell_text)

        # Keep only the rows below the date header, reduced to the label
        # column (1) and the date columns. `.get` avoids creating an empty
        # `dateRow` entry; without a date row there is nothing to select.
        date_row = result.get('dateRow')
        if date_row is not None:
            date_columns = result['dateColumns']

            for row_index, row in data.items():
                if row_index <= date_row:
                    continue

                kept = {
                    column_index: text
                    for column_index, text in row.items()
                    if column_index == 1 or column_index in date_columns
                }

                # A row holding only its label (or only one value) is noise
                if len(kept) > 1:
                    result['data'][row_index] = kept

        print(f'RESULT: {result}')

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "ok"
        }),
    }
81 | |
82 | |
def filter_blocks(blocks, block_key, block_value):
    """
    Return the list of blocks whose `block_key` entry equals `block_value`.

    Blocks that do not contain `block_key` at all are treated as non-matching
    instead of raising `KeyError`, so the helper can be used with keys that
    only some blocks carry. The input order is preserved.

    :param blocks: iterable of dict-like blocks
    :param block_key: key to look up in every block
    :param block_value: value the key must be equal to
    :return: list of matching blocks (empty when nothing matches)
    """

    return [
        block
        for block in blocks
        if block_key in block and block[block_key] == block_value
    ]
89 | |
90 | |
def is_date(string_date):
    """
    Verify if a string could be a date and return its year.

    Each allowed format is tried in turn, first against the exact string and
    then against the string with one character removed from the end, from the
    beginning, and from both sides (this tolerates a stray character around
    the date, e.g. `(2019)` or `2019*`).

    The first candidate that parses decides the outcome: its year is returned
    when it lies between 1900 and the current year, otherwise the search stops
    and `None` is returned.

    :param string_date: text to inspect
    :return: four-digit year as a string, or `None` when no date is recognised
    """

    formats_allowed = ('%d-%m-%Y', '%d_%m_%Y', '%d/%m/%Y', '%d.%m.%Y', '%Y')

    # Exact text first, then the variants with a character trimmed
    candidates = (
        string_date,
        string_date[:-1],
        string_date[1:],
        string_date[1:-1],
    )

    # Read the clock once so every candidate is checked against the same year
    current_year = datetime.now().year

    for format_allowed in formats_allowed:
        for candidate in candidates:
            try:
                date = datetime.strptime(candidate, format_allowed)
            except ValueError:
                continue

            if not 1900 <= date.year <= current_year:
                return None  # Date out of range

            return str(date.year)

    return None
124 | |
125 | |
def get_format(phrase):
    """
    Given a phrase verify if it is specified the amount format.

    The match is case-insensitive, so `in Millions` and `IN MILLIONS` are
    recognised like `in millions`. Plural forms need no separate check because
    they contain the singular word. When several formats appear, the first of
    `thousand`, `million`, `billion` (in that order) is returned.

    :param phrase: text to search
    :return: `'thousand'`, `'million'` or `'billion'`, or `None` when the
        phrase names none of them
    """

    amount_formats = ('thousand', 'million', 'billion')
    normalized_phrase = phrase.casefold()

    for amount_format in amount_formats:
        if amount_format in normalized_phrase:
            return amount_format

    return None
138 | |
139 | |
def clean(text):
    """
    Strip dots, commas, hyphens and spaces from `text` and lowercase the rest.
    """

    # One translation pass deletes every unwanted character at once
    unwanted = str.maketrans('', '', '.,- ')

    return text.translate(unwanted).lower()