changeset 3:2e5f3664f3e4

document analyzer almost finished
author Dennis C. M. <dennis@denniscm.com>
date Fri, 02 Jun 2023 20:12:29 +0100
parents ef8a4d95755a
children 9005b7590008
files .idea/finance-parser.iml analyze_document/__init__.py analyze_document/app.py analyze_document/requirements.txt events/analyze_document_event.json events/process_document_event.json events/upload_document_event.json process_document/app.py statemachine/statemachine.asl.json template.yaml upload_document/__init__.py upload_document/app.py upload_document/requirements.txt
diffstat 11 files changed, 357 insertions(+), 102 deletions(-)
--- a/.idea/finance-parser.iml	Thu Jun 01 18:51:18 2023 +0100
+++ b/.idea/finance-parser.iml	Fri Jun 02 20:12:29 2023 +0100
@@ -21,7 +21,7 @@
                   <option value="AUTO_EXPAND" />
                 </list>
               </option>
-              <option name="stackName" value="FinanceParser" />
+              <option name="stackName" value="Denniscm-FinanceParser" />
               <option name="tags">
                 <map />
               </option>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/analyze_document/app.py	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,44 @@
+import json
+import boto3
+import uuid
+import re
+
+
+textract_client = boto3.client('textract')
+s3_client = boto3.client('s3')
+
+
+def lambda_handler(event, context):
+    event_detail = event['detail']
+    bucket_name = event_detail['bucket']['name']
+    object_key = event_detail['object']['key']
+    company_ticker = re.search(r'unprocessed/(.*)\.pdf', object_key).group(1)
+
+    data_dict = textract_client.analyze_document(
+        Document={'S3Object': {'Bucket': bucket_name, 'Name': object_key}},
+        FeatureTypes=['TABLES']
+    )
+
+    data_string = json.dumps(data_dict, indent=2, default=str)
+    filename = f'{company_ticker}_{uuid.uuid4()}.json'
+
+    s3_client.put_object(
+        Bucket=bucket_name,
+        Key=f'analyzed/{filename}',
+        Body=data_string
+    )
+
+    s3_client.delete_object(
+        Bucket=bucket_name,
+        Key=object_key
+    )
+
+    return {
+        "statusCode": 200,
+        "body": {
+            "message": {
+                "objectKey": f'analyzed/{filename}',
+                "bucketName": bucket_name
+            }
+        },
+    }
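For a quick local check of this handler, something like the following should work, assuming AWS credentials with Textract and S3 access are configured and the command runs from the repository root (the input is the sample event added below in events/):

import json

from analyze_document.app import lambda_handler

with open('events/analyze_document_event.json') as f:
    event = json.load(f)

# The handler ignores `context`, so None is enough for a local run.
print(lambda_handler(event, None))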
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/analyze_document/requirements.txt	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,1 @@
+boto3
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/events/analyze_document_event.json	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,28 @@
+{
+   "version":"0",
+   "id":"434ad981-1cce-8857-e57c-10039ddf700a",
+   "detail-type":"Object Created",
+   "source":"aws.s3",
+   "account":"587328694482",
+   "time":"2023-06-02T16:15:26Z",
+   "region":"eu-central-1",
+   "resources":[
+      "arn:aws:s3:::sandbox-finance-parser-data"
+   ],
+   "detail":{
+      "version":"0",
+      "bucket":{
+         "name":"sandbox-finance-parser-data"
+      },
+      "object":{
+         "key":"unprocessed/san.pdf",
+         "size":49856,
+         "etag":"0adc595c8f2dbfabb5c4095f1f91b458",
+         "sequencer":"00647A159E6438B1A6"
+      },
+      "request-id":"47GMMWBYT14BGH15",
+      "requester":"587328694482",
+      "source-ip-address":"88.25.226.176",
+      "reason":"PutObject"
+   }
+}
\ No newline at end of file
--- a/events/process_document_event.json	Thu Jun 01 18:51:18 2023 +0100
+++ b/events/process_document_event.json	Fri Jun 02 20:12:29 2023 +0100
@@ -1,38 +1,9 @@
 {
-   "Records":[
-      {
-         "eventVersion":"2.1",
-         "eventSource":"aws:s3",
-         "awsRegion":"eu-central-1",
-         "eventTime":"2023-06-01T16:53:50.860Z",
-         "eventName":"ObjectCreated:Put",
-         "userIdentity":{
-            "principalId":"AWS:AROAYRP4EVDJOMTEWOIXJ:dennis"
-         },
-         "requestParameters":{
-            "sourceIPAddress":"88.25.226.176"
-         },
-         "responseElements":{
-            "x-amz-request-id":"X1HS8KY4ZX3GBSCD",
-            "x-amz-id-2":"AE2BzGU/+Dk0x2lsYhw6b8h2Ha67cxSK/hsI0NIRnjP9/UePvBfYS4GabPgzpdd6JSM6LYSLJvjDDFeOfES5ip1dtfsPSw5G"
-         },
-         "s3":{
-            "s3SchemaVersion":"1.0",
-            "configurationId":"f0a23387-f41d-4d90-b2cb-f16f2c61c5ab",
-            "bucket":{
-               "name":"sandbox-finance-parser-data",
-               "ownerIdentity":{
-                  "principalId":"A2WI146QA2L7B1"
-               },
-               "arn":"arn:aws:s3:::sandbox-finance-parser-data"
-            },
-            "object":{
-               "key":"balance_sheet.pdf",
-               "size":49856,
-               "eTag":"0adc595c8f2dbfabb5c4095f1f91b458",
-               "sequencer":"006478CD1EC996352F"
-            }
-         }
+   "statusCode": 200,
+   "body": {
+      "message": {
+         "objectKey": "analyzed/san_f0799678-a362-4b7f-9fff-c26b0bbf2b15.json",
+         "bucketName": "sandbox-finance-parser-data"
       }
-   ]
+   }
 }
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/events/upload_document_event.json	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,9 @@
+{
+   "statusCode": 200,
+   "body": {
+      "message": {
+         "objectKey": "processed/san_d7312109-9099-4dd2-a984-55768641b25e.json",
+         "bucketName": "sandbox-finance-parser-data"
+      }
+   }
+}
\ No newline at end of file
--- a/process_document/app.py	Thu Jun 01 18:51:18 2023 +0100
+++ b/process_document/app.py	Fri Jun 02 20:12:29 2023 +0100
@@ -3,80 +3,95 @@
 from datetime import datetime
 from collections import defaultdict
 
+
 s3_client = boto3.client('s3')
-textract_client = boto3.client('textract')
 
 
 def lambda_handler(event, context):
-    for record in event['Records']:
-        metadata = record['s3']
-        bucket_name = metadata['bucket']['name']
-        object_key = metadata['object']['key']
+    event_message = event['body']['message']
+    object_key = event_message['objectKey']
+    bucket_name = event_message['bucketName']
+
+    # Download file from s3
+    s3_client.download_file(bucket_name, object_key, '/tmp/document.json')
 
-        doc = textract_client.analyze_document(
-            Document={'S3Object': {'Bucket': bucket_name, 'Name': object_key}},
-            FeatureTypes=['TABLES']
-        )
+    with open('/tmp/document.json') as f:
+        doc = json.load(f)
 
-        # Analyze document
-        result = defaultdict(dict)
-        blocks = doc['Blocks']
+    # Analyze document
+    result = defaultdict(dict)
+    blocks = doc['Blocks']
 
-        # Get format
-        lines = filter_blocks(blocks, 'BlockType', 'LINE')
-        for line in lines:
-            amount_format = get_format(line['Text'])
-            result['format'] = amount_format
-            if amount_format:
-                break
+    # Get format
+    lines = filter_blocks(blocks, 'BlockType', 'LINE')
+    for line in lines:
+        amount_format = get_format(line['Text'])
+        result['format'] = amount_format
+        if amount_format:
+            break
 
-        # Find dates value and position
-        data = defaultdict(dict)
-        cells = filter_blocks(blocks, 'BlockType', 'CELL')
-        for cell in cells:
-            if not 'Relationships' in cell:
-                continue
+    # Find dates value and position
+    data = defaultdict(dict)
+    cells = filter_blocks(blocks, 'BlockType', 'CELL')
+    for cell in cells:
+        if 'Relationships' not in cell:
+            continue
 
-            child_ids = [r['Ids'] for r in cell['Relationships'] if r['Type'] == 'CHILD'][0]
+        child_ids = [r['Ids'] for r in cell['Relationships'] if r['Type'] == 'CHILD'][0]
+
+        # Get `Text` from `CELL` block
+        cell_text = ''
+        for index, child_id in enumerate(child_ids):
+            word_block = filter_blocks(blocks, 'Id', child_id)[0]
+            cell_text += word_block['Text']
 
-            # Get `Text` from `CELL` block
-            cell_text = ''
-            for index, child_id in enumerate(child_ids):
-                word_block = filter_blocks(blocks, 'Id', child_id)[0]
-                cell_text += word_block['Text']
+            if index < len(child_ids) - 1:
+                cell_text += '_'
 
-                if index < len(child_ids) - 1:
-                    cell_text += '_'
+        # Verify if `Text` could be a valid date
+        date_string = is_date(cell_text)
+        if date_string:
+            cell_text = date_string
+            result['dateRow'] = cell['RowIndex']
+            result['dateColumns'][cell['ColumnIndex']] = date_string
 
-            # Verify if `Text` could be a valid date
-            date_string = is_date(cell_text)
-            if date_string:
-                cell_text = date_string
-                result['dateRow'] = cell['RowIndex']
-                result['dateColumns'][cell['ColumnIndex']] = date_string
+        cell_row_index = cell['RowIndex']
+        cell_column_index = cell['ColumnIndex']
+        data[cell_row_index][cell_column_index] = clean(cell_text)
+
+        if 'EntityTypes' in cell:
+            data[cell_row_index]['type'] = cell['EntityTypes']
 
-            cell_row_index = cell['RowIndex']
-            cell_column_index = cell['ColumnIndex']
-            data[cell_row_index][cell_column_index] = clean(cell_text)
+    # Delete unused row and columns
+    for row_index in list(data.keys()):
+        row = data[row_index]
+        for column_index in list(row.keys()):
+            if column_index not in result['dateColumns'] \
+                    and column_index != 1 and column_index != 'type':
+                del row[column_index]
 
-        # Delete unused row and columns
-        for row_index in list(data.keys()):
-            if row_index > result['dateRow']:
-                row = data[row_index]
-                for column_index in list(row.keys()):
-                    if column_index not in result['dateColumns'] and column_index != 1:
-                        del row[column_index]
+        if len(row) > 1:
+            result['data'][row_index] = row
+
+    filename = object_key.replace('analyzed/', 'processed/')
+    data_string = json.dumps(result, indent=2, default=str)
 
-                if len(row) > 1:
-                    result['data'][row_index] = row
-
-        print(f'RESULT: {result}')
+    s3_client.put_object(
+        Bucket=bucket_name,
+        Key=filename,
+        Body=data_string
+    )
 
     return {
         "statusCode": 200,
-        "body": json.dumps({
-            "message": "ok"
-        }),
+        "body": {
+            "message": {
+                "objectKey": filename,
+                "bucketName": bucket_name
+            }
+        },
     }
 
 
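This hunk leans on four helpers defined further down in process_document/app.py and untouched by this changeset: filter_blocks, get_format, is_date and clean. A hypothetical reconstruction of filter_blocks, inferred only from its call sites above (it is used both to select blocks by BlockType and to look one up by Id):

def filter_blocks(blocks, block_key, value):
    # Sketch, not the actual implementation: keep every block whose
    # `block_key` entry equals `value`.
    return [block for block in blocks if block.get(block_key) == value]
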
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/statemachine/statemachine.asl.json	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,20 @@
+{
+  "StartAt": "AnalyzeDocumentWithTextract",
+  "States": {
+    "AnalyzeDocumentWithTextract": {
+      "Type": "Task",
+      "Resource": "${AnalyzeDocumentFunctionArn}",
+      "Next": "ProcessDocument"
+    },
+    "ProcessDocument": {
+      "Type": "Task",
+      "Resource": "${ProcessDocumentFunctionArn}",
+      "Next": "UploadDocument"
+    },
+    "UploadDocument": {
+      "Type": "Task",
+      "Resource": "${UploadDocumentFunctionArn}",
+      "End": true
+    }
+  }
+}
\ No newline at end of file
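The ${...FunctionArn} placeholders are filled in by DefinitionSubstitutions in template.yaml below. To exercise the whole chain without uploading a PDF, an execution can be started by hand with the captured S3 event as input; a minimal sketch, assuming a placeholder state machine ARN:

import json

import boto3

sfn_client = boto3.client('stepfunctions')

with open('events/analyze_document_event.json') as f:
    event = json.load(f)

sfn_client.start_execution(
    # Placeholder ARN; substitute the one from the deployed stack.
    stateMachineArn='arn:aws:states:eu-central-1:123456789012:stateMachine:Example',
    input=json.dumps(event)
)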
--- a/template.yaml	Thu Jun 01 18:51:18 2023 +0100
+++ b/template.yaml	Fri Jun 02 20:12:29 2023 +0100
@@ -14,7 +14,7 @@
     Architectures:
       - x86_64
     Timeout: 20
-    MemorySize: 256
+    MemorySize: 128
     Tracing: Active
 
 Resources:
@@ -25,11 +25,50 @@
         - CreateProdResources
         - finance-parser-data
         - sandbox-finance-parser-data
+      NotificationConfiguration:
+        EventBridgeConfiguration:
+          EventBridgeEnabled: true
 
-  ProcessDocumentFunction:
+  StateMachine:
+    Type: AWS::Serverless::StateMachine
+    Properties:
+      Tracing:
+        Enabled: true
+      DefinitionUri: statemachine/statemachine.asl.json
+      DefinitionSubstitutions:
+        AnalyzeDocumentFunctionArn: !GetAtt AnalyzeDocumentFunction.Arn
+        ProcessDocumentFunctionArn: !GetAtt ProcessDocumentFunction.Arn
+        UploadDocumentFunctionArn: !GetAtt UploadDocumentFunction.Arn
+      Events:
+        StateChange:
+          Type: EventBridgeRule
+          Properties:
+            Pattern:
+              source:
+                - aws.s3
+              detail-type:
+                - Object Created
+              detail:
+                bucket:
+                  name:
+                    - !Ref S3Bucket
+                object:
+                  key:
+                    - "prefix": "unprocessed/"
+    Connectors:
+      StateMachineConnector:
+        Properties:
+          Destination:
+            - Id: AnalyzeDocumentFunction
+            - Id: ProcessDocumentFunction
+            - Id: UploadDocumentFunction
+          Permissions:
+            - Write
+
+  AnalyzeDocumentFunction:
     Type: AWS::Serverless::Function
     Properties:
-      CodeUri: process_document/
+      CodeUri: analyze_document/
       Handler: app.lambda_handler
       Runtime: python3.7
       Policies:
@@ -39,12 +78,21 @@
               Action:
                 - textract:AnalyzeDocument
               Resource: "*"
-      Events:
-        NewBalanceSheetEvent:
-          Type: S3
-          Properties:
-            Bucket: !Ref S3Bucket
-            Events: s3:ObjectCreated:*
+    Connectors:
+      S3Connector:
+        Properties:
+          Destination:
+            Id: S3Bucket
+          Permissions:
+            - Read
+            - Write
+
+  ProcessDocumentFunction:
+    Type: AWS::Serverless::Function
+    Properties:
+      CodeUri: process_document/
+      Handler: app.lambda_handler
+      Runtime: python3.7
     Connectors:
       S3Connector:
         Properties:
@@ -52,4 +100,44 @@
             Id: S3Bucket
           Permissions:
             - Read
-            - Write
\ No newline at end of file
+            - Write
+
+  UploadDocumentFunction:
+    Type: AWS::Serverless::Function
+    Properties:
+      CodeUri: upload_document/
+      Handler: app.lambda_handler
+      Runtime: python3.7
+    Connectors:
+      DynamoConnector:
+        Properties:
+          Destination:
+            Id: DynamoTable
+          Permissions:
+            - Write
+      S3Connector:
+        Properties:
+          Destination:
+            Id: S3Bucket
+          Permissions:
+            - Read
+
+  DynamoTable:
+    Type: AWS::DynamoDB::Table
+    Properties:
+      TableName: FinanceParser
+      BillingMode: PAY_PER_REQUEST
+      DeletionProtectionEnabled: !If
+        - CreateProdResources
+        - True
+        - False
+      KeySchema:
+        - AttributeName: pk
+          KeyType: HASH
+        - AttributeName: sk
+          KeyType: RANGE
+      AttributeDefinitions:
+        - AttributeName: pk
+          AttributeType: S
+        - AttributeName: sk
+          AttributeType: S
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/upload_document/app.py	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,78 @@
+import json
+import boto3
+import re
+
+s3_client = boto3.client('s3')
+dynamodb = boto3.resource('dynamodb')
+table = dynamodb.Table('FinanceParser')
+
+
+def lambda_handler(event, context):
+    event_message = event['body']['message']
+    object_key = event_message['objectKey']
+    bucket_name = event_message['bucketName']
+    company_ticker = re.search('processed/(.*)_', object_key).group(1)
+
+    # Download file from s3
+    s3_client.download_file(bucket_name, object_key, '/tmp/document.json')
+
+    with open('/tmp/document.json') as f:
+        doc = json.load(f)
+
+    with table.batch_writer() as batch:
+        for dateColumn, date in doc['dateColumns'].items():
+            for row_index, account in doc['data'].items():
+
+                # `type` is only present for header rows
+                column_types = account.get('type', [])
+
+                """
+                The following statement avoids getting `2020` as the
+                value of `ASSETS`.
+
+                +------------------+------+------+
+                | ASSETS           | 2020 | 2019 |
+                +------------------+------+------+
+                | ASSETS_ACCOUNT_1 |      |      |
+                +------------------+------+------+
+                | ASSETS_ACCOUNT_2 |      |      |
+                +------------------+------+------+
+                """
+
+                account_value = account[dateColumn]
+                if 'COLUMN_HEADER' in column_types and date == account_value:
+                    account_value = ''
+
+                # pk -> item_type#company_ticker
+                # sk -> date#row_index
+                batch.put_item(
+                    Item={
+                        'pk': f'balance#{company_ticker}',
+                        'sk': f'{date}#{row_index}',
+                        'account_name': account['1'],
+                        'account_value': account_value,
+                        'column_types': column_types
+                    }
+                )
+
+            # pk -> item_type#company_ticker
+            # sk -> date
+            batch.put_item(
+                Item={
+                    'pk': f'file#{company_ticker}',
+                    'sk': date,
+                    'filename': object_key.replace('processed/', '')
+                }
+            )
+
+    return {
+        "statusCode": 200,
+        "body": json.dumps({
+            "message": "ok"
+        }),
+    }
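With the pk/sk layout written above, reading everything back for one company is a single-partition query. A sketch, assuming the FinanceParser table from template.yaml and the san ticker used in the sample events:

import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource('dynamodb').Table('FinanceParser')

# Every balance-sheet cell stored for the ticker, keyed date#row_index.
response = table.query(
    KeyConditionExpression=Key('pk').eq('balance#san')
)

for item in response['Items']:
    print(item['sk'], item['account_name'], item['account_value'])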
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/upload_document/requirements.txt	Fri Jun 02 20:12:29 2023 +0100
@@ -0,0 +1,1 @@
+boto3
\ No newline at end of file