"""
An adapter to WeatherAPI (https://www.weatherapi.com/).
"""
import logging
import urllib.parse
from collections.abc import Iterator
from datetime import date, datetime, timedelta, timezone
from typing import Any, Optional, Union, cast
import dateutil.parser
import dateutil.tz
from shillelagh.adapters.base import Adapter
from shillelagh.exceptions import ImpossibleFilterError
from shillelagh.fields import DateTime, Float, IntBoolean, Integer, Order, String
from shillelagh.filters import Filter, Impossible, Operator, Range
from shillelagh.lib import get_session
from shillelagh.typing import RequestedOrder, Row
# Module-level logger used by the adapter to trace requests and rows.
_logger = logging.getLogger(__name__)

# Starting cost of a query, before any filtered column is taken into account.
INITIAL_COST = 0
# Cost added per filtered column, multiplied by the number of days of data
# that may have to be fetched for it (1 for an equality, "window" otherwise).
FETCHING_COST = 1000
def combine_time_filters(bounds: dict[str, Filter]) -> Range:
    """
    Combine both time filters together.

    The adapter has two time columns that can be used to filter the data, "time" as
    a timestamp and "time_epoch" as a float. We convert the latter to a timestamp and
    combine the two filters into a single ``Range``.

    The filters stored in ``bounds`` are never modified: the converted epoch range
    is built as a new ``Range`` object.

    :param bounds: filters keyed by column name. Only the "time" and "time_epoch"
        keys are read; a missing key is treated as an unbounded ``Range``.
    :returns: a ``Range`` of timestamps satisfying both filters.
    :raises ImpossibleFilterError: if either filter is ``Impossible``, or if the
        two ranges do not overlap.
    :raises Exception: if either filter is neither a ``Range`` nor ``Impossible``.
    """
    time_range = bounds.get("time", Range())
    time_epoch_range = bounds.get("time_epoch", Range())

    if isinstance(time_range, Impossible) or isinstance(time_epoch_range, Impossible):
        raise ImpossibleFilterError()

    if not isinstance(time_range, Range) or not isinstance(time_epoch_range, Range):
        raise Exception("Invalid filter")  # pylint: disable=broad-exception-raised

    # Convert the time_epoch range to datetimes (epoch seconds are read as UTC) so
    # we can combine it with the time range. A new ``Range`` is created on purpose:
    # overwriting ``start``/``end`` on the object coming from ``bounds`` would
    # change the caller's filter, and break a second call with the same bounds.
    epoch_start = time_epoch_range.start
    epoch_end = time_epoch_range.end
    epoch_as_time = Range(
        start=(
            datetime.fromtimestamp(epoch_start, tz=timezone.utc)
            if epoch_start is not None
            else None
        ),
        end=(
            datetime.fromtimestamp(epoch_end, tz=timezone.utc)
            if epoch_end is not None
            else None
        ),
        include_start=time_epoch_range.include_start,
        include_end=time_epoch_range.include_end,
    )

    # combine time ranges together and check if the result is a valid range
    combined = time_range + epoch_as_time
    if isinstance(combined, Impossible):
        raise ImpossibleFilterError()

    return cast(Range, combined)
class WeatherAPI(Adapter):
    """
    An adapter for WeatherAPI (https://www.weatherapi.com/).

    The adapter expects an URL like::

        https://api.weatherapi.com/v1/history.json?key=$key&q=$location

    Where ``$key`` is an API key (available for free), and ``$location`` is a
    freeform value that can be a US Zipcode, UK Postcode, Canada Postalcode,
    IP address, Latitude/Longitude (decimal degree) or city name.
    """

    safe = True

    # Since the adapter doesn't return exact data (see the time columns below)
    # implementing limit/offset is not worth the trouble.
    supports_limit = False
    supports_offset = False

    # These two columns can be used to filter the results from the API. We
    # define them as inexact since we will retrieve data for the whole day,
    # even if specific hours are requested. The post-filtering will be done
    # by the backend.
    time = DateTime(filters=[Range], order=Order.ASCENDING, exact=False)
    time_epoch = Float(filters=[Range], order=Order.ASCENDING, exact=False)

    # Remaining columns are copied as-is from each hourly record of the payload.
    temp_c = Float()
    temp_f = Float()
    is_day = IntBoolean()
    wind_mph = Float()
    wind_kph = Float()
    wind_degree = Integer()
    wind_dir = String()
    pressure_mb = Float()
    pressure_in = Float()
    precip_mm = Float()
    precip_in = Float()
    humidity = Integer()
    cloud = Integer()
    feelslike_c = Float()
    feelslike_f = Float()
    windchill_c = Float()
    windchill_f = Float()
    heatindex_c = Float()
    heatindex_f = Float()
    dewpoint_c = Float()
    dewpoint_f = Float()
    will_it_rain = IntBoolean()
    chance_of_rain = String()
    will_it_snow = IntBoolean()
    chance_of_snow = String()
    vis_km = Float()
    vis_miles = Float()
    gust_mph = Float()
    gust_kph = Float()

    @staticmethod
    def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]:
        """
        Return whether the URI points to the WeatherAPI history endpoint.

        A supported URI looks like::

            https://api.weatherapi.com/v1/history.json?key=XXX&q=94158

        The ``q`` parameter is mandatory; the key can be given either as the
        ``key`` query parameter or as an ``api_key`` keyword argument.
        """
        parsed = urllib.parse.urlparse(uri)
        query_string = urllib.parse.parse_qs(parsed.query)
        return (
            parsed.netloc == "api.weatherapi.com"
            and parsed.path == "/v1/history.json"
            and "q" in query_string
            and ("key" in query_string or "api_key" in kwargs)
        )

    @staticmethod
    def parse_uri(uri: str) -> Union[tuple[str], tuple[str, str]]:
        """
        Extract the constructor arguments from the URI.

        :returns: ``(location, key)`` when the URI carries a ``key`` parameter,
            otherwise ``(location,)``.
        """
        parsed = urllib.parse.urlparse(uri)
        query_string = urllib.parse.parse_qs(parsed.query)
        location = query_string["q"][0]

        # key can be passed in the URL or via connection arguments
        if "key" in query_string:
            return (location, query_string["key"][0])
        return (location,)

    def __init__(self, location: str, api_key: str, window: int = 7):
        """
        :param location: value sent as the ``q`` parameter of every request.
        :param api_key: value sent as the ``key`` parameter of every request.
        :param window: number of days (today included) fetched when the time
            filter has no lower bound.
        """
        super().__init__()

        self.location = location
        self.api_key = api_key
        self.window = window

        # use a cache, since the adapter does a lot of similar API requests,
        # and the data should rarely (never?) change
        self._session = get_session(
            request_headers={},
            cache_name="weatherapi_cache",
            expire_after=timedelta(minutes=3),
        )

    def get_cost(
        self,
        filtered_columns: list[tuple[str, Operator]],
        order: list[tuple[str, RequestedOrder]],
    ) -> float:
        """
        Estimate the cost of a query from the operators applied to its columns.

        ``order`` does not influence the estimate.
        """
        cost = INITIAL_COST

        # if the operator is ``Operator.EQ`` we only need to fetch 1 day of data;
        # otherwise we potentially need to fetch "window" days of data
        for _, operator in filtered_columns:
            weight = 1 if operator == Operator.EQ else self.window
            cost += FETCHING_COST * weight

        return cost

    def get_data(  # pylint: disable=too-many-locals
        self,
        bounds: dict[str, Filter],
        order: list[tuple[str, RequestedOrder]],
        **kwargs: Any,
    ) -> Iterator[Row]:
        """
        Yield one row per hourly record, for every day covered by the filters.

        Whole days are downloaded, one request per day. A day whose request
        fails is logged and skipped. Nothing is yielded when the time filters
        cannot be satisfied.
        """
        # combine filters from the two time columns
        try:
            time_range = combine_time_filters(bounds)
        except ImpossibleFilterError:
            return

        today = date.today()
        first = today - timedelta(days=self.window - 1)
        start = time_range.start.date() if time_range.start is not None else first
        end = time_range.end.date() if time_range.end is not None else today
        _logger.debug("Range is %s to %s", start, end)

        url = "https://api.weatherapi.com/v1/history.json"
        # the set of columns does not change between records, so compute it once
        # instead of once per hourly record
        columns = list(self.get_columns())

        # download data from every day from [start, end]
        while start <= end:
            params = {"key": self.api_key, "q": self.location, "dt": start}

            # the API key is a secret: mask it in the logged URL
            logged_query = urllib.parse.urlencode({**params, "key": "XXX"})
            _logger.info("GET %s?%s", url, logged_query)

            response = self._session.get(url, params=params)
            if response.ok:
                payload = response.json()
                local_timezone = dateutil.tz.gettz(payload["location"]["tz_id"])
                for record in payload["forecast"]["forecastday"][0]["hour"]:
                    row = {column: record[column] for column in columns}
                    # the payload has naive local times; attach the timezone
                    # reported for the location
                    row["time"] = dateutil.parser.parse(record["time"]).replace(
                        tzinfo=local_timezone,
                    )
                    row["rowid"] = int(row["time_epoch"])
                    _logger.debug(row)
                    yield row
            else:
                # best effort: keep going with the remaining days, but leave a
                # trace of the failure instead of dropping it silently
                _logger.warning(
                    "Request for %s failed with status %s",
                    start,
                    response.status_code,
                )

            start += timedelta(days=1)