Source code for pyesgf.search.context

"""

Module :mod:`pyesgf.search.context`
===================================

Defines the :class:`SearchContext` class which represents each ESGF search
query.

"""

import os
import sys
import copy

from webob.multidict import MultiDict

from .constraints import GeospatialConstraint
from .consts import (TYPE_DATASET, TYPE_FILE, TYPE_AGGREGATION,
                     QUERY_KEYWORD_TYPES, DEFAULT_BATCH_SIZE)
from .results import ResultSet
from .exceptions import EsgfSearchException


[docs]class SearchContext(object): """Instances of this class represent the state of a current search. It exposes what facets are available to select and the facet counts if they are available. Subclasses of this class can restrict the search options. For instance FileSearchContext, DatasetSerachContext or CMIP5SearchContext SearchContext instances are connected to SearchConnection instances. You normally create SearchContext instances via one of: 1. Calling SearchConnection.new_context() 2. Calling SearchContext.constrain() :ivar constraints: A dictionary of facet constraints currently in effect. ``constraint[facet_name] = [value, value, ...]`` :ivar facets: A string containing a comma-separated list of facets to be returned (for example ``'source_id,ensemble_id'``). If set, this will be used to select which facet counts to include, as returned in the ``facet_counts`` dictionary. Defaults to including all available facets, but with distributed searches (where the SearchConnection instance was created with ``distrib=True``), some results may be missing for server-side reasons when requesting all facets, so a warning message will be issued. This contains further details. :property facet_counts: A dictionary of available hits with each facet value for the search as currently constrained. This property returns a dictionary of dictionaries where ``facet_counts[facet][facet_value] == hit_count`` :property hit_count: The total number of hits available with current constraints. """ DEFAULT_SEARCH_TYPE = NotImplemented def __init__(self, connection, constraints, search_type=None, latest=None, facets=None, fields=None, from_timestamp=None, to_timestamp=None, replica=None, shards=None): """ :param connection: The SearchConnection :param constraints: A dictionary of initial constraints :param search_type: One of TYPE_* constants defining the document type to search for. Overrides SearchContext.DEFAULT_SEARCH_TYPE :param facets: The list of facets for which counts will be retrieved and constraints be validated against. Or None to represent all facets. :param fields: A list of field names to return in search responses :param replica: A boolean defining whether to return master records or replicas, or None to return both. :param latest: A boolean defining whether to return only latest versions or only non-latest versions, or None to return both. :param shards: list of shards to restrict searches to. Should be from the list self.connection.get_shard_list() :param from_timestamp: Date-time string to specify start of search range (e.g. "2000-01-01T00:00:00Z"). :param to_timestamp: Date-time string to specify end of search range (e.g. "2100-12-31T23:59:59Z"). """ self.connection = connection self.__facet_counts = None self.__hit_count = None self._did_facets_star_warning = False if search_type is None: search_type = self.DEFAULT_SEARCH_TYPE # Constraints self.freetext_constraint = None self.facet_constraints = MultiDict() self.temporal_constraint = [from_timestamp, to_timestamp] self.geospatial_constraint = None self._update_constraints(constraints) # Search configuration parameters self.timestamp_range = (from_timestamp, to_timestamp) search_types = [TYPE_DATASET, TYPE_FILE, TYPE_AGGREGATION] if search_type not in search_types: raise EsgfSearchException('search_type must be one of %s' % ','.join(search_types)) self.search_type = search_type self.latest = latest self.facets = facets self.fields = fields self.replica = replica self.shards = shards # ------------------------------------------------------------------------- # Functional search interface # These do not change the constraints on self.
[docs] def search(self, batch_size=DEFAULT_BATCH_SIZE, ignore_facet_check=False, **constraints): """ Perform the search with current constraints returning a set of results. :batch_size: The number of results to get per HTTP request. :ignore_facet_check: Do not make an extra HTTP request to populate :py:attr:`~facet_counts` and :py:attr:`~hit_count`. :param constraints: Further constraints for this query. Equivalent to calling ``self.constrain(**constraints).search()`` :return: A ResultSet for this query """ if constraints: sc = self.constrain(**constraints) else: sc = self if not ignore_facet_check: sc.__update_counts() return ResultSet(sc, batch_size=batch_size)
[docs] def constrain(self, **constraints): """ Return a *new* instance with the additional constraints. """ new_sc = copy.deepcopy(self) new_sc._update_constraints(constraints) return new_sc
[docs] def get_download_script(self, **constraints): """ Download a script for downloading all files in the set of results. :param constraints: Further constraints for this query. Equivalent to calling ``self.constrain(**constraints).get_download_script()`` :return: A string containing the script """ if constraints: sc = self.constrain(**constraints) else: sc = self sc.__update_counts() query_dict = sc._build_query() # !TODO: allow setting limit script = sc.connection.send_wget(query_dict, shards=self.shards) return script
@property def facet_counts(self): self.__update_counts() return self.__facet_counts @property def hit_count(self): self.__update_counts() return self.__hit_count
[docs] def get_facet_options(self): """ Return a dictionary of facet counts filtered to remove all facets that are completely constrained. This method is similar to the property ``facet_counts`` except facet values which are not relevant for further constraining are removed. """ facet_options = {} hits = self.hit_count for facet, counts in list(self.facet_counts.items()): # filter out counts that match total hits counts = dict(items for items in list(counts.items()) if items[1] < hits) if len(counts) > 1: facet_options[facet] = counts return facet_options
def __update_counts(self): # If hit_count is set the counts are already retrieved if self.__hit_count is not None: return self.__facet_counts = {} self.__hit_count = None query_dict = self._build_query() if self.facets: query_dict['facets'] = self.facets else: query_dict['facets'] = '*' if self.connection.distrib: self._do_facets_star_warning() response = self.connection.send_search(query_dict, limit=0) for facet, counts in (list(response['facet_counts']['facet_fields'].items())): d = self.__facet_counts[facet] = {} while counts: d[counts.pop()] = counts.pop() self.__hit_count = response['response']['numFound'] def _do_facets_star_warning(self): env_var_name = 'ESGF_PYCLIENT_NO_FACETS_STAR_WARNING' if env_var_name in os.environ: return if not self._did_facets_star_warning: sys.stderr.write(f''' ------------------------------------------------------------------------------- Warning - defaulting to search with facets=* This behavior is kept for backward-compatibility, but ESGF indexes might not successfully perform a distributed search when this option is used, so some results may be missing. For full results, it is recommended to pass a list of facets of interest when instantiating a context object. For example, ctx = conn.new_context(facets='project,experiment_id') Only the facets that you specify will be present in the facets_counts dictionary. This warning is displayed when a distributed search is performed while using the facets=* default, a maximum of once per context object. To suppress this warning, set the environment variable {env_var_name} to any value or explicitly use conn.new_context(facets='*') ------------------------------------------------------------------------------- ''') self._did_facets_star_warning = True # ------------------------------------------------------------------------- # Constraint mutation interface # These functions update the instance in-place. # Use constrain() and search() to generate new contexts with tighter # constraints. def _update_constraints(self, constraints): """ Update the constraints in-place by calling _constrain_*() methods. """ constraints_split = self._split_constraints(constraints) self._constrain_facets(constraints_split['facet']) if 'query' in constraints_split['freetext']: new_freetext = constraints_split['freetext']['query'] self._constrain_freetext(new_freetext) # !TODO: implement temporal and geospatial constraints if 'from_timestamp' in constraints_split['temporal']: self.temporal_constraint[0] = (constraints_split['temporal'] ['from_timestamp']) if 'to_timestamp' in constraints_split['temporal']: self.temporal_constraint[1] = (constraints_split['temporal'] ['to_timestamp']) # self._constrain_geospatial() # reset cached values self.__hit_count = None self.__facet_counts = None def _constrain_facets(self, facet_constraints): for key, values in list(facet_constraints.mixed().items()): current_values = self.facet_constraints.getall(key) if isinstance(values, list): for value in values: if value not in current_values: self.facet_constraints.add(key, value) else: if values not in current_values: self.facet_constraints.add(key, values) def _constrain_freetext(self, query): self.freetext_constraint = query def _constrain_geospatial(self, lat=None, lon=None, bbox=None, location=None, radius=None, polygon=None): self.geospatial_constraint = GeospatialConstraint( lat, lon, bbox, location, radius, polygon) raise NotImplementedError # ------------------------------------------------------------------------- def _split_constraints(self, constraints): """ Divide a constraint dictionary into 4 types of constraints: 1. Freetext query 2. Facet constraints 3. Temporal constraints 4. Geospatial constraints :return: A dictionary of the 4 types of constraint. """ # local import to prevent circular importing from .connection import query_keyword_type constraints_split = dict((kw, MultiDict()) for kw in QUERY_KEYWORD_TYPES) for kw, val in list(constraints.items()): constraint_type = query_keyword_type(kw) constraints_split[constraint_type][kw] = val return constraints_split def _build_query(self): """ Build query string parameters as a dictionary. """ query_dict = MultiDict({"query": self.freetext_constraint, "type": self.search_type, "latest": self.latest, "facets": self.facets, "fields": self.fields, "replica": self.replica}) query_dict.extend(self.facet_constraints) # !TODO: encode datetime start, end = self.temporal_constraint query_dict.update(start=start, end=end) return query_dict
[docs]class DatasetSearchContext(SearchContext): DEFAULT_SEARCH_TYPE = TYPE_DATASET
[docs]class FileSearchContext(SearchContext): DEFAULT_SEARCH_TYPE = TYPE_FILE
[docs]class AggregationSearchContext(SearchContext): DEFAULT_SEARCH_TYPE = TYPE_AGGREGATION