"""
An adapter to WeatherAPI (https://www.weatherapi.com/).
"""
import logging
import urllib.parse
from datetime import date, datetime, timedelta, timezone
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union, cast
import dateutil.parser
import dateutil.tz
import requests_cache
from shillelagh.adapters.base import Adapter
from shillelagh.exceptions import ImpossibleFilterError
from shillelagh.fields import DateTime, Float, IntBoolean, Integer, Order, String
from shillelagh.filters import Filter, Impossible, Operator, Range
from shillelagh.typing import RequestedOrder, Row
_logger = logging.getLogger(__name__)
INITIAL_COST = 0
FETCHING_COST = 1000
def combine_time_filters(bounds: Dict[str, Filter]) -> Range:
    """
    Combine both time filters together.

    The adapter has two time columns that can be used to filter the data, "time" as
    a timestamp and "time_epoch" as a float. We convert the latter to a timestamp and
    combine the two filters into a single ``Range``.

    :param bounds: map of column name to the filter requested for it
    :raises ImpossibleFilterError: if either filter (or their combination) can
        never match any row
    :raises Exception: if a filter other than ``Range``/``Impossible`` is passed
    """
    time_range = bounds.get("time", Range())
    time_epoch_range = bounds.get("time_epoch", Range())

    if isinstance(time_range, Impossible) or isinstance(time_epoch_range, Impossible):
        raise ImpossibleFilterError()

    if not isinstance(time_range, Range) or not isinstance(time_epoch_range, Range):
        raise Exception("Invalid filter")  # pylint: disable=broad-exception-raised

    # Convert the epoch-based endpoints to UTC datetimes so the two ranges are
    # comparable. Build a new ``Range`` rather than rewriting ``time_epoch_range``
    # in place, so the caller's ``bounds`` dict is not mutated as a side effect.
    converted_epoch_range = Range(
        start=(
            datetime.fromtimestamp(time_epoch_range.start, tz=timezone.utc)
            if time_epoch_range.start is not None
            else None
        ),
        end=(
            datetime.fromtimestamp(time_epoch_range.end, tz=timezone.utc)
            if time_epoch_range.end is not None
            else None
        ),
        include_start=time_epoch_range.include_start,
        include_end=time_epoch_range.include_end,
    )

    # combine time ranges together and check if the result is a valid range
    time_range = time_range + converted_epoch_range
    if isinstance(time_range, Impossible):
        raise ImpossibleFilterError()

    return cast(Range, time_range)
class WeatherAPI(Adapter):
    """
    An adapter for WeatherAPI (https://www.weatherapi.com/).

    The adapter expects an URL like::

        https://api.weatherapi.com/v1/history.json?key=$key&q=$location

    Where ``$key`` is an API key (available for free), and ``$location`` is a
    freeform value that can be a US Zipcode, UK Postcode, Canada Postalcode,
    IP address, Latitude/Longitude (decimal degree) or city name.
    """

    safe = True

    # Since the adapter doesn't return exact data (see the time columns below)
    # implementing limit/offset is not worth the trouble.
    supports_limit = False
    supports_offset = False

    # These two columns can be used to filter the results from the API. We
    # define them as inexact since we will retrieve data for the whole day,
    # even if specific hours are requested. The post-filtering will be done
    # by the backend.
    time = DateTime(filters=[Range], order=Order.ASCENDING, exact=False)
    time_epoch = Float(filters=[Range], order=Order.ASCENDING, exact=False)

    temp_c = Float()
    temp_f = Float()
    is_day = IntBoolean()
    wind_mph = Float()
    wind_kph = Float()
    wind_degree = Integer()
    wind_dir = String()
    pressure_mb = Float()
    pressure_in = Float()
    precip_mm = Float()
    precip_in = Float()
    humidity = Integer()
    cloud = Integer()
    feelslike_c = Float()
    feelslike_f = Float()
    windchill_c = Float()
    windchill_f = Float()
    heatindex_c = Float()
    heatindex_f = Float()
    dewpoint_c = Float()
    dewpoint_f = Float()
    will_it_rain = IntBoolean()
    chance_of_rain = String()
    will_it_snow = IntBoolean()
    chance_of_snow = String()
    vis_km = Float()
    vis_miles = Float()
    gust_mph = Float()
    gust_kph = Float()

    @staticmethod
    def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]:
        """https://api.weatherapi.com/v1/history.json?key=XXX&q=94158"""
        parsed = urllib.parse.urlparse(uri)
        query_string = urllib.parse.parse_qs(parsed.query)
        return (
            parsed.netloc == "api.weatherapi.com"
            and parsed.path == "/v1/history.json"
            and "q" in query_string
            and ("key" in query_string or "api_key" in kwargs)
        )

    @staticmethod
    def parse_uri(uri: str) -> Union[Tuple[str], Tuple[str, str]]:
        """
        Extract the location (and API key, if present) from the URI.

        Returns ``(location,)`` when the key is expected to come from the
        connection arguments, or ``(location, key)`` when it's in the URL.
        """
        parsed = urllib.parse.urlparse(uri)
        query_string = urllib.parse.parse_qs(parsed.query)
        location = query_string["q"][0]

        # key can be passed in the URL or via connection arguments
        if "key" in query_string:
            return (location, query_string["key"][0])
        return (location,)

    def __init__(self, location: str, api_key: str, window: int = 7):
        """
        :param location: freeform location (Zipcode, city name, lat/lon, ...)
        :param api_key: WeatherAPI key
        :param window: number of days of history to fetch when no time filter
            narrows the range
        """
        super().__init__()

        self.location = location
        self.api_key = api_key
        self.window = window

        # use a cache, since the adapter does a lot of similar API requests,
        # and the data should rarely (never?) change
        self._session = requests_cache.CachedSession(
            cache_name="weatherapi_cache",
            backend="sqlite",
            expire_after=180,
        )

    def get_cost(
        self,
        filtered_columns: List[Tuple[str, Operator]],
        order: List[Tuple[str, RequestedOrder]],
    ) -> float:
        """
        Estimate the query cost from the number of days that must be fetched.
        """
        cost = INITIAL_COST

        # if the operator is ``Operator.EQ`` we only need to fetch 1 day of data;
        # otherwise we potentially need to fetch "window" days of data
        for _, operator in filtered_columns:
            weight = 1 if operator == Operator.EQ else self.window
            cost += FETCHING_COST * weight

        return cost

    def get_data(  # pylint: disable=too-many-locals
        self,
        bounds: Dict[str, Filter],
        order: List[Tuple[str, RequestedOrder]],
        **kwargs: Any,
    ) -> Iterator[Row]:
        """
        Yield hourly weather rows for each day in the requested time range.
        """
        # combine filters from the two time columns
        try:
            time_range = combine_time_filters(bounds)
        except ImpossibleFilterError:
            return

        today = date.today()
        first = today - timedelta(days=self.window - 1)
        start = time_range.start.date() if time_range.start else first
        end = time_range.end.date() if time_range.end else today
        _logger.debug("Range is %s to %s", start, end)

        # download data for each day in [start, end]
        while start <= end:
            url = "https://api.weatherapi.com/v1/history.json"
            params = {"key": self.api_key, "q": self.location, "dt": start}
            query_string = urllib.parse.urlencode(params)
            _logger.info("GET %s?%s", url, query_string)

            response = self._session.get(url, params=params)
            if response.ok:
                payload = response.json()
                local_timezone = dateutil.tz.gettz(payload["location"]["tz_id"])

                for record in payload["forecast"]["forecastday"][0]["hour"]:
                    row = {column: record[column] for column in self.get_columns()}
                    # the API returns naive local times; attach the location's
                    # timezone so the timestamps are unambiguous
                    row["time"] = dateutil.parser.parse(record["time"]).replace(
                        tzinfo=local_timezone,
                    )
                    row["rowid"] = int(row["time_epoch"])
                    _logger.debug(row)
                    yield row
            else:
                # best-effort: log and skip days the API can't serve instead
                # of failing the whole query
                _logger.warning("Error fetching %s: %s", start, response.text)

            start += timedelta(days=1)