from whoosh.fields import *
from whoosh.index import create_in, open_dir
from whoosh.qparser import MultifieldParser
from whoosh.query import *
import abc
import copy
import csv
import json
import os.path
import sys
import utils
class GenericSearchEngine(object):
    """
    Abstract base class for any search engine, whether that's a wrapper
    around an external API you've already built or a Whoosh-based engine
    made from scratch via searchbetter.

    Encapsulates functionality — notably query rewriting — that benefits
    any search engine, even one not built with SearchBetter tools.
    Extending this class is easy: provide a `single_search()` and,
    optionally, a `process_raw_results()`, and everything else is built in.
    """

    # abstract class (Python 2 style metaclass declaration)
    __metaclass__ = abc.ABCMeta

    def __init__(self):
        # No rewriter installed by default; searches pass straight through.
        # TODO let someone pass this in the constructor
        self.rewriter = None

    def set_rewriter(self, rewriter):
        """
        Install `rewriter` (from this_package.rewriter) as the default
        query rewriter for this search engine.
        """
        self.rewriter = rewriter

    def search(self, term):
        """
        Run a plain-English search and return results.

        :param term {String}: a query like you'd type into Google.
        :return: a list of search-result objects (see single_search()).
        """
        # Without a rewriter, just search for the original term as-is.
        if self.rewriter is None:
            return self.single_search(term)

        # With a rewriter: expand the term into several queries, search
        # each one, and merge everything into a single flat result list.
        queries = self.rewriter.rewrite(term)
        nested_results = [self.single_search(query) for query in queries]
        merged = utils.flatten(nested_results)
        return self.process_raw_results(merged)

    def process_raw_results(self, raw_results):
        """
        Post-process the combined result list produced after query
        rewriting (e.g. sorting, de-duplicating). What's possible here
        depends on what kind of objects your engine returns.
        """
        # Default: hand the results back untouched.
        return raw_results

    ###
    ### functions you need to specify
    ###

    def single_search(self, term):
        """
        Run the search engine on a single term (no rewriting involved)
        and return a list of found objects — dicts, strings, custom
        objects, whatever your engine produces.

        Subclasses must implement!

        :param str term: a word or phrase to search for
        :rtype: list(object)
        """
        raise NotImplementedError("Subclasses must implement!")
class WhooshSearchEngine(GenericSearchEngine):
    """
    An abstract class for custom, Whoosh-based search engines.

    A batteries-included search engine that can operate on any given
    dataset. Uses the Whoosh library to index and run searches on the
    dataset. Has built-in support for query rewriting (inherited from
    GenericSearchEngine).
    """

    # make it an abstract class (Python 2 style)
    __metaclass__ = abc.ABCMeta

    # TODO consider making more hierarchy. This is the WhooshSearchEngine,
    # which has the cool indexing capabilities. But more generally, you
    # could have a search engine that only has to support search().
    # but at that point it's just a useless interface, mostly.
    # anyway, such a search engine would let the query rewriting search engine
    # inherit from search engine too.

    def __init__(self, create, search_fields, index_path):
        """
        Creates a new search engine.

        :param create {bool}: If True, recreates an index from scratch.
            If False, loads the existing index.
        :param search_fields {str[]}: names of fields in the index that our
            search engine will search against.
        :param index_path {str}: a relative path to a folder where the whoosh
            index should be stored.
        """
        super(WhooshSearchEngine, self).__init__()

        # TODO have an auto-detect feature that will determine if the
        # index exists, and depending on that creates or loads the index
        # TODO have the `create` option become `force_create`; normally
        # it'll intelligently auto-generate, but if you force it it'll
        # do what you say
        self.index_path = index_path

        # both branches yield a Whoosh index object
        if create:
            self.index = self.create_index()
        else:
            self.index = self.load_index()

        # query parser that searches all the given fields at once
        self.parser = MultifieldParser(search_fields, self.index.schema)

    def load_index(self):
        """
        Used when the index is already created. This just loads the index
        from disk (self.index_path) and returns it.
        """
        return open_dir(self.index_path)

    def create_index(self):
        """
        Creates and returns a brand-new index. Implementations should call
        get_empty_index() behind the scenes.

        Subclasses must implement!
        """
        raise NotImplementedError("Subclasses must implement!")

    def get_empty_index(self, path, schema):
        """
        Makes an empty index, creating the directory where it needs to be
        stored if necessary, and returns the index.

        This is called within create_index().
        TODO this breakdown is still confusing
        """
        if not os.path.exists(path):
            os.mkdir(path)
        return create_in(path, schema)

    def get_num_documents(self):
        """
        Returns the number of documents in this search engine's corpus,
        i.e. the size of the search engine.
        """
        # match every document; len() on the Results counts all matches
        query = Every()
        with self.index.searcher() as searcher:
            result = searcher.search(query)
            return len(result)

    def __len__(self):
        return self.get_num_documents()

    def single_search(self, term):
        """
        Helper for search() that returns results for a single,
        non-rewritten search term, as a list of WhooshResult objects.

        OVERRIDDEN from GenericSearchEngine.
        """
        outer_results = []
        with self.index.searcher() as searcher:
            query_obj = self.parser.parse(term)
            # Hits become invalid once the searcher is closed, so copy
            # everything we need into plain objects inside the with-block.
            results = list(searcher.search(query_obj, limit=None))
            # Each Hit has `fields()`, a dict version of the stored item
            # (title, description, ...), and `score`, its relevance
            # (higher = better).
            cleaned_results = [WhooshResult(hit.fields(), hit.score)
                               for hit in results]
            outer_results = cleaned_results
        return outer_results

    def process_raw_results(self, raw_results):
        """
        De-duplicate and rank the combined results of a rewritten search.

        OVERRIDDEN from GenericSearchEngine.
        """
        # WhooshResult is hashable, so a set gives us uniqueness for free
        unique_results = list(set(raw_results))
        # sort by relevance score, descending, so the best stuff
        # bubbles to the top
        unique_results.sort(key=lambda result: result.score, reverse=True)
        return unique_results
class UdacitySearchEngine(WhooshSearchEngine):
    """
    Whoosh-based search engine over the Udacity course catalog
    (the Udacity API JSON dump).
    """

    # DATASET_PATH = secure.DATASET_PATH_BASE+'udacity-api.json'
    # INDEX_PATH = secure.INDEX_PATH_BASE+'udacity'

    # Which dataset fields we should search over.
    SEARCH_FIELDS = ["title", "subtitle", "expected_learning",
                     "syllabus", "summary", "short_summary"]

    def __init__(self, dataset_path, index_path, create=False):
        """
        Creates a new Udacity search engine.

        :param dataset_path {string}: the path to the Udacity API JSON file.
        :param index_path {string}: the path to a folder where you'd like to
            store the search engine index. The given folder doesn't have to
            exist, but its *parent* folder does.
        :param create {bool}: If True, recreates an index from scratch.
            If False, loads the existing index.
        """
        # must be set before super().__init__, which calls create_index()
        # (and thus reads self.dataset_path) when create=True
        self.dataset_path = dataset_path
        super(UdacitySearchEngine, self).__init__(
            create, self.SEARCH_FIELDS, index_path)

    def create_index(self):
        """
        Creates a new index to search the Udacity dataset. You only need to
        call this once; once the index is created, you can just load it
        again instead of creating it afresh all the time.
        """
        # load data
        with open(self.dataset_path, 'r') as dataset_file:
            udacity_data = json.load(dataset_file)

        # TODO: use StemmingAnalyzer here so we get the built-in benefits
        # of stemming in our search engine
        # http://whoosh.readthedocs.io/en/latest/stemming.html
        schema = Schema(
            slug=ID(stored=True),
            title=TEXT(stored=True),
            subtitle=TEXT,
            expected_learning=TEXT,
            syllabus=TEXT,
            summary=TEXT,
            short_summary=TEXT
        )

        # make an index to store this stuff in
        index = self.get_empty_index(self.index_path, schema)

        # add documents (i.e. the courses) to the index
        writer = index.writer()
        try:
            for course in udacity_data['courses']:
                writer.add_document(
                    slug=course['slug'],
                    title=course['title'],
                    subtitle=course['subtitle'],
                    expected_learning=course['expected_learning'],
                    syllabus=course['syllabus'],
                    summary=course['summary'],
                    short_summary=course['short_summary'])
            writer.commit()
        except Exception as e:
            # best-effort: report the problem and release the writer's
            # index lock so later writes aren't blocked
            print(e)
            writer.cancel()

        # all done for now
        return index

    def count_words(self):
        """
        Returns the number of unique words in the underlying Udacity
        dataset (as computed by utils.unique_words_in_string).
        """
        # extracts just the searchable text fields from a course dict
        def extract_text_from_course(course):
            return [course[field] for field in self.SEARCH_FIELDS]

        # load data
        with open(self.dataset_path, 'r') as dataset_file:
            udacity_data = json.load(dataset_file)

        # extract just the text fields, no other markup or fields
        courses = udacity_data['courses']
        paragraphs = [extract_text_from_course(c) for c in courses]
        # these are nested... flatten into one huge string array,
        # then join into one huge string
        raw_lines = utils.flatten(paragraphs)
        mega_string = " ".join(raw_lines)
        return utils.unique_words_in_string(mega_string)
class HarvardXSearchEngine(WhooshSearchEngine):
    """
    Whoosh-based search engine over the HarvardX/DART database of courses
    and course materials (videos, quizzes, etc.).
    """

    # INDEX_PATH = secure.INDEX_PATH_BASE+'harvardx'

    # Which dataset fields we should search over.
    SEARCH_FIELDS = ["display_name", "contents"]

    def __init__(self, dataset_path, index_path, create=False):
        """
        Creates a new HarvardX search engine. Searches over the HarvardX/DART
        database of all courses and course materials used in HarvardX. This
        includes videos, quizzes, etc.

        TODO: consider renaming to DART, probz

        :param dataset_path {string}: the path to the HarvardX course catalog
            CSV file.
        :param index_path {string}: the path to a folder where you'd like to
            store the search engine index. The given folder doesn't have to
            exist, but its *parent* folder does.
        :param create {bool}: If True, recreates an index from scratch.
            If False, loads the existing index.
        """
        # BUGFIX: must be set before super().__init__, which calls
        # create_index() (and thus reads self.dataset_path) when create=True
        self.dataset_path = dataset_path
        super(HarvardXSearchEngine, self).__init__(
            create, self.SEARCH_FIELDS, index_path)

    def create_index(self):
        """
        Creates a new index to search the dataset. You only need to call
        this once; once the index is created, you can just load it again
        instead of creating it afresh all the time.

        Returns the index object.
        """
        # Only index resources with these categories (types of content).
        # Unsure about courses (b/c they have no content) and html (b/c they
        # often include messy CSS/JS in there).
        # TODO: add "html" support. requires stripping comments
        # http://stackoverflow.com/questions/753052/strip-html-from-strings-in-python
        supported_categories = ('problem', 'video', 'course')

        # set up whoosh schema
        # TODO: use StemmingAnalyzer here so we get the built-in benefits
        # of stemming in our search engine
        # http://whoosh.readthedocs.io/en/latest/stemming.html
        schema = Schema(
            course_id=ID(stored=True),
            display_name=TEXT(stored=True),
            contents=TEXT
        )

        # make an index to store this stuff in
        index = self.get_empty_index(self.index_path, schema)

        # some of the fields are HUGE, so let the csv reader handle them
        csv.field_size_limit(sys.maxsize)

        # add documents (i.e. the courses) to the index
        with open(self.dataset_path, 'r') as csvfile:
            reader = csv.DictReader(csvfile)
            writer = index.writer()
            try:
                for row in reader:
                    # skip rows whose content type we don't support
                    # (BUGFIX: this was `pass`, which filtered nothing)
                    if row['category'] not in supported_categories:
                        continue
                    # decode: the csv module yields raw byte strings here
                    # (Python 2) — TODO confirm if porting to Python 3
                    writer.add_document(
                        course_id=row['course_id'].decode('utf8'),
                        display_name=row['display_name'].decode('utf8'),
                        contents=row['contents'].decode('utf8'))
                writer.commit()
            except Exception as e:
                # best-effort: report the problem and release the
                # writer's index lock
                print(e)
                writer.cancel()

        # all done for now
        return index
class EdXSearchEngine(WhooshSearchEngine):
    """
    Whoosh-based search engine over the edX course listings.
    """

    # INDEX_PATH = secure.INDEX_PATH_BASE+'edx'

    # Which dataset fields we should search over.
    SEARCH_FIELDS = ["name"]

    def __init__(self, dataset_path, index_path, create=False):
        """
        Creates a new search engine that searches over edX courses.

        :param dataset_path {string}: the path to the edX course listings
            file.
        :param index_path {string}: the path to a folder where you'd like to
            store the search engine index. The given folder doesn't have to
            exist, but its *parent* folder does.
        :param create {bool}: If True, recreates an index from scratch.
            If False, loads the existing index.
        """
        # BUGFIX: must be set before super().__init__, which calls
        # create_index() (and thus reads self.dataset_path) when create=True
        self.dataset_path = dataset_path
        super(EdXSearchEngine, self).__init__(
            create, self.SEARCH_FIELDS, index_path)

    def create_index(self):
        """
        Creates a new index to search the dataset. You only need to call
        this once; once the index is created, you can just load it again
        instead of creating it afresh all the time.

        Returns the index object.
        """
        # set up whoosh schema
        # TODO: use StemmingAnalyzer here so we get the built-in benefits
        # of stemming in our search engine
        # http://whoosh.readthedocs.io/en/latest/stemming.html
        schema = Schema(
            course_id=ID(stored=True),
            name=TEXT(stored=True)
        )

        # make an index to store this stuff in
        index = self.get_empty_index(self.index_path, schema)

        # add documents (i.e. the courses) to the index
        with open(self.dataset_path, 'r') as csvfile:
            reader = csv.DictReader(csvfile)
            writer = index.writer()
            try:
                for row in reader:
                    # decode: the csv module yields raw byte strings here
                    # (Python 2) — TODO confirm if porting to Python 3
                    writer.add_document(
                        course_id=row['course_id'].decode('utf8'),
                        name=row['name'].decode('utf8'))
                writer.commit()
            except Exception as e:
                # best-effort: report the problem and release the
                # writer's index lock
                print(e)
                writer.cancel()

        # all done for now
        return index

    def count_words(self):
        """
        Returns the number of unique words in the underlying edX dataset
        (as computed by utils.unique_words_in_string).
        """
        with open(self.dataset_path, 'r') as csvfile:
            reader = csv.DictReader(csvfile)
            # the only text field that's useful is the name field
            names = [row['name'].decode('utf8') for row in reader]

        # turn into one huge string then count the words in that
        mega_string = " ".join(names)
        return utils.unique_words_in_string(mega_string)
class PrebuiltSearchEngine(WhooshSearchEngine):
    """
    A search engine designed for when you're just given a pre-built index
    file and can use it directly without having to build anything.
    """

    def __init__(self, search_fields, index_path):
        # always load the existing index; there's nothing to build
        super(PrebuiltSearchEngine, self).__init__(
            create=False, search_fields=search_fields, index_path=index_path)

    def create_index(self):
        """
        Not supported: the index is pre-built, so there is nothing to
        create. Always raises.
        """
        raise NotImplementedError(
            "This search engine doesn't need to create an index! Use create = False.")
class WhooshResult(object):
    """
    Encodes a search result from a Whoosh-based search engine.

    Basically a wrapper around a result dict and its relevance score
    (higher is better). Equality and hashing consider only the dict data,
    not the score, so duplicate hits de-dupe cleanly in a set.
    """

    def __init__(self, dict_data, score):
        # stored fields of the hit (title, description, ...)
        self.dict_data = dict_data
        # relevance score (higher = better)
        self.score = score

    def get_dict(self):
        """
        Get the underlying dict data.
        """
        return self.dict_data

    def __repr__(self):
        """
        Stringified version of the result, which encodes the dict and the
        score.
        """
        return str((self.dict_data, self.score))

    # enable lookup as if this was a real dict
    def __getitem__(self, key):
        return self.dict_data[key]

    # to enable hashing (assumes dict values are themselves hashable,
    # which holds for the text fields Whoosh stores)
    def __hash__(self):
        return hash(frozenset(self.dict_data.items()))

    def __eq__(self, other):
        # BUGFIX: comparing against a non-WhooshResult used to raise
        # AttributeError; defer to the other operand instead
        if not isinstance(other, WhooshResult):
            return NotImplemented
        return frozenset(self.dict_data.items()) == frozenset(other.dict_data.items())

    def __ne__(self, other):
        # needed explicitly under Python 2, which doesn't derive != from ==
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result