comparison process_document/app.py @ 2:ef8a4d95755a

add aws sam project
author Dennis C. M. <dennis@denniscm.com>
date Thu, 01 Jun 2023 18:51:18 +0100
parents
children 2e5f3664f3e4
comparison
equal deleted inserted replaced
1:e23b7617bbc4 2:ef8a4d95755a
1 import json
2 import boto3
3 from datetime import datetime
4 from collections import defaultdict
5
# AWS service clients, created once when the module is imported.
# `textract_client` is used by `lambda_handler` to analyze documents.
# NOTE(review): `s3_client` is not referenced in the visible code - confirm
# whether it is still needed.
s3_client = boto3.client('s3')
textract_client = boto3.client('textract')
8
9
def lambda_handler(event, context):
    """
    Analyze each S3 object referenced in `event` with Textract and print the
    table data located below the detected date header row.

    For every record the handler:
      1. runs Textract table analysis on the referenced S3 object,
      2. looks for the first `LINE` block that mentions an amount format
         (see `get_format`),
      3. rebuilds every table `CELL` text from its child blocks and records
         which row/columns hold date-like values (see `is_date`),
      4. keeps, for the rows below the date row, only column 1 and the date
         columns, and prints the outcome.

    :param event: S3 event notification; reads `event['Records'][i]['s3']`
    :param context: Lambda context object (unused)
    :return: dict with `statusCode` 200 and a JSON `body`
    """

    # Imported locally so this function does not depend on the module's
    # top-level import list.
    from urllib.parse import unquote_plus

    for record in event['Records']:
        metadata = record['s3']
        bucket_name = metadata['bucket']['name']

        # S3 event notifications deliver the object key URL-encoded (a space
        # arrives as '+'), so decode it before handing it to Textract.
        object_key = unquote_plus(metadata['object']['key'])

        doc = textract_client.analyze_document(
            Document={'S3Object': {'Bucket': bucket_name, 'Name': object_key}},
            FeatureTypes=['TABLES']
        )

        # Analyze document
        result = defaultdict(dict)
        blocks = doc['Blocks']

        # Index blocks by id once instead of scanning the whole list for
        # every child of every cell.
        blocks_by_id = {block['Id']: block for block in blocks}

        # Get format: the first line that names an amount format wins
        for line in filter_blocks(blocks, 'BlockType', 'LINE'):
            amount_format = get_format(line['Text'])
            result['format'] = amount_format
            if amount_format:
                break

        # Find dates value and position
        data = defaultdict(dict)
        for cell in filter_blocks(blocks, 'BlockType', 'CELL'):
            child_ids = next(
                (
                    relationship['Ids']
                    for relationship in cell.get('Relationships', [])
                    if relationship['Type'] == 'CHILD'
                ),
                None
            )

            # Cells without any child relationship carry no text
            if child_ids is None:
                continue

            # Get `Text` from `CELL` block. Children without a `Text` entry
            # (e.g. selection elements) are skipped instead of raising.
            cell_text = '_'.join(
                blocks_by_id[child_id]['Text']
                for child_id in child_ids
                if 'Text' in blocks_by_id.get(child_id, {})
            )

            # Verify if `Text` could be a valid date
            date_string = is_date(cell_text)
            if date_string:
                cell_text = date_string
                result['dateRow'] = cell['RowIndex']
                result['dateColumns'][cell['ColumnIndex']] = date_string

            data[cell['RowIndex']][cell['ColumnIndex']] = clean(cell_text)

        # Delete unused rows and columns. Without a date row there is nothing
        # to anchor on, so no data rows are kept (comparing a row index with
        # the defaultdict's empty-dict placeholder would raise TypeError).
        date_row = result.get('dateRow')
        if date_row is not None:
            date_columns = result['dateColumns']

            for row_index, row in data.items():
                if row_index <= date_row:
                    continue

                # Column 1 is always kept next to the date columns
                # (presumably the row label - verify against sample tables).
                kept = {
                    column_index: value
                    for column_index, value in row.items()
                    if column_index == 1 or column_index in date_columns
                }

                # A row holding a single cell has no value to report
                if len(kept) > 1:
                    result['data'][row_index] = kept

        print(f'RESULT: {result}')

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "ok"
        }),
    }
81
82
def filter_blocks(blocks, block_key, block_value):
    """
    Return the list of blocks whose `block_key` entry equals `block_value`.

    The original order of `blocks` is preserved and the list is empty when
    nothing matches. Every block is indexed with `block_key`, so a block
    lacking that key raises `KeyError`.
    """

    def matches(candidate):
        return candidate[block_key] == block_value

    return list(filter(matches, blocks))
89
90
def is_date(string_date):
    """
    Verify if a string could be a date.

    Each accepted format is tried against the text as given and then against
    the text without its last character, without its first character and
    without both. The first successful parse decides the outcome.

    :return: the four-digit year as a string when the parsed year lies between
        1900 and the current year, otherwise `None`
    """

    accepted_formats = ('%d-%m-%Y', '%d_%m_%Y', '%d/%m/%Y', '%d.%m.%Y', '%Y')

    # The raw text first, then with stray leading/trailing characters removed
    candidates = (
        string_date,
        string_date[:-1],
        string_date[1:],
        string_date[1:-1],
    )

    for accepted_format in accepted_formats:
        for candidate in candidates:
            try:
                parsed = datetime.strptime(candidate, accepted_format)
            except ValueError:
                continue

            if 1900 <= parsed.year <= datetime.now().year:
                return parsed.strftime("%Y")

            return None  # Date out of range

    return None
124
125
def get_format(phrase):
    """
    Given a phrase, verify whether it specifies the amount format.

    The match is case-insensitive, so "In Millions" and "THOUSANDS" are
    recognised as well. Plural forms need no separate check because each
    singular name is a substring of its plural.

    :param phrase: text to inspect
    :return: 'thousand', 'million' or 'billion' (the first one found, in that
        order), or `None` when the phrase names none of them
    """

    amount_formats = ['thousand', 'million', 'billion']
    normalized_phrase = phrase.lower()

    for amount_format in amount_formats:
        if amount_format in normalized_phrase:
            return amount_format

    return None
138
139
def clean(text):
    """
    Remove bad characters from word.

    Dots, commas, hyphens and spaces are dropped and the remaining text is
    returned in lower case.
    """

    # Deletion table: every listed character maps to "removed"
    removal_table = str.maketrans('', '', '.,- ')

    return text.translate(removal_table).lower()