Skip to content

Commit

Permalink
merge: PR #161 from dev
Browse files Browse the repository at this point in the history
Weekly release 2024-07-08
  • Loading branch information
alycejenni authored Jul 8, 2024
2 parents 35ace98 + dff08aa commit ed5121c
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
35 changes: 35 additions & 0 deletions ckanext/versioned_datastore/lib/datastore_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from ckan import model
from ckan.plugins import toolkit, PluginImplementations
from splitgill.indexing.utils import DOC_TYPE
import time

from . import common
from ..interfaces import IVersionedDatastore
Expand Down Expand Up @@ -285,3 +286,37 @@ def is_ingestible(resource):
)
or (resource_format is not None and resource_format.lower() == 'zip')
)


def get_queue_length(queue_name):
"""
This is a *very* hacky way to get the length of a queue, including anything already
processing.
:param queue_name: the name of the queue to check, e.g. 'download'
:return: length of queue as int
"""
# because only pending jobs are counted, not active/running, if you add to the queue
# and job_list can't see it, the queue was empty; if it can, something else is
# already running.
def _temp_job():
time.sleep(1)

job = toolkit.enqueue_job(
_temp_job,
queue=queue_name,
title=f'{queue_name} queue status test',
rq_kwargs={'ttl': '1s'},
)

queued_jobs = toolkit.get_action('job_list')(
{'ignore_auth': True}, {'queues': [queue_name]}
)

job.delete()

return len(queued_jobs)


def get_es_health():
return {'ping': common.ES_CLIENT.ping(), 'info': common.ES_CLIENT.info()}
63 changes: 63 additions & 0 deletions ckanext/versioned_datastore/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,22 @@
ReadOnlyResourceException,
InvalidVersionException,
update_resources_privacy,
get_queue_length,
get_es_health,
)
from .lib.query.schema import register_schema
from .lib.query.v1_0_0 import v1_0_0Schema
from .logic import auth
from .logic.actions import basic_search, crud, downloads, extras, multisearch
from ckantools.loaders import create_actions, create_auth

try:
from ckanext.status.interfaces import IStatus

status_available = True
except ImportError:
status_available = False

log = logging.getLogger(__name__)


Expand All @@ -42,6 +51,8 @@ class VersionedSearchPlugin(SingletonPlugin):
implements(interfaces.IBlueprint, inherit=True)
implements(IVersionedDatastoreQuerySchema)
implements(interfaces.IClick)
if status_available:
implements(IStatus)

# IActions
def get_actions(self):
Expand Down Expand Up @@ -167,3 +178,55 @@ def configure(self, ckan_config):
# IVersionedDatastoreQuerySchema
def get_query_schemas(self):
return [(v1_0_0Schema.version, v1_0_0Schema())]

# IStatus
def modify_status_reports(self, status_reports):
queued_downloads = get_queue_length('download')

status_reports.append(
{
'label': toolkit._('Downloads'),
'value': queued_downloads,
'group': toolkit._('Queues'),
'help': toolkit._(
'Number of downloads either currently processing or waiting in the queue'
),
'state': 'good'
if queued_downloads == 0
else ('ok' if queued_downloads < 3 else 'bad'),
}
)

queued_imports = get_queue_length('importing')

status_reports.append(
{
'label': toolkit._('Imports'),
'value': queued_imports,
'group': toolkit._('Queues'),
'help': toolkit._(
'Number of import jobs either currently processing or waiting in the queue'
),
'state': 'good'
if queued_imports == 0
else ('ok' if queued_imports < 3 else 'bad'),
}
)

es_health = get_es_health()
server_status_text = (
toolkit._('available') if es_health['ping'] else toolkit._('unavailable')
)

status_reports.append(
{
'label': toolkit._('Search'),
'value': server_status_text,
'help': toolkit._(
'Multisearch functionality is provided by an Elasticsearch cluster'
),
'state': 'good' if es_health['ping'] else 'bad',
}
)

return status_reports

0 comments on commit ed5121c

Please sign in to comment.