from whoosh.fields import *
from whoosh.index import create_in, open_dir
from whoosh.qparser import MultifieldParser
from whoosh.query import *
import abc
import copy
import csv
import json
import os.path
import sys
import utils
class SearchEngine(object):
    """
    An abstract class for search engines.

    A batteries-included search engine that can operate on any
    given dataset. Uses the Whoosh library to index and run searches
    on the dataset. Has built-in support for query rewriting.

    Subclasses provide `create_index()`, which builds the Whoosh index for
    their particular dataset.
    """

    # make it an abstract class (Python 2 metaclass spelling)
    __metaclass__ = abc.ABCMeta

    # TODO consider making more hierarchy. This is the WhooshSearchEngine,
    # which has the cool indexing capabilities. But more generally, you
    # could have a search engine that only has to support search().
    # but at that point it's just a useless interface, mostly.
    # anyway, such a search engine would let the query rewriting search engine
    # inherit from search engine too.

    def __init__(self, create, search_fields, index_path):
        """
        Creates a new search engine.

        NOTE: when `create` is True this constructor calls `create_index()`,
        so a subclass must set any attribute that `create_index()` reads
        *before* delegating to this constructor.

        :param create {bool}: If True, recreates an index from scratch.
            If False, loads the existing index
        :param search_fields {str[]}: An array of names of fields in the index
            that our search engine will search against.
        :param index_path {str}: A relative path to a folder where the whoosh
            index should be stored.
        """
        # TODO have an auto-detect feature that will determine if the
        # index exists, and depending on that creates or loads the index
        # TODO have the `create` option become `force_create`; normally
        # it'll intelligently auto-generate, but if you force it it'll
        # do what you say
        self.index_path = index_path

        # both these functions return an index
        if create:
            self.index = self.create_index()
        else:
            self.index = self.load_index()

        # set up searching: the parser turns a plain-text query into a
        # Whoosh query over all of the given search fields
        self.parser = MultifieldParser(search_fields, self.index.schema)

        # no rewriter yet
        # TODO let someone pass this in the constructor
        self.rewriter = None

    def load_index(self):
        """
        Used when the index is already created. This just loads it from
        `self.index_path` and returns it for you.
        """
        return open_dir(self.index_path)

    def create_index(self):
        """
        Creates and returns a brand-new index. Implementations are expected
        to call get_empty_index() behind the scenes.

        Subclasses must implement!

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement!")

    def get_empty_index(self, path, schema):
        """
        Makes an empty index, making the directory where it needs
        to be stored if necessary. Returns the index.

        This is called within create_index().
        TODO this breakdown is still confusing

        :param path {str}: folder to hold the index. Only the last path
            component is created, so its parent folder must already exist.
        :param schema: the Whoosh schema describing the indexed fields.
        """
        if not os.path.exists(path):
            os.mkdir(path)
        return create_in(path, schema)

    def set_rewriter(self, rewriter):
        """
        Sets a new query rewriter (from this_package.rewriter) as the default
        rewriter for this search engine.

        :param rewriter: an object with a `rewrite(term)` method, or None to
            disable query rewriting.
        """
        self.rewriter = rewriter

    def get_num_documents(self):
        """
        Returns the number of documents in this search engine's corpus. That is,
        this is the size of the search engine.
        """
        # Ask the index directly instead of running a match-everything
        # search and measuring the result set.
        return self.index.doc_count()

    def __len__(self):
        return self.get_num_documents()

    def search(self, term):
        """
        Runs a plain-English search and returns results.

        If a rewriter is set, every rewritten query is searched, duplicate
        results are merged, and the merged list is sorted by descending score.

        :param term {String}: a query like you'd type into Google.
        :return: a list of Result objects, each of which encodes a search
            result.
        """
        if self.rewriter is None:
            # if there's no query rewriter in place, just search for the
            # original term
            return self._single_search(term)

        # there's a rewriter! use it
        rewritten_queries = self.rewriter.rewrite(term)
        per_query_results = [self._single_search(q) for q in rewritten_queries]

        # Results are hashable and compare equal on their stored fields only
        # (the score is ignored), so the same document found by several
        # rewritten queries collapses to one entry. Keep the copy with the
        # highest score so the ranking does not depend on which rewritten
        # query happened to be run first.
        best_by_document = {}
        for result in utils.flatten(per_query_results):
            kept = best_by_document.get(result)
            if kept is None or result.score > kept.score:
                best_by_document[result] = result

        # sort by relevance score (descending b/c higher is better)
        # so the best stuff bubbles to the top
        unique_results = list(best_by_document.values())
        unique_results.sort(key=lambda result: result.score, reverse=True)
        return unique_results

    def _single_search(self, term):
        """
        Helper function for search() that just returns search results for a
        single, non-rewritten search term.

        Returns a list of results, each of which is a Result object.
        The makeup of the results objects varies
        from search engine to search engine.
        """
        with self.index.searcher() as searcher:
            query_obj = self.parser.parse(term)
            # Each hit has `fields()`, a dict of the stored fields, and
            # `score`, which tells you how relevant the hit is (higher =
            # better). The hits stop being usable once the searcher closes,
            # so copy what we need into Result objects inside the with-block.
            hits = searcher.search(query_obj, limit=None)
            return [Result(hit.fields(), hit.score) for hit in hits]
class UdacitySearchEngine(SearchEngine):
    """
    A search engine over the Udacity course catalog, read from a JSON file
    that has a top-level "courses" list.
    """
    # DATASET_PATH = secure.DATASET_PATH_BASE+'udacity-api.json'
    # INDEX_PATH = secure.INDEX_PATH_BASE+'udacity'

    # Which dataset fields we should search over.
    SEARCH_FIELDS = ["title", "subtitle", "expected_learning",
                     "syllabus", "summary", "short_summary"]

    def __init__(self, dataset_path, index_path, create=False):
        """
        Creates a new Udacity search engine.

        :param dataset_path {string}: the path to the Udacity API JSON file.
        :param index_path {string}: the path to a folder where you'd like to
            store the search engine index. The given folder doesn't have to exist,
            but its *parent* folder does.
        :param create {bool}: If True, recreates an index from scratch.
            If False, loads the existing index
        """
        # Must be set before the base constructor runs: with create=True it
        # calls create_index(), which reads self.dataset_path.
        self.dataset_path = dataset_path
        super(UdacitySearchEngine, self).__init__(
            create, self.SEARCH_FIELDS, index_path)

    def create_index(self):
        """
        Creates a new index to search the Udacity dataset. You only need to
        call this once; once the index is created, you can just load it again
        instead of creating it afresh all the time.

        Errors raised while writing documents are printed rather than
        propagated, and the pending write is cancelled; the (possibly empty)
        index is returned either way.

        :return: the index object.
        """
        # load data
        with open(self.dataset_path, 'r') as dataset_file:
            udacity_data = json.load(dataset_file)

        # set up whoosh schema
        # TODO: use StemmingAnalyzer here so we get the built-in benefits
        # of stemming in our search engine
        # http://whoosh.readthedocs.io/en/latest/stemming.html
        schema = Schema(
            slug=ID(stored=True),
            title=TEXT(stored=True),
            subtitle=TEXT,
            expected_learning=TEXT,
            syllabus=TEXT,
            summary=TEXT,
            short_summary=TEXT
        )

        # make an index to store this stuff in
        index = self.get_empty_index(self.index_path, schema)

        # start adding documents (i.e. the courses) to the index
        writer = None
        try:
            writer = index.writer()
            for course in udacity_data['courses']:
                writer.add_document(
                    slug=course['slug'],
                    title=course['title'],
                    subtitle=course['subtitle'],
                    expected_learning=course['expected_learning'],
                    syllabus=course['syllabus'],
                    summary=course['summary'],
                    short_summary=course['short_summary'])
            writer.commit()
        except Exception as e:
            print(e)
            # Abandon the half-finished write, as the other engines in this
            # module do, so the writer does not stay open.
            if writer is not None:
                writer.cancel()

        # all done for now
        return index

    def count_words(self):
        """
        Counts words in the underlying Udacity dataset, looking only at the
        text of the fields listed in SEARCH_FIELDS.

        :return: whatever `utils.unique_words_in_string` reports for the
            combined text (presumably the number of unique words -- confirm
            against `utils`).
        """
        # load data
        with open(self.dataset_path, 'r') as dataset_file:
            udacity_data = json.load(dataset_file)

        # extract just the text fields, no other markup or fields
        paragraphs = [
            [course[field] for field in self.SEARCH_FIELDS]
            for course in udacity_data['courses']
        ]

        # these are nested... flatten into one huge string array
        raw_lines = utils.flatten(paragraphs)

        # then join into one huge string
        mega_string = " ".join(raw_lines)
        return utils.unique_words_in_string(mega_string)
class HarvardXSearchEngine(SearchEngine):
    """
    A search engine over the HarvardX/DART course materials, read from a CSV
    file with `course_id`, `display_name`, `contents` and `category` columns.
    """
    # INDEX_PATH = secure.INDEX_PATH_BASE+'harvardx'

    # Which dataset fields we should search over.
    SEARCH_FIELDS = ["display_name", "contents"]

    def __init__(self, dataset_path, index_path, create=False):
        """
        Creates a new HarvardX search engine. Searches over the HarvardX/DART
        database of all courses and course materials used in HarvardX. This includes
        videos, quizzes, etc.

        TODO: consider renaming to DART, probz

        :param dataset_path {string}: the path to the HarvardX course catalog CSV file.
        :param index_path {string}: the path to a folder where you'd like to
            store the search engine index. The given folder doesn't have to exist,
            but its *parent* folder does.
        :param create {bool}: If True, recreates an index from scratch.
            If False, loads the existing index
        """
        # Must be set before the base constructor runs: with create=True it
        # calls create_index(), which reads self.dataset_path.
        self.dataset_path = dataset_path
        super(HarvardXSearchEngine, self).__init__(
            create, self.SEARCH_FIELDS, index_path)

    def create_index(self):
        """
        Creates a new index to search the dataset. You only need to
        call this once; once the index is created, you can just load it again
        instead of creating it afresh all the time.

        Only rows whose `category` is one of the supported categories are
        indexed. Errors raised while writing are printed rather than
        propagated, and the pending write is cancelled.

        Returns the index object.
        """
        # only consider resources with this category (type of content)
        # unsure about courses (b/c they have no content) and html (b/c they
        # often include messy CSS/JS in there)
        # TODO: add "html" support. requires stripping comments
        # http://stackoverflow.com/questions/753052/strip-html-from-strings-in-python
        supported_categories = ('problem', 'video', 'course')

        # set up whoosh schema
        # TODO: use StemmingAnalyzer here so we get the built-in benefits
        # of stemming in our search engine
        # http://whoosh.readthedocs.io/en/latest/stemming.html
        schema = Schema(
            course_id=ID(stored=True),
            display_name=TEXT(stored=True),
            contents=TEXT
        )

        # make an index to store this stuff in
        index = self.get_empty_index(self.index_path, schema)

        # start adding documents (i.e. the courses) to the index
        # first, some of the fields are HUGE so we need to let the csv
        # reader handle them
        csv.field_size_limit(sys.maxsize)
        with open(self.dataset_path, 'r') as csvfile:
            reader = csv.DictReader(csvfile)
            writer = index.writer()
            try:
                for row in reader:
                    # skip content whose type we don't support
                    if row['category'] not in supported_categories:
                        continue
                    # the row values are decoded from UTF-8 before being
                    # handed to Whoosh
                    writer.add_document(
                        course_id=row['course_id'].decode('utf8'),
                        display_name=row['display_name'].decode('utf8'),
                        contents=row['contents'].decode('utf8'))
                writer.commit()
            except Exception as e:
                print(e)
                writer.cancel()

        # all done for now
        return index
class EdXSearchEngine(SearchEngine):
    """
    A search engine over edX course listings, read from a CSV file with
    `course_id` and `name` columns.
    """
    # INDEX_PATH = secure.INDEX_PATH_BASE+'edx'

    # Which dataset fields we should search over.
    SEARCH_FIELDS = ["name"]

    def __init__(self, dataset_path, index_path, create=False):
        """
        Creates a new search engine that searches over edX courses.

        :param dataset_path {string}: the path to the edX course listings file.
        :param index_path {string}: the path to a folder where you'd like to
            store the search engine index. The given folder doesn't have to exist,
            but its *parent* folder does.
        :param create {bool}: If True, recreates an index from scratch.
            If False, loads the existing index
        """
        # Must be set before the base constructor runs: with create=True it
        # calls create_index(), which reads self.dataset_path.
        self.dataset_path = dataset_path
        super(EdXSearchEngine, self).__init__(
            create, self.SEARCH_FIELDS, index_path)

    def create_index(self):
        """
        Creates a new index to search the dataset. You only need to
        call this once; once the index is created, you can just load it again
        instead of creating it afresh all the time.

        Errors raised while writing are printed rather than propagated, and
        the pending write is cancelled.

        Returns the index object.
        """
        # set up whoosh schema
        # TODO: use StemmingAnalyzer here so we get the built-in benefits
        # of stemming in our search engine
        # http://whoosh.readthedocs.io/en/latest/stemming.html
        schema = Schema(
            course_id=ID(stored=True),
            name=TEXT(stored=True)
        )

        # make an index to store this stuff in
        index = self.get_empty_index(self.index_path, schema)

        # start adding documents (i.e. the courses) to the index
        with open(self.dataset_path, 'r') as csvfile:
            reader = csv.DictReader(csvfile)
            writer = index.writer()
            try:
                for row in reader:
                    # the row values are decoded from UTF-8 before being
                    # handed to Whoosh
                    writer.add_document(
                        course_id=row['course_id'].decode('utf8'),
                        name=row['name'].decode('utf8'))
                writer.commit()
            except Exception as e:
                print(e)
                writer.cancel()

        # all done for now
        return index

    def count_words(self):
        """
        Counts words in the underlying edX dataset, looking only at the
        `name` column.

        :return: whatever `utils.unique_words_in_string` reports for the
            combined text (presumably the number of unique words -- confirm
            against `utils`).
        """
        with open(self.dataset_path, 'r') as csvfile:
            reader = csv.DictReader(csvfile)
            # the only text field that's useful is the name field
            names = [row['name'].decode('utf8') for row in reader]

        # turn into one huge string then count words in that
        mega_string = " ".join(names)
        return utils.unique_words_in_string(mega_string)
class PrebuiltSearchEngine(SearchEngine):
    """
    A search engine for the case where an index already exists on disk: it
    always loads that index and never builds one.
    """

    def __init__(self, search_fields, index_path):
        """
        Loads the existing index stored at `index_path`.

        :param search_fields {str[]}: names of the index fields to search
            against.
        :param index_path {str}: path to the folder holding the index.
        """
        # create is always False here: there is nothing to build
        super(PrebuiltSearchEngine, self).__init__(
            False, search_fields, index_path)

    def create_index(self):
        """
        Not supported; the index is prebuilt.

        :raises NotImplementedError: always.
        """
        message = (
            "This search engine doesn't need to create an index! Use create = False.")
        raise NotImplementedError(message)
class Result(object):
    """
    Encodes a search result. Basically a wrapper around a result dict and
    its relevance score (higher is better).

    Two results are equal, and hash alike, when their dicts hold the same
    items; the score takes no part in the comparison. Hashing requires every
    value in the dict to be hashable.
    """

    def __init__(self, dict_data, score):
        """
        :param dict_data {dict}: the fields of the matched item.
        :param score: the relevance score of the match (higher is better).
        """
        self.dict_data = dict_data
        self.score = score

    def get_dict(self):
        """
        Get the underlying dict data
        """
        return self.dict_data

    def __repr__(self):
        """
        Stringified version of the result, which encodes the dict and the score
        """
        return str((self.dict_data, self.score))

    # enable lookup as if this was a real dict
    def __getitem__(self, key):
        return self.dict_data[key]

    # to enable hashing
    def __hash__(self):
        return hash(frozenset(self.dict_data.items()))

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing on a missing `dict_data` attribute.
        if not isinstance(other, Result):
            return NotImplemented
        return frozenset(self.dict_data.items()) == frozenset(other.dict_data.items())

    # Python 2 does not derive != from ==, so spell it out to keep the two
    # operators consistent.
    def __ne__(self, other):
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome