
5. Sending GitHub notifications


Extraction process V - Sending GitHub notifications

  1. Summary
  2. Overview
  3. HTTP call
  4. Retrieving parameters
  5. Get all non-issued reports
  6. Get position cursor
  7. Main loop
    1. Send the issue
      1. Prepare and check the variables
      2. Prepare GitHub related information
      3. Create the email with links
      4. Build the GitHub request
      5. Launch the request
      6. Check the output
      7. Store the updated entity and wait
    2. When there are no more reports
  8. When DeadlineExceededError happens

Summary

  1. Retrieve extraction parameters
  2. Get all reports for the given period that have not yet been notified
  3. Begin the process loop until all issues are submitted or a DeadlineExceededError (timeout) happens
    1. Get position cursor (if any)
    2. Send notification, until DeadlineExceededError or done
  4. Build cursor and re-launch process with remaining data
  5. When it's done, the usage statistics process is complete

Overview

This short part of the process creates an issue in the GitHub repository of each corresponding resource, so that the users watching that repo receive an email notification informing them that their report is ready.

The workflow is the same as in the previous step, github_store. The only important differences are:

  1. Instead of using the content method of the repos API, it uses the issues method of the same API.
  2. There is no template file. The notification is short enough to fit in the code itself.
  3. Since there is no new commit, there is no need to provide committer information, or to worry about SHAs.
  4. This is the last step in the Usage Statistics generation and reporting process.

The workflow is as follows: extract a list of elements to process, then iterate over the list until all elements are done or a DeadlineExceededError happens. The optimal page size here is 1 (same as in github_store), because the method makes use of the GitHub API, which is very sensitive to request rates. Firing too many operations in a row can easily trip its abuse-detection triggers and restrict access to the API for some time.
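For reference, the code excerpts on this page rely on a handful of module-level constants. The following is a sketch only; the values shown are assumptions, inferred where possible from the URLs elsewhere on this page:

        # Sketch: assumed values for the module-level constants used below
        PAGE_SIZE = 1  # one Report per page, to stay gentle on the GitHub API
        MODULE = "tools-usagestats.vertnet-portal.appspot.com"  # app hostname, used in report links
        GH_REPOS = "https://api.github.com/repos"  # base URL of the GitHub repos API
        URI_GITHUB_ISSUE = "/admin/parser/github_issue"  # endpoint re-queued on timeout
        QUEUENAME = "usagestatsqueue"  # hypothetical task queue name
        EMAIL_SENDER = "usagestats@example.org"  # hypothetical admin addresses
        EMAIL_ADMINS = ["admin@example.org"]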

HTTP call

It all starts with a POST call to the github_issue method of the admin/parser family endpoint. The call will usually be made asynchronously by one of the previous methods, either process_events or github_store, but it can also be made directly in case a previous attempt of executing the github_issue method fails.

Currently, the URL for this endpoint is:

http://tools-usagestats.vertnet-portal.appspot.com/admin/parser/github_issue

The normal call to the endpoint includes just the period to create issues for:

curl -i -X POST -d "period=201204" http://tools-usagestats.vertnet-portal.appspot.com/admin/parser/github_issue

The service can also be run in test mode, or restricted to report on a single gbifdatasetid.
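For instance (hedged examples; the gbifdatasetid value is a placeholder, and the parameter names come from the parameter-retrieval code below):

curl -i -X POST -d "period=201204" -d "testing=true" http://tools-usagestats.vertnet-portal.appspot.com/admin/parser/github_issue

curl -i -X POST -d "period=201204" -d "gbifdatasetid=<gbif-dataset-uuid>" http://tools-usagestats.vertnet-portal.appspot.com/admin/parser/github_issue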

According to usagestats.py, a call to this URL launches a new instance of the GitHubIssue class, found in the admin.parser.GitHubIssue module.
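That mapping presumably looks something like the following sketch; apart from the URL and the GitHubIssue class, the names here (including the webapp2 application object) are assumptions based on the App Engine idioms in the code:

        # Hedged sketch of the route in usagestats.py
        import webapp2
        from admin.parser.GitHubIssue import GitHubIssue

        app = webapp2.WSGIApplication([
            ('/admin/parser/github_issue', GitHubIssue),
        ])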

Retrieving parameters

As usual, the process takes the original call's parameters from the request or via memcache. However, this step is more prone to errors, and manual relaunching is expected to be somewhat frequent, so there is a very detailed set of steps to guarantee that the required parameters arrive in the correct shape.

        # Try to get period from the request in case GitHubIssue was called directly
        try:
            # If "period" is not in the request, None will be returned
            # taking lower() of None will throw an exception, which is the 
            # desired result here.
            self.period = self.request.get("period", None).lower()
            
            self.params_from_request = True
            s =  "Version: %s\n" % __version__
            s += "Period %s determined from request: %s" % (self.period, self.request)
            logging.info(s)
            
        # If not in request, try to get parameters from memcache in case
        # GitHubIssue was called from a previous task.
        except Exception:
            memcache_keys = ["period", "testing", "gbifdatasetid"]
            params = memcache.get_multi(memcache_keys, key_prefix="usagestats_parser_")
            self.period = params['period']
            self.params_from_request = False
            s =  "Version: %s\n" % __version__
            s += "Period %s determined from memcache: %s" % (self.period, params)
            logging.info(s)

Then the process checks that a period was actually provided and that a datastore entity exists for it. If either check fails (as when this endpoint has been called directly, without passing through all the previous steps), the process stops.

        # If still not there, halt
        if self.period is None or len(self.period)==0:
            self.error(400)
            resp = {
                "status": "error",
                "message": "Period parameter was not provided."
            }
            s =  "Version: %s\n" % __version__
            s += "%s" % resp
            logging.error(s)
            self.response.write(json.dumps(resp)+"\n")
            return

        # If Period not already stored, halt
        period_key = ndb.Key("Period", self.period)
        period_entity = period_key.get()
        if not period_entity:
            self.error(400)
            resp = {
                "status": "error",
                "message": "Provided period does not exist in datastore",
                "data": {
                    "period": self.period
                }
            }
            logging.error(resp)
            self.response.write(json.dumps(resp)+"\n")
            return

If there is a Period in the datastore, we can proceed by getting the remaining parameters, either from the request if this service was called directly, or from memcache if we got here from a previous process.

        # Get the remaining parameters based on the parameter source
        if self.params_from_request == True: 
            # Get parameters from request

            # 'testing' parameter
            try:
                self.testing = self.request.get('testing').lower() == 'true'
            except Exception:
                # default value for 'testing' if not provided is False
                self.testing = False

            # 'gbifdatasetid' parameter
            try:
                self.gbifdatasetid = self.request.get('gbifdatasetid').lower()
            except Exception:
                # default value for 'gbifdatasetid' if not provided is None
                self.gbifdatasetid = None

        else:
            # Get parameters from memcache

            # 'testing' parameter
            try:
                self.testing = params['testing']
            except KeyError:
                # default value for 'testing' if not provided is False
                self.testing = False

            # 'gbifdatasetid' parameter
            try:
                self.gbifdatasetid = params['gbifdatasetid']
            except KeyError:
                # default value for 'gbifdatasetid' if not provided is None
                self.gbifdatasetid = None

        # Set the parameters in memcache for child tasks to use
        memcache.set("usagestats_parser_period", self.period)
        memcache.set("usagestats_parser_testing", self.testing)
        memcache.set("usagestats_parser_gbifdatasetid", self.gbifdatasetid)

Get all non-issued reports

The next step is to prepare a list with all the Report entities that have the issue_sent property set to False. That means they have not undergone this part of the process, and it is a way of selecting only those entities that need to be processed.

        # Prepare list of reports to store
        # Base query
        reports_q = Report.query()

        # Only Reports for current Period
        reports_q = reports_q.filter(Report.reported_period == period_key)

        # Only those with 'issue_sent' property set to False
        reports_q = reports_q.filter(Report.issue_sent == False)

        # Only those with 'stored' property set to True
        reports_q = reports_q.filter(Report.stored == True)

        # And if there is a gbifdatasetid, filter on that too
        if self.gbifdatasetid is not None and len(self.gbifdatasetid) > 0:
            dataset_key = ndb.Key("Dataset", self.gbifdatasetid)
            # Note: ndb.Key() itself never returns None; check the entity instead
            if dataset_key.get() is None:
                s =  "Version: %s\n" % __version__
                s += "gbifdatasetid %s not found in data store." % self.gbifdatasetid
                logging.error(s)
                return
            else:
                reports_q = reports_q.filter(Report.reported_resource == dataset_key)

        # Store final query
        reports_query = reports_q

One "pythonic" note: in principle, comparisons like foo == False are discouraged by the PEP 8 style guide. However, ndb queries do not support the alternative: property comparisons with == are overloaded to build query filter objects, while the is operator cannot be overloaded and simply yields a plain boolean. I tried using reports_q = reports_q.filter(Report.issue_sent is False) instead, but it didn't work. The == comparison looks like the only way of expressing this kind of filter.
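A minimal illustration of the difference (not from the source; just the two expressions side by side):

        # Illustrative only: what each expression evaluates to
        Report.issue_sent == False   # a FilterNode, usable in reports_q.filter()
        Report.issue_sent is False   # plain Python False; useless as a filter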

Get position cursor

Just the same as in the previous step, the position cursor is taken from the HTTP call parameters (if any), so that the list does not start from the beginning:

        # Get cursor from request, if any
        cursor_str = self.request.get("cursor", None)
        cursor = None
        if cursor_str:
            cursor = ndb.Cursor(urlsafe=cursor_str)
            logging.info("Cursor built: %s" % cursor)
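Since the cursor arrives as a plain request parameter, a failed run can also be resumed by hand by passing the urlsafe cursor string back in. A hedged example (the cursor value is a placeholder):

curl -i -X POST -d "period=201204" -d "cursor=<urlsafe-cursor-string>" http://tools-usagestats.vertnet-portal.appspot.com/admin/parser/github_issue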

Main loop

Pages of 1 Report entity each are extracted from the list using the fetch_page method of the query object. Each report is then processed with the send_issue method of this class.

All this happens inside a while loop that runs while there is any report left, itself wrapped in a try/except block that runs until a DeadlineExceededError happens.

        # Initialize loop
        if reports_query.count() == 0:
            more = False
        else:
            more = True

        # Loop until DeadlineExceededError
        # or until there are no more reports left to store
        try:
            # Keep track of datasets for which issues have been sent in this run
            datasets = []
            while more is True:
                s =  "Version: %s\n" % __version__
                s += "Issuing query: %s" % reports_query
                logging.info(s)

                # Get next (or first) round of results
                report, new_cursor, more = reports_query.fetch_page(
                    PAGE_SIZE, start_cursor=cursor
                )

                # Check to see if there is actually another report
                if report is not None and len(report) != 0:
                    # Send issue
                    self.send_issue(report[0])
                    gbifdatasetid = report[0].reported_resource.id()
                    datasets.append(gbifdatasetid)

                if more is True:
                    cursor = new_cursor

            s =  "Version: %s\n" % __version__
            s += "Finished creating all %d issues" % len(datasets)
            logging.info(s)

Send the issue

The general process of this part is as follows:

  1. Build some variables from the extracted Report
  2. Check that the required dataset-related information is present
  3. Populate a template email with some links to reports
  4. Prepare and execute a GitHub request
  5. Check the response and, if successful, update the issue_sent property of the Report entity
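Putting those steps together, the skeleton of the method looks roughly like this (a sketch; only the signature and the ordering of the steps are inferred from the excerpts below):

        def send_issue(self, report_entity):
            # 1. Build variables from the Report and its referenced entities
            # 2. Check that the Dataset exists; halt if it does not
            # 3. Build the links, title and body of the notification
            # 4. POST the issue to the GitHub API
            # 5. On HTTP 201, set issue_sent to True and store the entity
            ...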

Prepare and check the variables

Some values are extracted directly from the stored entity; others are taken from the Period and the referenced Dataset entities.

        report_key = report_entity.key
        gbifdatasetid = report_entity.reported_resource.id()
        s =  "Version: %s\n" % __version__
        s += "Storing issue for dataset %s" % gbifdatasetid
        logging.info(s)

        # Build variables
        dataset_key = report_entity.reported_resource
        period_key = report_entity.reported_period
        dataset_entity, period_entity = ndb.get_multi([dataset_key, period_key])

If the referenced dataset does not exist, this process cannot continue, because some key parts of the notification cannot be rendered. Therefore, a check that the Dataset exists is mandatory.

        # Check that dataset exists
        if not dataset_entity:
            self.error(500)
            resp = {
                "status": "error",
                "message": "Missing dataset in datastore. Please run /setup_datasets "
                           "or remove associated Period entity from data store to fix.",
                "data": {
                    "missing_dataset_key": gbifdatasetid
                }
            }
            s =  "Version: %s\n" % __version__
            s += "Response: %s" % resp
            logging.error(s)
            self.response.write(json.dumps(resp)+"\n")

            # Set 'issue_sent' to True to avoid endless loop in the case a dataset does
            # not exist in the datastore.
            # TODO: Better if the Report entity had a flag for 'issue_skipped'
            # with default None. But, for now...
            report_entity.issue_sent = True

            # Store updated version of Report entity
            report_entity.put()

            return

Prepare GitHub related information

The process needs the name of the GitHub org and repo, an API key and a user_agent, and builds the headers to be used in the API call:

        # GitHub stuff
        org = dataset_entity.github_orgname
        repo = dataset_entity.github_reponame
        user_agent = 'VertNet'
        key = apikey('ghb')

        # Testing block
        if self.testing:
            org = 'VertNet'
            repo = 'statReports'
            user_agent = 'VertNet'
            key = apikey('ghb')

        s =  "Version: %s\n" % __version__
        s += "Using GitHub repository %s/%s " % (org, repo)
        s += "as user_agent %s" % user_agent
        logging.info(s)

        # GitHub request headers
        headers = {
            'User-Agent': user_agent,
            'Authorization': 'token {0}'.format(key),
            "Accept": "application/vnd.github.v3+json"
        }

The "testing block" is only used if the initial testing parameter is set to True. In that case, instead of sending the issues to the corresponding repositories, a testing repo is used, with its own API key and user_agent.

Create the email with links

The sent email (issue) is very simple. It only informs of the availability of a new report in a specific location, and provides some links to access the report directly.

The first part of this section builds the links. The link variable holds a reference to the webpage where the report itself can be found. The link_all variable refers to the main "publisher" page, where all available reports can be found.

        # Issue creation, only if issue not previously created
        if report_entity.issue_sent == False:
            link_all = "http://%s/reports/%s/" % (MODULE, gbifdatasetid)
            link = "http://%s/reports/%s/%s/" % (MODULE, gbifdatasetid, self.period)
            link_gh = "https://github.com/%s/%s/tree/master/reports" % (org, repo)

Then the title and body of the notification are created:

            title = 'Monthly VertNet data use report for %s-%s, resource %s' \
                    % (period_entity.year,
                       period_entity.month,
                       dataset_entity.ccode)
            body = """Your monthly VertNet data use report is ready!

You can see the HTML rendered version of this report at:

{0}

Raw text and JSON-formatted versions of the report are also available for
download from this link. 

A copy of the text version has also been uploaded to your GitHub
repository under the "reports" folder at:

{1}

A full list of all available reports can be accessed from:

{2}

You can find more information on the reporting system, along with an
explanation of each metric, at:

http://www.vertnet.org/resources/usagereportingguide.html

Please post any comments or questions to:
http://www.vertnet.org/feedback/contact.html

Thank you for being a part of VertNet.
""".format(link, link_gh, link_all)

In order to organize notifications better, the process applies the report label to the issue that will be created on GitHub. This will not be reflected in the sent email.

            labels = ['report']

Build the GitHub request

This part uses the GitHub repos API: it builds a POST request for the issues endpoint, with the notification as the request body.

            request_url = '{0}/{1}/{2}/issues'.format(GH_REPOS, org, repo)
            json_input = json.dumps({
                'title': title,
                'body': body,
                'labels': labels
            })
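Assuming GH_REPOS is the base of the GitHub REST API's repos namespace (https://api.github.com/repos), the pieces would look roughly like this for the testing repository (illustrative values only):

            # request_url -> 'https://api.github.com/repos/VertNet/statReports/issues'
            # json_input  -> '{"title": "Monthly VertNet data use report for ...",
            #                  "body": "Your monthly VertNet data use report is ready!...",
            #                  "labels": ["report"]}'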

Launch the request

This is simply a matter of using the urlfetch module's fetch method, specifying the request_url, the headers and the payload as the JSON document. The request method for creating a new issue is POST.

            # Make GitHub call
            r = urlfetch.fetch(
                url=request_url,
                method=urlfetch.POST,
                headers=headers,
                payload=json_input
            )

Check the output

If the request finishes successfully, the GitHub API returns a 201 HTTP code. Unlike in the previous method (github_store), there is no specific, common error to anticipate, so any problem must be caught and treated generically.

If the 201 is returned, the issue_sent property is set to True for later update in the Datastore.

            # Check output
            # HTTP 201 = Success
            if r.status_code == 201:
                s =  "Version: %s\n" % __version__
                s += "Status: %s. Issue %s sent." % (r.status_code, report_key.id())
                logging.info(s)
                report_entity.issue_sent = True
            # Other generic problems
            else:
                resp = {
                    "status": "failed",
                    "message": "Got uncaught error code when uploading"
                               " report to GitHub. Aborting issue creation.",
                    "source": "send_to_github",
                    "data": {
                        "report_key": report_key,
                        "period": self.period,
                        "testing": self.testing,
                        "error_code": r.status_code,
                        "error_content": r.content
                    }
                }
                s =  "Version: %s\n" % __version__
                s += "Response: %s. " % resp
                logging.error(s)
                return

The last else in this block of code deals with the situation where a Report entity has its issue_sent property set to True and is still selected by the query. This should obviously never happen (if the issue is already sent, there is no need to send it again), and so far it never has, but just in case...

        # This 'else' should NEVER happen
        else:
            logging.warning("Issue for %s was already sent. This call"
                            " shouldn't have happened" % report_key.id())

Store the updated entity and wait

When the issue has been sent, the updated entity (with its issue_sent property set to True) is stored

        # Store updated version of Report entity
        report_entity.put()

and, last but not least, the process waits for 2 seconds before processing the next report. This patch proved crucial after a series of failed attempts revealed that the GitHub API was treating the usage stats generator as a DoS attacker. The docs clearly state it, though...

        time.sleep(2)

When there are no more reports

In this case, there is no general wrap-up update in the datastore beyond marking the Period as done. The process simply creates a response to the request:

            resp = {
                "status": "success",
                "message": "Successfully finished creating all issues",
            }

and sends an email to the admins to let them know everything finished smoothly:

            period_entity.status = "done"
            mail.send_mail(
                sender=EMAIL_SENDER,
                to=EMAIL_ADMINS,
                subject="Usage reports for period %s" % self.period,
                body="""
Hey there!

Just a note to let you know the GitHubIssue process for period %s 
stats has successfully finished. Reports have been stored in their 
respective GitHub repositories and issues have been created. 

Issues submitted for (%d) datasets:
%s

Code version: %s
""" % (self.period, len(datasets), datasets, __version__) )

The only update to the Period entity is to indicate that it finished (set the status to done). Then, send the response and finish:

            period_entity.put()
            logging.info(resp)
            self.response.write(json.dumps(resp)+"\n")

When DeadlineExceededError happens

If a DeadlineExceededError happens before the process is finished, the app takes the last available cursor and re-launches the process with that cursor. Thus, when the list of resources to process is rebuilt, it will start from the last unprocessed chunk.

        except DeadlineExceededError:
            # Launch new instance with current (failed) cursor
            taskqueue.add(url=URI_GITHUB_ISSUE,
                          params={"cursor": cursor.urlsafe()},
                          queue_name=QUEUENAME)
            logging.info("Caught a DeadlineExceededError. Relaunching")

The except is paired with the try of the "Main loop" section.

Also, the process builds the corresponding JSON message, logs it, and writes it to the response:

            resp = {
                "status": "in progress",
                "message": "Caught a DeadlineExceededError."
                           " Relaunching with new cursor",
                "data": {
                    "period": self.period,
                    "cursor": cursor.urlsafe()
                }
            }
            logging.info(resp)
            self.response.write(json.dumps(resp)+"\n")