"""COUNTER journal and book reports and associated functions."""
from __future__ import absolute_import
import collections
import datetime
import logging
import re
import warnings
import pendulum
import six
from pycounter import csvhelper
from pycounter.constants import CODES, HEADER_FIELDS, METRICS
from pycounter.constants import REPORT_DESCRIPTIONS, TOTAL_TEXT
from pycounter.exceptions import (
PycounterException,
PycounterWarning,
UnknownReportTypeError,
)
from pycounter.helpers import (
convert_covered,
convert_date_column,
convert_date_run,
format_stat,
guess_type_from_content,
is_first_last,
last_day,
next_month,
)
class CounterReport(object):
    """
    A COUNTER usage statistics report.

    Iterate over the report object to get its rows (each of which is a
    :class:`CounterBook <CounterBook>` or :class:`CounterJournal
    <CounterJournal>` instance).

    :param metric: metric being tracked by this report. For database
        reports (which have multiple metrics per report), this should be
        set to `None`.

    :param report_type: type of report (e.g., "JR1", "BR2")

    :param report_version: COUNTER version

    :param customer: name of customer on report

    :param institutional_identifier: unique ID assigned by vendor for
        customer

    :param period: tuple of datetime.date objects corresponding to the
        beginning and end of the covered range

    :param date_run: date the COUNTER report was generated. Defaults to
        today's date when not given.

    :param section_type: predominant section type used for this report.
        (applies to report BR2; should probably be None for any other report
        type)
    """

    # pylint: disable=too-many-instance-attributes
    def __init__(
        self,
        report_type=None,
        report_version=4,
        metric=None,
        customer=None,
        institutional_identifier=None,
        period=(None, None),
        date_run=None,
        section_type=None,
    ):
        self.pubs = []
        self.report_type = report_type
        self.report_version = report_version
        self.metric = metric
        self.customer = customer
        self.institutional_identifier = institutional_identifier
        if not is_first_last(period):
            # Adjacent literals are concatenated verbatim, so the separating
            # space has to live inside one of them.
            warnings.warn(
                "report period should be from "
                "first day of a month to last day of a month.",
                PycounterWarning,
            )
        self.period = period
        if date_run is None:
            self.date_run = datetime.date.today()
        else:
            self.date_run = date_run
        self._year = None
        self.section_type = section_type

    def __repr__(self):
        return "<CounterReport %s version %s for date range %s to %s>" % (
            self.report_type,
            self.report_version,
            self.period[0],
            self.period[1],
        )

    @property
    def year(self):
        """Year report was issued (deprecated)."""
        warnings.warn(
            DeprecationWarning(
                "CounterReport.year is deprecated. "
                "Reports may span multiple years. "
                "COUNTER 5 reports will not have a year set."
            )
        )
        return self._year

    @year.setter
    def year(self, value):
        """Set year report was issued."""
        self._year = value

    def __iter__(self):
        return iter(self.pubs)

    def write_to_file(self, path, format_):
        """
        Output report to a file.

        :param path: location to write file
        :param format_: file format. Currently supports 'tsv'
        :raises PycounterException: if ``format_`` is not a supported format
        """
        if format_ == "tsv":
            self.write_tsv(path)
        else:
            raise PycounterException("unknown file type %s" % format_)

    def write_tsv(self, path):
        """
        Output report to a COUNTER 4 TSV file.

        :param path: location to write file
        """
        lines = self.as_generic()
        with csvhelper.UnicodeWriter(path, delimiter="\t") as writer:
            writer.writerows(lines)

    def as_generic(self):
        """
        Output report as list of lists.

        Nested list will contain cells that would appear
        in COUNTER report (suitable for writing as CSV, TSV, etc.)

        Note that this sorts ``self.pubs`` in place by metric and, for
        database reports without a totals line, may append zero-usage rows
        for missing metrics.
        """
        output_lines = []

        # Map the two-letter code at the start of the report type back to
        # its long name as used on the report's first line.
        rep_type = ""
        for name, code in CODES.items():
            if code == self.report_type[0:2]:
                rep_type = name
        report_name = "%s Report %s (R%s)" % (
            rep_type,
            self.report_type[-1],
            self.report_version,
        )
        output_lines.append([report_name, REPORT_DESCRIPTIONS[self.report_type]])

        if self.report_type == "BR2":
            # BR2 carries the section type alongside the customer lines.
            output_lines.append([self.customer, u"Section Type:"])
            output_lines.append([self.institutional_identifier, self.section_type])
        else:
            output_lines.append([self.customer])
            output_lines.append([self.institutional_identifier])

        output_lines.append([u"Period covered by Report:"])
        period = "%s to %s" % (
            self.period[0].strftime("%Y-%m-%d"),
            self.period[1].strftime("%Y-%m-%d"),
        )
        output_lines.append([period])
        output_lines.append([u"Date run:"])
        output_lines.append([self.date_run.strftime("%Y-%m-%d")])
        output_lines.append(self._table_header())

        if self.report_type in ("JR1", "BR1", "BR2", "DB2", "JR2", "BR3"):
            output_lines.extend(self._totals_lines())
        elif self.report_type.startswith("DB"):
            self._ensure_required_metrics()

        # Best-effort ordering of rows by the position of their metric in
        # METRICS; the title sort below is stable, so this order survives
        # within each title.
        try:
            self.pubs.sort(key=lambda x: METRICS[self.report_type].index(x.metric))
        except (ValueError, TypeError):
            # ValueError: the metric is not listed for this report type.
            # TypeError: presumably a None metric looked up in a string
            # METRICS entry; either way the rows are left in their order.
            pass

        for pub in sorted(self.pubs, key=lambda x: x.title):
            output_lines.append(pub.as_generic())
        return output_lines

    def _totals_lines(self):
        """Generate Totals for COUNTER report, as list of lists of cells."""
        total_lines = []
        metrics = set(resource.metric for resource in self.pubs)
        for metric in sorted(metrics):
            total_lines.append(self._totals_line(metric))
        return total_lines

    def _totals_line(self, metric):
        """Generate Totals for a given metric."""
        total_cells = [TOTAL_TEXT[self.report_type]]

        # Publisher and platform are only reported on the totals line when
        # every row agrees on them.
        publishers = set(resource.publisher for resource in self.pubs)
        if len(publishers) == 1:
            total_cells.append(publishers.pop())
        else:
            total_cells.append(u"")
        platforms = set(resource.platform for resource in self.pubs)
        if len(platforms) == 1:
            total_cells.append(platforms.pop())
        else:
            total_cells.append(u"")

        if self.report_type in ("JR1", "BR1", "BR2", "JR2", "BR3"):
            # Four blank cells where the data rows carry their identifiers.
            total_cells.extend([u""] * 4)
        if self.report_type in ("DB2", "JR2", "BR3"):
            total_cells.append(metric)

        total_usage = 0
        pdf_usage = 0
        html_usage = 0
        start_month_first_day = datetime.date(
            self.period[0].year, self.period[0].month, 1
        )
        months = list(
            pendulum.Period(start_month_first_day, self.period[1]).range("months")
        )
        month_data = [0] * len(months)
        for pub in self.pubs:
            if pub.metric != metric:
                continue
            if self.report_type == "JR1":
                pdf_usage += pub.pdf_total  # pytype: disable=attribute-error
                html_usage += pub.html_total  # pytype: disable=attribute-error
            for data in pub:
                total_usage += data[2]
                month_data[months.index(data[0])] += data[2]

        total_cells.append(six.text_type(total_usage))
        if self.report_type == "JR1":
            total_cells.append(six.text_type(html_usage))
            total_cells.append(six.text_type(pdf_usage))
        total_cells.extend(six.text_type(d) for d in month_data)
        return total_cells

    def _table_header(self):
        """Generate header for COUNTER table for report, as list of cells."""
        header_cells = list(HEADER_FIELDS[self.report_type])
        start_month_first_day = datetime.date(
            self.period[0].year, self.period[0].month, 1
        )
        for d_obj in pendulum.Period(start_month_first_day, self.period[1]).range(
            "months"
        ):
            header_cells.append(d_obj.strftime("%b-%Y"))
        return header_cells

    def _ensure_required_metrics(self):
        """
        Build up a dict of sets of known metrics for each database.

        If any metric is missing add a 0 use
        :class:`CounterDatabase<CounterDatabase>`.

        Assumes platform and publisher are consistent across records.

        :raises UnknownReportTypeError: if METRICS has no entry for this
            report type
        """
        try:
            required_metrics = METRICS[self.report_type]
        except LookupError:
            raise UnknownReportTypeError(self.report_type)

        dbs = collections.defaultdict(set)
        for database in self.pubs:
            dbs[database.title].add(database.metric)

        for database, metrics in six.iteritems(dbs):
            for metric in (m for m in required_metrics if m not in metrics):
                self.pubs.append(
                    CounterDatabase(
                        title=database,
                        platform=self.pubs[0].platform,
                        publisher=self.pubs[0].publisher,
                        period=self.period,
                        metric=metric,
                        month_data=[(self.period[0], 0)],
                    )
                )
# Row type yielded when iterating a CounterEresource: (month, metric, usage).
MonthsUsage = collections.namedtuple("MonthsUsage", ["month", "metric", "usage"])
class CounterEresource(six.Iterator):
    """
    Base class for COUNTER statistics lines.

    Iterating yields one ``MonthsUsage`` tuple,
    (first_day_of_month, metric, usage), per stored month.

    :param period: two-tuple of datetime.date objects corresponding
        to the beginning and end dates of the covered range

    :param metric: metric tracked by this report. Should be a value
        from pycounter.report.METRICS dict.

    :param month_data: a list containing usage data for this
        resource, as (datetime.date, usage) tuples

    :param title: title of the resource

    :param publisher: name of the resource's publisher

    :param platform: name of the platform providing the resource
    """

    def __init__(
        self,
        period=None,
        metric=None,
        month_data=None,
        title="",
        platform="",
        publisher="",
    ):
        self.period = period
        self.metric = metric
        # Copy the caller's data so later zero-filling never touches it.
        self._full_data = [] if month_data is None else list(month_data)
        self.title = title
        self.platform = platform
        self.publisher = publisher

    def __iter__(self):
        for entry in self._full_data:
            yield MonthsUsage(entry[0], self.metric, entry[1])

    def _fill_months(self):
        """Add a zero-usage entry for each month of the period lacking one."""
        first_of_month = datetime.date(self.period[0].year, self.period[0].month, 1)
        period_end = self.period[1]
        try:
            for month in pendulum.Period(first_of_month, period_end).range("months"):
                known_months = [entry[0] for entry in self._full_data]
                if month not in known_months:
                    self._full_data.append((month, 0))
        except IndexError:
            # Best effort: leave the data as it is, unsorted.
            return
        self._full_data.sort()
class CounterJournal(CounterEresource):
    """
    Statistics for a single electronic journal.

    :param period: two-tuple of datetime.date objects corresponding
        to the beginning and end dates of the covered range

    :param metric: the metric tracked by this statistics line.
        (Should probably always be "FT Article Requests" for
        CounterJournal objects, as long as only JR1 is supported.)

    :param issn: eJournal's print ISSN

    :param eissn: eJournal's eISSN

    :param month_data: a list containing usage data for this
        journal, as (datetime.date, usage) tuples

    :param title: title of the resource

    :param publisher: name of the resource's publisher

    :param platform: name of the platform providing the resource

    :param html_total: total HTML usage for this title for reporting period

    :param pdf_total: total PDF usage for this title for reporting period

    :param doi: DOI cell for this journal

    :param proprietary_id: proprietary identifier cell for this journal
    """

    def __init__(
        self,
        period=None,
        metric=METRICS[u"JR1"],
        issn=None,
        eissn=None,
        month_data=None,
        title="",
        platform="",
        publisher="",
        html_total=0,
        pdf_total=0,
        doi="",
        proprietary_id="",
    ):
        super(CounterJournal, self).__init__(
            period, metric, month_data, title, platform, publisher
        )
        self.html_total = html_total
        self.pdf_total = pdf_total
        self.doi = doi
        self.proprietary_id = proprietary_id
        self.isbn = None
        # Missing identifiers are stored as empty strings, not None.
        self.issn = issn if issn is not None else ""
        self.eissn = eissn if eissn is not None else ""

    def __repr__(self):
        return """<CounterJournal %s, publisher %s,
        platform %s>""" % (
            self.title,
            self.publisher,
            self.platform,
        )

    def as_generic(self):
        """Get data for this line as list of COUNTER report cells.

        Rows whose metric starts with "Access" carry the metric name before
        the total; all other rows (including rows with no metric set) carry
        the HTML and PDF totals after it.
        """
        self._fill_months()  # Ensure fill all months with zero at least
        usages = [entry[2] for entry in self]

        # Guard against an unset metric, as CounterBook.as_generic does,
        # instead of failing on None.startswith.
        is_access_metric = bool(self.metric) and self.metric.startswith("Access")

        data_line = [
            self.title,
            self.publisher,
            self.platform,
            self.doi,
            self.proprietary_id,
            self.issn,
            self.eissn,
        ]
        if is_access_metric:
            data_line.append(self.metric)
        data_line.append(six.text_type(sum(usages)))
        if not is_access_metric:
            data_line.append(six.text_type(self.html_total))
            data_line.append(six.text_type(self.pdf_total))
        data_line.extend(six.text_type(usage) for usage in usages)
        return data_line
class CounterBook(CounterEresource):
    """
    Statistics for a single electronic book.

    :ivar isbn: eBook's ISBN

    :ivar issn: eBook's ISSN (if any)

    :param month_data: a list containing usage data for this
        book, as (datetime.date, usage) tuples

    :param title: title of the resource

    :param publisher: name of the resource's publisher

    :param platform: name of the platform providing the resource
    """

    def __init__(
        self,
        period=None,
        metric=None,
        month_data=None,
        title="",
        platform="",
        publisher="",
        isbn=None,
        issn=None,
        doi="",
        proprietary_id="",
        print_isbn=None,
        online_isbn=None,
    ):
        super(CounterBook, self).__init__(
            period, metric, month_data, title, platform, publisher
        )
        self._isbn = isbn
        self.print_isbn = print_isbn
        self.online_isbn = online_isbn
        # A missing ISSN is stored as an empty string, not None.
        self.issn = u"" if issn is None else issn
        self.eissn = None
        self.doi = doi
        self.proprietary_id = proprietary_id

    def __repr__(self):
        return """<CounterBook %s (ISBN: %s), publisher %s,
        platform %s>""" % (
            self.title,
            self.isbn,
            self.publisher,
            self.platform,
        )

    @property
    def isbn(self):
        """Return a suitable ISBN for the ebook.

        The tabular COUNTER reports only report an "ISBN", while the SUSHI
        (XML) reports include both a Print_ISBN and Online_ISBN.

        The generic ISBN given to the constructor wins if it is set;
        otherwise the online ISBN is used, then the print ISBN, and finally
        an empty string.
        """
        for candidate in (self._isbn, self.online_isbn, self.print_isbn):
            if candidate:
                return candidate
        return u""

    def as_generic(self):
        """Get data for this line as list of COUNTER report cells."""
        self._fill_months()  # every month of the period gets at least a zero
        cells = [
            self.title,
            self.publisher,
            self.platform,
            self.doi,
            self.proprietary_id,
            self.isbn,
            self.issn,
        ]
        usages = [entry[2] for entry in self]
        # Only "Access..." metrics get their own cell before the total.
        if self.metric and self.metric.startswith("Access"):
            cells.append(self.metric)
        cells.append(six.text_type(sum(usages)))
        cells.extend(six.text_type(usage) for usage in usages)
        return cells
class CounterDatabase(CounterEresource):
    """A COUNTER database report line."""

    def __init__(
        self,
        period=None,
        metric=None,
        month_data=None,
        title="",
        platform="",
        publisher="",
    ):
        super(CounterDatabase, self).__init__(
            period, metric, month_data, title, platform, publisher
        )
        self.isbn = None

    def as_generic(self):
        """Return data for this line as list of COUNTER report cells."""
        self._fill_months()
        usages = [entry[2] for entry in self]
        cells = [self.title, self.publisher, self.platform, self.metric]
        # Period total first, then one cell per month.
        cells.append(six.text_type(sum(usages)))
        cells.extend(six.text_type(usage) for usage in usages)
        return cells
class CounterPlatform(CounterEresource):
    """A COUNTER platform report line."""

    def __init__(
        self, period=None, metric=None, month_data=None, platform="", publisher=""
    ):
        # Platform report lines have no title, so the base class gets "".
        super(CounterPlatform, self).__init__(
            period, metric, month_data, "", platform, publisher
        )
        self.isbn = None

    def as_generic(self):
        """Return data for this line as list of COUNTER report cells."""
        self._fill_months()
        usages = [entry[2] for entry in self]
        cells = [self.platform, self.publisher, self.metric]
        # Period total first, then one cell per month.
        cells.append(six.text_type(sum(usages)))
        cells.extend(six.text_type(usage) for usage in usages)
        return cells
def parse(filename, filetype=None, encoding="utf-8", fallback_encoding="latin-1"):
    """Parse a COUNTER file, first attempting to determine type.

    Returns a :class:`CounterReport <CounterReport>` object.

    :param filename: path to COUNTER report to load and parse.
    :param filetype: type of file provided, one of "csv", "tsv", "xlsx".
        If set to None (the default), an attempt will be made to
        detect the correct type, first from the file extension, then from
        the file's contents.
    :param encoding: encoding to use to decode the file. Defaults to 'utf-8',
        ignored for XLSX files (which specify their encoding in their XML)
    :param fallback_encoding: alternative encoding to use to try to decode
        the file if the primary encoding fails. This defaults to 'latin-1',
        which will accept any bytes (possibly producing junk results...)
        Ignored for XLSX files.
    :raises PycounterException: if the file type is not one of the
        supported types
    """
    if filetype is None:
        for extension in ("tsv", "xlsx", "csv"):
            if filename.endswith("." + extension):
                filetype = extension
                break
        else:
            # No recognised extension: sniff the type from the raw bytes.
            with open(filename, "rb") as file_obj:
                filetype = guess_type_from_content(file_obj)

    if filetype == "xlsx":
        return parse_xlsx(filename)

    if filetype == "tsv":
        delimiter = "\t"
    elif filetype == "csv":
        delimiter = ","
    else:
        raise PycounterException("Unknown file type %s" % filetype)
    return parse_separated(filename, delimiter, encoding, fallback_encoding)
def parse_xlsx(filename):
    """Parse a COUNTER file in Excel format.

    Invoked automatically by ``parse``. Only the first worksheet of the
    workbook is read.

    :param filename: path to XLSX-format COUNTER report file.
    :return: CounterReport object
    """
    from openpyxl import load_workbook

    def _rows_as_lists(rows):
        """Yield each row as a list of cell values, with None shown as ""."""
        for row in rows:
            values = []
            for cell in row:
                value = cell.value
                values.append("" if value is None else value)
            yield values

    with open(filename, "rb") as xlsx_file:
        workbook = load_workbook(xlsx_file)
    first_sheet = workbook[workbook.sheetnames[0]]
    return parse_generic(_rows_as_lists(first_sheet.iter_rows()))
def parse_separated(filename, delimiter, encoding="utf-8", fallback_encoding="latin-1"):
    r"""Open COUNTER CSV/TSV report and parse into a CounterReport.

    Invoked automatically by :py:func:`parse`.

    :param filename: path to delimited COUNTER report file.
    :param delimiter: character (such as ',' or '\\t') used as the
        delimiter for this file
    :param encoding: file's encoding. Default: utf-8
    :param fallback_encoding: alternative encoding to try to decode if
        default fails. Throws a warning if used.
    :return: CounterReport object
    """
    reader = csvhelper.UnicodeReader(
        filename,
        delimiter=delimiter,
        fallback_encoding=fallback_encoding,
        encoding=encoding,
    )
    # Parse while the reader is still open; rows are consumed lazily.
    with reader as rows:
        return parse_generic(rows)
def parse_generic(report_reader):
    """Parse COUNTER report rows into a CounterReport.

    :param report_reader: an iterator that yields the rows of a COUNTER
        report, each as a list of cells
    :return: CounterReport object
    """
    # pylint: disable=too-many-locals,too-many-branches,too-many-statements
    report = CounterReport()

    first_line = six.next(report_reader)
    if first_line[0] == "Report_Name":  # COUNTER 5 report
        second_line = six.next(report_reader)
        third_line = six.next(report_reader)
        report.report_type, report.report_version = _get_c5_type_and_version(
            second_line, third_line
        )
    else:
        report.report_type, report.report_version = _get_type_and_version(first_line[0])

    is_release_5 = report.report_version == 5
    # Release 5 header rows are "label, value" pairs; older releases put
    # the value in the first cell.
    value_col = 1 if is_release_5 else 0

    if not is_release_5:
        # noinspection PyTypeChecker
        report.metric = METRICS.get(report.report_type)

    report.customer = six.next(report_reader)[value_col]

    if report.report_version >= 4:
        inst_id_line = six.next(report_reader)
        if inst_id_line:
            report.institutional_identifier = inst_id_line[value_col]
            if report.report_type == "BR2":
                report.section_type = inst_id_line[1]

        six.next(report_reader)
        if is_release_5:
            for _ in range(3):
                six.next(report_reader)
        covered_line = six.next(report_reader)
        report.period = convert_covered(covered_line[value_col])

    if report.report_version < 5:
        six.next(report_reader)

    date_run_line = six.next(report_reader)
    report.date_run = convert_date_run(date_run_line[value_col])
    if is_release_5:
        for _ in range(2):
            # Skip Created_By and blank line
            six.next(report_reader)

    header = six.next(report_reader)

    if report.report_version < 5:
        try:
            report.year = _year_from_header(header, report)
        except AttributeError:
            warnings.warn("Could not determine year from malformed header")

    if report.report_version >= 4:
        # The first eight header cells always count; after that only
        # non-empty cells do.
        last_col = len(header[0:8]) + sum(1 for col in header[8:] if col)
    else:
        # Data columns stop at the first "YTD" column, if there is one.
        last_col = len(header)
        for position, val in enumerate(header):
            if "YTD" in val:
                last_col = position
                break

        # Older releases have no period line, so the period is rebuilt from
        # the report year and the last month column.
        start_date = datetime.date(report.year, 1, 1)
        end_date = last_day(convert_date_column(header[last_col - 1]))
        report.period = (start_date, end_date)

    if report.report_type not in ("DB1", "PR1") and not is_release_5:
        # these reports do not have line with totals
        six.next(report_reader)

    if report.report_type in ("DB2", "BR3", "JR3"):
        # this report has two lines of totals
        six.next(report_reader)

    # Empty rows are skipped; every other row becomes a resource.
    report.pubs.extend(
        _parse_line(line, report, last_col) for line in report_reader if line
    )

    return report
def _parse_line(line, report, last_col):
    """Parse a single line from a report.

    :param line: sequence of cells in a report line
    :param report: a CounterReport the line came from
    :param last_col: last column number containing data
    :return: an appropriate CounterResource subclass instance
    :raises PycounterException: if the report type matches none of the
        journal, book, database or platform branches
    """
    # pylint: disable=too-many-locals,too-many-branches,too-many-statements
    issn = None
    eissn = None
    isbn = None
    html_total = 0
    pdf_total = 0
    doi = ""
    prop_id = ""
    old_line = line
    metric = report.metric

    # The title-level branches below rebuild ``line`` as
    # [title, publisher, platform, id, id, month cells...] so that month
    # data always starts at index 5.
    if report.report_version >= 4:
        if (
            report.report_type.startswith("JR1")
            or report.report_type == "TR_J1"
            or report.report_type == "TR_J2"
        ):
            line = line[0:3] + line[5:7] + line[10:last_col]
            doi = old_line[3]
            prop_id = old_line[4]
            html_total = format_stat(old_line[8])
            pdf_total = format_stat(old_line[9])
            issn = line[3].strip()
            eissn = line[4].strip()
            if report.report_type.startswith("TR"):
                metric = old_line[9]
        elif report.report_type in ("BR1", "BR2"):
            # Keep the DOI and proprietary identifier (columns 3 and 4), as
            # the journal and BR3/JR2 branches do; CounterBook.as_generic
            # writes both cells back out.
            doi = line[3]
            prop_id = line[4]
            line = line[0:3] + line[5:7] + line[8:last_col]
            isbn = line[3].strip()
            issn = line[4].strip()
        elif report.report_type in ("BR3", "JR2"):
            metric = line[7]
            doi = line[3]
            prop_id = line[4]
            line = line[0:3] + line[5:7] + line[9:last_col]
            eissn = line[4].strip()
            if report.report_type == "BR3":
                isbn = line[3].strip()
            else:
                issn = line[3].strip()
        # For DB1 and DB2, nothing additional to do here
    else:
        if report.report_type.startswith("JR1"):
            # Totals are read from the last two cells of the untrimmed line.
            html_total = format_stat(line[-2])
            pdf_total = format_stat(line[-1])
            issn = line[3].strip()
            eissn = line[4].strip()
        line = line[0:last_col]

    logging.debug(line)

    common_args = {
        "title": line[0],
        "publisher": line[1],
        "platform": line[2],
        "period": report.period,
    }

    # Month cells are assigned to consecutive months, starting at the
    # first month of the report period.
    month_data = []
    curr_month = datetime.date(report.period[0].year, report.period[0].month, 1)
    months_start_idx = 5 if report.report_type != "PR1" else 4
    for data in line[months_start_idx:]:
        month_data.append((curr_month, format_stat(data)))
        curr_month = next_month(curr_month)

    if (
        report.report_type.startswith("JR")
        or report.report_type == "TR_J1"
        or report.report_type == "TR_J2"
    ):
        return CounterJournal(
            metric=metric,
            month_data=month_data,
            doi=doi,
            issn=issn,
            eissn=eissn,
            proprietary_id=prop_id,
            html_total=html_total,
            pdf_total=pdf_total,
            **common_args
        )
    elif report.report_type.startswith("BR"):
        return CounterBook(
            metric=metric,
            month_data=month_data,
            doi=doi,
            issn=issn,
            isbn=isbn,
            proprietary_id=prop_id,
            **common_args
        )
    elif report.report_type.startswith("DB"):
        return CounterDatabase(metric=line[3], month_data=month_data, **common_args)
    elif report.report_type == "PR1":
        # there is no title in the PR1 report
        return CounterPlatform(
            metric=line[2],
            month_data=month_data,
            platform=line[0],
            publisher=line[1],
            period=report.period,
        )
    raise PycounterException("Should be unreachable")  # pragma: no cover
def _get_type_and_version(specifier):
    """Given a COUNTER report specifier, find the type and version.

    :param specifier: COUNTER report specifier
    :return: type, version tuple
    :raises UnknownReportTypeError: if the specifier does not match the
        expected pattern, or the resulting type is not a JR, BR, DB or PR1
        report
    """
    pattern = r".*(%s) Report (\d(?: GOA)?) ?\(R(\d)\)" % "|".join(CODES)
    found = re.match(pattern, specifier)
    if not found:
        raise UnknownReportTypeError("No match in line: %s" % specifier)

    long_name, number, release = found.groups()
    report_type = CODES[long_name] + number
    report_version = int(release)

    if not report_type.startswith(("JR", "BR", "DB", "PR1")):
        raise UnknownReportTypeError(report_type)

    if report_version < 4:
        warnings.warn(
            DeprecationWarning(
                "Parsing COUNTER versions before 4 ("
                "current: {}) will not be supported in "
                "the next release of pycounter.".format(report_version)
            )
        )
    return report_type, report_version
def _get_c5_type_and_version(second_line, third_line):
"""Find COUNTER 5 specific type and version."""
return second_line[1], int(third_line[1])
def _year_from_header(header, report):
"""Get the year for the report from the header.
NOTE: for multi-year reports, this will be the date of the first month,
and probably doesn't make sense to talk of a report having a year...
"""
first_date_col = 10 if report.report_version == 4 else 5
if report.report_type in ("BR1", "BR2") and report.report_version == 4:
first_date_col = 8
elif report.report_type in ("DB1", "DB2") and report.report_version == 4:
first_date_col = 5
elif report.report_type == "PR1" and report.report_version == 4:
first_date_col = 4
elif report.report_type == "JR2":
first_date_col = 9
year = int(header[first_date_col].split("-")[1])
if year < 100:
year += 2000
return year