forked from mehdiBezahaf/intent-deploy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
262 lines (212 loc) · 8.52 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
from __future__ import annotations
import logging
import secrets
from urllib.parse import quote_plus
from fastapi import FastAPI, status
from fastapi.exceptions import HTTPException, RequestValidationError
from fastapi.params import Depends
from fastapi.security import HTTPBasic
from pydantic import UUID4
from pydantic.main import BaseModel
from starlette.background import BackgroundTasks
from starlette.responses import JSONResponse
from intent_deployer import (
alerting,
config,
load_alerts,
load_intents,
network_state,
parser,
)
from intent_deployer.compile import (
Context,
handle_alert,
handle_dialogflow,
handle_intent,
handle_nile,
intent_executors,
response_type_union,
)
from intent_deployer.config import get_settings
from intent_deployer.deploy import deploy_policy
from intent_deployer.dialogflow import DialogflowContext
from intent_deployer.state_track import ActiveIntent, list_intents, lookup_intent, new_status_tracker
from intent_deployer.switches import Switches
from intent_deployer.types import (
APIResponse,
APIResponseStatus,
CreatedIntentResponse,
GetIntentResponse,
IntentDeployException,
ListIntentResponse,
NileIntentRequest,
)
from intents.utils import PushIntentPolicy
from webhook.types import (
AlertmanagerMessage,
DialogflowMessageText,
DialogflowRequest,
DialogflowResponse,
)
log: logging.Logger = logging.getLogger("intent_deployer")
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
load_intents()
load_alerts()
settings = get_settings()
app = FastAPI()
@app.on_event("startup")
async def remove_existing_intents():
log.info("Starting up, removing existing intents")
try:
async with config.onos_api_client() as client:
resp = await client.post(f"/applications/{network_state.app_name}/register")
resp.raise_for_status()
network_state.app_id = int(resp.json()["id"])
# Don't actually delete intents? IMR doesn't allow us to create new
# intents it only allows us to change their paths therefore, we start
# with ifwd on to generate an initial set of intents for each host then
# we replace all the paths with our own.
#
# resp = await client.get("/intents")
# resp.raise_for_status()
# intents = [(i["appId"], i["key"]) for i in resp.json()["intents"]]
# for app_id, key in intents:
# resp = await client.delete(f"/intents/{app_id}/{quote_plus(key)}")
# resp.raise_for_status()
log.info("Dropped all intents :)")
errors = []
network = await network_state.Topology.from_current_state(errors)
switches = await Switches.from_api()
for error in errors:
log.warn("Error encountered while building initial topology: %s", error)
base_routes = network.generate_routing_intents()
await network_state.add_intents(base_routes, network.hosts, switches)
except Exception as e:
raise Exception("Failed to perform the initial ONOS setup, IS ONOS_API_URL correct?") from e
def format_json_error(code: int, details: str) -> JSONResponse:
content = APIResponse(
status=APIResponseStatus(code=code, details=details),
).dict()
return JSONResponse(status_code=code, content=content)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc) -> JSONResponse:
return format_json_error(400, str(exc))
@app.get("/")
def home():
return "Network Intent Assistent (Nia) Deploy APIs"
deploy_response_union = response_type_union()
@app.post(
"/deploy",
response_model=APIResponse,
responses={400: {"model": APIResponse}, 422: {"model": APIResponse}},
)
async def deploy(body: NileIntentRequest, background_tasks: BackgroundTasks):
try:
parsed = parser.parse(body.intent)
ctx = Context(background_tasks)
await handle_nile(ctx, parsed)
# TODO: do we want to assign IDs to intents and keep track of them
# then add endpoints for querying them/ update channels
return APIResponse(status=APIResponseStatus(code=200, details="success"))
# return DeployResponse[policy_type](
# status=APIResponseStatus(code=200, details="Deployment success."),
# input=body,
# output=DeployResponseOutput[policy_type](
# type="Onos reroute API calls", policy=policy_info.policy
# ),
# )
except IntentDeployException as e:
log.exception(e)
return format_json_error(400, f"Could not deploy intent. \n{e}")
except Exception as e:
log.exception(e)
return format_json_error(400, f"Could not deploy intent. internal error")
security = HTTPBasic()
def ensure_authenticated(credentials=Depends(security)):
user_ok = secrets.compare_digest(credentials.username, settings.webhook_user)
pass_ok = secrets.compare_digest(credentials.password, settings.webhook_pass)
if not (user_ok and pass_ok):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
headers={"WWW-Authenticate": "Basic"},
)
@app.post("/dialogflow_webhook") # , response_model=DialogflowResponse)
async def dialogflow_webhook(
body: DialogflowRequest,
background_tasks: BackgroundTasks,
_secure=Depends(ensure_authenticated),
):
try:
ctx = DialogflowContext.from_request(background_tasks, body)
ctx.reset_messages()
type_ = body.query_result.action
params = body.query_result.parameters
await handle_dialogflow(ctx, type_, params)
if not (ctx.fulfillment_messages or ctx.should_inhibit_fulfillment_message):
ctx.add_message("Intent deployed")
return DialogflowResponse(
fulfillment_messages=ctx.fulfillment_messages,
)
except IntentDeployException as e:
log.exception(e)
return DialogflowResponse(
fulfillment_messages=[
DialogflowMessageText(text=["Could not deploy that intent"]),
DialogflowMessageText(text=[str(e)]),
]
)
except Exception as e:
log.exception(e)
return DialogflowResponse(
fulfillment_messages=[
DialogflowMessageText(
text=["Could not deploy that intent, internal error"]
)
]
)
for intent in intent_executors.keys():
if not issubclass(intent, BaseModel):
continue
@app.post(
f"/{intent.__name__}",
name=f"invoke_{intent.__name__}",
response_model=CreatedIntentResponse,
responses={400: {"model": APIResponse}, 422: {"model": APIResponse}},
)
async def invoke_intent(body: intent, background_tasks: BackgroundTasks):
ctx = None
try:
tracked_intent = new_status_tracker(intent.__name__, body)
ctx = Context(background_tasks, tracked_intent)
await handle_intent(ctx, body)
return CreatedIntentResponse(status=APIResponseStatus(code=200, details="success"),
intent_id=tracked_intent.id)
except IntentDeployException as e:
log.exception(e)
if ctx: ctx.push_status("failure", str(e))
return format_json_error(400, f"Could not deploy intent. \n{e}")
except Exception as e:
log.exception(e)
if ctx: ctx.push_status("failure", str(e))
return format_json_error(400, f"Could not deploy intent. internal error")
@app.post("/alert")
async def alertmanager_webhook(
body: AlertmanagerMessage,
_secure=Depends(ensure_authenticated),
):
log.info("Dispatching alert: %s", body)
await handle_alert(body)
return APIResponse(status=APIResponseStatus(code=200, details="success"))
@app.get("/intents", response_model=ListIntentResponse)
async def list_intents_endpoint():
return ListIntentResponse(status=APIResponseStatus(code=200, details="success"),
intents=list_intents())
@app.get("/intents/{intent_id}", response_model=GetIntentResponse,
responses={404: {"model": APIResponse}})
async def get_intent_endpoint(intent_id: UUID4):
intent_status = lookup_intent(intent_id)
if intent_status:
return GetIntentResponse(status=APIResponseStatus(code=200, details="success"),
intent=intent_status)
return format_json_error(404, f"Could not find an intent with id: {intent_id}")