diff process_document/app.py @ 2:ef8a4d95755a

add aws sam project
author Dennis C. M. <dennis@denniscm.com>
date Thu, 01 Jun 2023 18:51:18 +0100
parents
children 2e5f3664f3e4
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/process_document/app.py	Thu Jun 01 18:51:18 2023 +0100
@@ -0,0 +1,150 @@
+import json
+import boto3
+from datetime import datetime
+from collections import defaultdict
+
+s3_client = boto3.client('s3')
+textract_client = boto3.client('textract')
+
+
+def lambda_handler(event, context):
+    for record in event['Records']:
+        metadata = record['s3']
+        bucket_name = metadata['bucket']['name']
+        object_key = metadata['object']['key']
+
+        doc = textract_client.analyze_document(
+            Document={'S3Object': {'Bucket': bucket_name, 'Name': object_key}},
+            FeatureTypes=['TABLES']
+        )
+
+        # Analyze document
+        result = defaultdict(dict)
+        blocks = doc['Blocks']
+
+        # Get format
+        lines = filter_blocks(blocks, 'BlockType', 'LINE')
+        for line in lines:
+            amount_format = get_format(line['Text'])
+            result['format'] = amount_format
+            if amount_format:
+                break
+
+        # Find dates value and position
+        data = defaultdict(dict)
+        cells = filter_blocks(blocks, 'BlockType', 'CELL')
+        for cell in cells:
+            if not 'Relationships' in cell:
+                continue
+
+            child_ids = [r['Ids'] for r in cell['Relationships'] if r['Type'] == 'CHILD'][0]
+
+            # Get `Text` from `CELL` block
+            cell_text = ''
+            for index, child_id in enumerate(child_ids):
+                word_block = filter_blocks(blocks, 'Id', child_id)[0]
+                cell_text += word_block['Text']
+
+                if index < len(child_ids) - 1:
+                    cell_text += '_'
+
+            # Verify if `Text` could be a valid date
+            date_string = is_date(cell_text)
+            if date_string:
+                cell_text = date_string
+                result['dateRow'] = cell['RowIndex']
+                result['dateColumns'][cell['ColumnIndex']] = date_string
+
+            cell_row_index = cell['RowIndex']
+            cell_column_index = cell['ColumnIndex']
+            data[cell_row_index][cell_column_index] = clean(cell_text)
+
+        # Delete unused row and columns
+        for row_index in list(data.keys()):
+            if row_index > result['dateRow']:
+                row = data[row_index]
+                for column_index in list(row.keys()):
+                    if column_index not in result['dateColumns'] and column_index != 1:
+                        del row[column_index]
+
+                if len(row) > 1:
+                    result['data'][row_index] = row
+
+        print(f'RESULT: {result}')
+
+    return {
+        "statusCode": 200,
+        "body": json.dumps({
+            "message": "ok"
+        }),
+    }
+
+
+def filter_blocks(blocks, block_key, block_value):
+    """
+    Extract a block by key-value from array of blocks
+    """
+
+    return [block for block in blocks if block[block_key] == block_value]
+
+
+def is_date(string_date):
+    """
+    Verify if a string could be a date.
+
+    -> Funciona pero es un desastre <-
+    """
+
+    formats_allowed = ['%d-%m-%Y', '%d_%m_%Y', '%d/%m/%Y', '%d.%m.%Y', '%Y']
+
+    for format_allowed in formats_allowed:
+        try:
+            date = datetime.strptime(string_date, format_allowed)
+
+            if date.year > datetime.now().year or date.year < 1900:
+                return  # Fecha fuera de rango
+
+            return date.strftime("%Y")
+        except ValueError:
+
+            # Try removing characters from the beginning and end
+            options = [string_date[:-1], string_date[1:], string_date[1:-1]]
+            for option in options:
+                try:
+                    date = datetime.strptime(option, format_allowed)
+
+                    if date.year > datetime.now().year or date.year < 1900:
+                        return  # Fecha fuera de rango
+
+                    return date.strftime("%Y")
+                except ValueError:
+                    continue
+
+    return
+
+
+def get_format(phrase):
+    """
+    Given a phrase verify if it is specified the amount format
+    """
+
+    amount_formats = ['thousand', 'million', 'billion']
+
+    for amount_format in amount_formats:
+        plural_amount_format = f'{amount_format}s'
+
+        if amount_format in phrase or plural_amount_format in phrase:
+            return amount_format
+
+
+def clean(text):
+    """"
+    Remove bad characters from word
+    """
+
+    characters = ['.', ',', '-', ' ']
+
+    for character in characters:
+        text = text.replace(character, '')
+
+    return text.lower()