-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1478 from camptocamp/backport/1475-to-prod-2-9
[Backport prod-2-9] Add poc Swisscom heatmap
- Loading branch information
Showing
20 changed files
with
1,277 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
import logging | ||
import os | ||
from datetime import datetime | ||
|
||
import pyramid.httpexceptions # type: ignore[import-untyped] | ||
import pyramid.request # type: ignore[import-untyped] | ||
import pyramid.response # type: ignore[import-untyped] | ||
from cornice import Service # type: ignore[import-untyped] | ||
from geojson import FeatureCollection # type: ignore[import-untyped] | ||
|
||
from .query_swisscom_heatmap_api import SwisscomHeatmapApi | ||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
api = SwisscomHeatmapApi() | ||
|
||
|
||
swisscom_heatmap_get_config = Service( | ||
name="swisscom-heatmap-get-config", | ||
description="The swisscom-heatmap get config service", | ||
path="/swisscom-heatmap/get-config.json", | ||
cors_origins=( | ||
(f'https://{os.environ["VISIBLE_WEB_HOST"]}' if "VISIBLE_WEB_HOST" in os.environ else "*"), | ||
), | ||
) | ||
|
||
|
||
swisscom_heatmap_dwell_density = Service( | ||
name="swisscom-heatmap-dwell-density", | ||
description="The swisscom-heatmap dwell density service", | ||
path="/swisscom-heatmap/dwell-density.json", | ||
cors_origins=( | ||
(f'https://{os.environ["VISIBLE_WEB_HOST"]}' if "VISIBLE_WEB_HOST" in os.environ else "*"), | ||
), | ||
) | ||
|
||
|
||
swisscom_heatmap_dwell_demographics = Service( | ||
name="swisscom-heatmap-dwell-demographics", | ||
description="The swisscom-heatmap dwell demographics service", | ||
path="/swisscom-heatmap/dwell-demographics.json", | ||
cors_origins=( | ||
(f'https://{os.environ["VISIBLE_WEB_HOST"]}' if "VISIBLE_WEB_HOST" in os.environ else "*"), | ||
), | ||
) # type: ignore[import-untyped] | ||
|
||
|
||
def entry_get_config(_request: pyramid.request.Request) -> pyramid.response.Response: | ||
return api.get_config() | ||
|
||
|
||
def get_params(request: pyramid.request.Request) -> tuple[int, datetime]: | ||
try: | ||
postal_code = int(request.params["postal_code"]) | ||
date_time = api.parse_date_time(request.params["date_time"]) | ||
except ValueError as exc: | ||
raise pyramid.httpexceptions.HTTPBadRequest() from exc | ||
return postal_code, date_time | ||
|
||
|
||
@swisscom_heatmap_dwell_density.get(renderer="geojson") | ||
def entry_get_dwell_density( | ||
request: pyramid.request.Request, | ||
) -> FeatureCollection | pyramid.response.Response: | ||
postal_code, date_time = get_params(request) | ||
return api.get_dwell_density(postal_code, date_time) | ||
|
||
|
||
@swisscom_heatmap_dwell_demographics.get(renderer="geojson") | ||
def entry_get_dwell_demographics( | ||
request: pyramid.request.Request, | ||
) -> FeatureCollection | pyramid.response.Response: | ||
postal_code, date_time = get_params(request) | ||
return api.get_dwell_demographics(postal_code, date_time) |
124 changes: 124 additions & 0 deletions
124
custom/custom/views/swisscom_heatmap/query_swisscom_heatmap_api.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
import logging | ||
import os | ||
from datetime import datetime | ||
from typing import Any | ||
|
||
from geojson import Feature, FeatureCollection, Point # type: ignore[import-untyped] | ||
from oauthlib.oauth2 import BackendApplicationClient | ||
from pyramid.response import Response # type: ignore[import-untyped] | ||
from requests_oauthlib import OAuth2Session # type: ignore[import-untyped] | ||
|
||
from .tile_id_to_coordinates import tile_id_to_ll | ||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
CLIENT_ID = os.getenv("SWISSCOM_CLIENT_ID", "") # customer key in the Swisscom digital marketplace | ||
CLIENT_SECRET = os.getenv("SWISSCOM_CLIENT_SECRET", "") # customer secret in the Swisscom digital marketplace | ||
MIN_DATE = os.getenv("MIN_DATE", "03.10.2022") | ||
MAX_DATE = os.getenv("MAX_DATE", "16.10.2022") | ||
MAX_NB_TILES_REQUEST = int(os.getenv("MAX_NB_TILES_REQUEST", "100")) | ||
|
||
BASE_URL = "https://api.swisscom.com/layer/heatmaps/demo" | ||
TKN_URL = "https://consent.swisscom.com/o/oauth2/token" | ||
HEADERS = {"scs-version": "2"} # API version | ||
|
||
|
||
class ExternalAPIError(Exception): | ||
pass | ||
|
||
|
||
class APIUsageExceededError(Exception): | ||
pass | ||
|
||
|
||
class SwisscomHeatmapApi: | ||
error: Response = None | ||
request_date = datetime.now() | ||
nb_requests = 0 | ||
|
||
@staticmethod | ||
def parse_date_time(date_time: str) -> datetime: | ||
return datetime.strptime(date_time, "%d.%m.%YT%H:%M") | ||
|
||
def get_config(self) -> dict[str, str]: | ||
return {"minDate": f"{MIN_DATE}", "maxDate": f"{MAX_DATE}"} | ||
|
||
def auth(self) -> OAuth2Session: | ||
# Fetch an access token | ||
client = BackendApplicationClient(client_id=CLIENT_ID) | ||
oauth = OAuth2Session(client=client) | ||
oauth.fetch_token(token_url=TKN_URL, client_id=CLIENT_ID, client_secret=CLIENT_SECRET) | ||
return oauth | ||
|
||
def get_tiles_ids(self, oauth: OAuth2Session, postal_code: int) -> list[int]: | ||
# For muni/district id, see https://www.atlas.bfs.admin.ch/maps/13/fr/17804_229_228_227/27579.html | ||
# Municipalities and Districts doesn't work well probably because of the free plan | ||
# Get all the first MAX_NB_TILES_REQUEST tile ids associated with the postal code of interest | ||
muni_tiles_json = oauth.get(BASE_URL + f"/grids/postal-code-areas/{postal_code}", headers=HEADERS) | ||
self.check_api_error(muni_tiles_json) | ||
tiles = muni_tiles_json.json()["tiles"] | ||
LOG.info("Nb tiles received: %s", len(tiles)) | ||
return [t["tileId"] for t in muni_tiles_json.json()["tiles"]][:MAX_NB_TILES_REQUEST] | ||
|
||
def query_api_generic( | ||
self, oauth: OAuth2Session, path: str, postal_code: int, date_time: datetime | ||
) -> str: | ||
LOG.info("Querying with %s, %s, %s", path, postal_code, date_time) | ||
tile_ids = self.get_tiles_ids(oauth, postal_code) | ||
return ( | ||
BASE_URL | ||
+ f"/heatmaps/{path}/{date_time.isoformat()}" | ||
+ "?tiles=" | ||
+ "&tiles=".join(map(str, tile_ids)) | ||
) | ||
|
||
def response_to_geojson_result(self, data: dict[str, Any]) -> FeatureCollection: | ||
features = [] | ||
for element in data["tiles"]: | ||
coordinate = tile_id_to_ll(element["tileId"]) | ||
features.append(Feature(geometry=Point(coordinate), properties=element)) | ||
return FeatureCollection(features) | ||
|
||
def get_dwell_density(self, postal_code: int, date_time: datetime) -> FeatureCollection | Response: | ||
self.error = None | ||
try: | ||
self.limit_query() | ||
oauth = self.auth() | ||
api_request = self.query_api_generic(oauth, "/dwell-density/hourly", postal_code, date_time) | ||
response = oauth.get(api_request, headers=HEADERS) | ||
self.check_api_error(response) | ||
except (ExternalAPIError, APIUsageExceededError): | ||
return self.error | ||
return self.response_to_geojson_result(response.json()) | ||
|
||
def get_dwell_demographics(self, postal_code: int, date_time: datetime) -> FeatureCollection | Response: | ||
self.error = None | ||
try: | ||
self.limit_query() | ||
oauth = self.auth() | ||
api_request = self.query_api_generic(oauth, "/dwell-demographics/hourly", postal_code, date_time) | ||
response = oauth.get(api_request, headers=HEADERS) | ||
self.check_api_error(response) | ||
except (ExternalAPIError, APIUsageExceededError): | ||
return self.error | ||
return self.response_to_geojson_result(response.json()) | ||
|
||
def check_api_error(self, response: Response): | ||
if response.status_code != 200: | ||
err_code = response.status_code | ||
err_txt = response.text | ||
LOG.warning("External API error (code %s): %s", err_code, err_txt) | ||
self.error = Response(err_txt, status=err_code) | ||
raise ExternalAPIError("External api error") | ||
|
||
def limit_query(self): | ||
now = datetime.now() | ||
if now.day > self.request_date.day: | ||
self.nb_requests = 0 | ||
self.nb_requests += 1 | ||
LOG.info("Request today %s", self.nb_requests) | ||
if self.nb_requests > 500: | ||
# [bgerber] It's rude, but we are using my own key ! | ||
error = "Too much query today, try again tomorrow" | ||
self.error = Response(error, status=403) | ||
raise APIUsageExceededError(error) |
36 changes: 36 additions & 0 deletions
36
custom/custom/views/swisscom_heatmap/tile_id_to_coordinates.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import math | ||
|
||
TILE_W = 100 | ||
TILE_H = 100 | ||
|
||
|
||
def _cantor_pairing(n1: int, n2: int) -> int: | ||
"""Cantor pairing function.""" | ||
return ((n1 + n2) * (n1 + n2 + 1)) // 2 + n2 | ||
|
||
|
||
def _cantor_unpairing(n: int) -> tuple[int, int]: | ||
"""Inverse Cantor pairing function.""" | ||
w = int((math.sqrt(8 * n + 1) - 1) // 2) | ||
t = (w * w + w) // 2 | ||
n2 = n - t | ||
n1 = w - n2 | ||
return n1, n2 | ||
|
||
|
||
def tile_ll_at(x: float, y: float) -> tuple[int, int]: | ||
"""Given a point (LV03), returns the lower-left corner of the tile it belongs to.""" | ||
tix, tiy = x // TILE_W, y // TILE_H | ||
return int(tix * TILE_W), int(tiy * TILE_H) | ||
|
||
|
||
def tile_ll_to_id(tx: int, ty: int) -> int: | ||
"""Given the lower-left corner (LV03) of a tile, returns the id of that tile.""" | ||
tix, tiy = tx // TILE_W, ty // TILE_H | ||
return _cantor_pairing(tix, tiy) | ||
|
||
|
||
def tile_id_to_ll(tid: int) -> tuple[int, int]: | ||
"""Given the id of a tile, returns the lower-left corner (LV03) of that tile.""" | ||
tix, tiy = _cantor_unpairing(tid) | ||
return tix * TILE_W, tiy * TILE_H |
Oops, something went wrong.