-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdelete-ip.py
133 lines (109 loc) · 4.97 KB
/
delete-ip.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/python3
import sys
from argparse import ArgumentParser
from typing import Dict, Any, Optional, List
from bitdiscovery.api import BitDiscoveryApi, try_multiple_times, get_lastid
parser = ArgumentParser(description="Delete source or IP from inventory")
parser.add_argument('apikey', metavar="APIKEY", type=str, help="Your Bit Discovery API key.")
parser.add_argument('type', metavar="TYPE", type=str, choices=['ip', 'source'], help="The type of the item to delete.")
parser.add_argument('value', metavar="IP/SOURCE", type=str, help="The IP or source to be deleted.")
parser.add_argument('--env', choices=['dev', 'staging', 'prod'], default="dev",
help="The Bit Discovery environment (by default 'dev')")
parser.add_argument('--offset', type=int, default=0, help="Offset to the API request data (by default 0).")
parser.add_argument('--limit', type=int, default=5000, help="Limit to the API request data (by default 500).")
args = parser.parse_args()
APIKEY: str = args.apikey
APIURL: str = "https://bitdiscovery.com/api/1.0"
OFFSET: int = args.offset
LIMIT: int = args.limit
IP_TYPE: str = args.type
VALUE: str = args.value
print("Initializing and pulling assets from Bit Discovery...")
api = BitDiscoveryApi(APIURL, APIKEY)
inventories_json: Dict[str, Any] = {}
try:
inventories_json = api.find_inventories(OFFSET, LIMIT)
except:
print("API call failed. Try again later.")
exit(1)
# TODO: maybe remove iteration if we cannot delete from multiple inventories
inventories: Dict[str, str] = {inventories_json['actualInventory']['inventory_name']: APIKEY}
for entityname in inventories:
jsondata: List[Dict[str, Any]] = []
deletednum = 0
if IP_TYPE == 'ip':
print("Starting inventory: " + str(entityname) + ".")
# Collect the IP addresses from Bit Discovery inventory with pagination
lastid: str = ''
offset: int = OFFSET
while True:
result: Optional[Dict[str, Any]] = try_multiple_times(
lambda: api.search_for_ip_address(LIMIT, lastid, VALUE),
max_tries=5
)
if result is None:
print("\tAPI call failed too many times. Try again later.")
exit(1)
# Append to results list if successfully found
jsondata.append(result)
lastid = get_lastid(result)
offset += LIMIT
total: int = int(jsondata[0]['total'])
if offset < total:
print("\t\t{0:.0%} complete.".format(offset / float(total)))
else:
# Exit when the total is reached
break
# Iterate over the returned assets and remove the matching assets
for assets in jsondata:
for asset in assets.get("assets", []):
if 'bd.ip_address' in asset and str(asset['bd.ip_address']) == VALUE:
# Try to call to IP archivation API endpoint
result: Optional[bool] = try_multiple_times(
lambda: api.archive_ip(asset['id']),
max_tries=5
)
if result is None:
print("\tAPI call failed too many times. Try again later.")
exit(1)
# Increment deleted count
deletednum += 1
# TODO: shouldn't we "else" here?
print("Starting sources for: " + str(entityname) + ".")
sourcesdata: List[Dict[str, Any]] = []
# Collect the sources from Bit Discovery inventory with pagination
lastid: str = ''
offset: int = OFFSET
while True:
result: Optional[Dict[str, Any]] = try_multiple_times(
lambda: api.search_for_source(LIMIT, lastid, VALUE),
max_tries=5
)
if result is None:
print("\tAPI call failed too many times. Try again later.")
exit(1)
# Append to sources list if successfully found
sourcesdata.append(result)
lastid = get_lastid(result)
offset += LIMIT
total: int = int(sourcesdata[0]['total'])
if offset < total:
print("\t\t{} complete.".format('{0:.0%}'.format(offset / float(total))))
else:
# Exit when the total is reached
break
# Iterate over the sources and remove all matching values
for sources in sourcesdata:
for source in sources.get('searches', []):
if 'keyword' in source and str(source['keyword']).lower() == VALUE:
# Try to call to source delete API endpoint
result: Optional[bool] = try_multiple_times(
lambda: api.delete_source(str(source['id'])),
max_tries=5
)
if result is None:
print("\tAPI call failed too many times. Try again later.")
exit(1)
# Increment deleted count
deletednum += 1
print("\tDeleted a total of " + str(deletednum) + " IPs.")