-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patheventbarrier.py
86 lines (64 loc) · 2.49 KB
/
eventbarrier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import json
import os
from typing import Dict, List
import boto3
s3_client = boto3.client('s3')
def set_s3_client(mock_s3_client: object):
global s3_client
s3_client = mock_s3_client
def s3_head(s3bucket_nm: str, object_key: str):
try:
s3_client.head_object(Bucket=s3bucket_nm, Key=object_key)
except Exception as e:
print(f'Object: {object_key} was not found in {s3bucket_nm} S3 bucket.:\n {e}')
return False
else:
print(f'Object: {object_key} was found in {s3bucket_nm} S3 bucket.')
return True
def lambda_handler(event, context):
bucket_name = 'eventbarrier'
print('## ENVIRONMENT VARIABLES')
# print(os.environ)
print('## EVENT')
print(event, context)
event_key = event['Records'][0]['s3']['object']['key']
event_prefix = event_key[:event_key.rfind('/')]
with open('eventbarrier.json') as f:
map_barrier_to_prefixes: dict = json.load(f)
map_prefix_to_barrier: Dict[str, List[str]] = {}
for item in map_barrier_to_prefixes:
for value in list(map_barrier_to_prefixes[item]):
if value in map_prefix_to_barrier:
map_prefix_to_barrier[value].append(item)
else:
map_prefix_to_barrier[value] = list([item])
event_barrier: str = None
for item in map_prefix_to_barrier:
if item.startswith(event_prefix):
event_barrier = map_prefix_to_barrier[item][0]
break
print(f'map_barrier_to_prefixes {map_barrier_to_prefixes}')
print(f'map_prefix_to_barrier {map_prefix_to_barrier}')
print(f'new_event {event_key}')
if event_barrier is None:
print(f'event {event_key} is not associated with an event barrier')
return
else:
print(f'event {event_key} is associated with event barrier {event_barrier} ')
print(f'determine if event barrier {event_barrier} condition is met')
missing: bool = False
for prefix in list(map_barrier_to_prefixes[event_barrier]):
# make object_key from prefix and event
object_key = prefix + "/file1.txt"
if not s3_head(bucket_name, object_key):
missing = True
if missing:
print(f'condition not met for event barrier {event_barrier}')
return False
else:
print(f'condition is met for event barrier {event_barrier}')
return True
if __name__ == '__main__':
with open('zzzz/eventbarrier_test.json') as f:
event: dict = json.load(f)
lambda_handler(event, "stub")