Source code for shillelagh.adapters.api.weatherapi

"""
An adapter to WeatherAPI (https://www.weatherapi.com/).
"""

import copy
import logging
import urllib.parse
from collections.abc import Iterator
from datetime import date, datetime, timedelta, timezone
from typing import Any, Optional, Union, cast

import dateutil.parser
import dateutil.tz

from shillelagh.adapters.base import Adapter
from shillelagh.exceptions import ImpossibleFilterError
from shillelagh.fields import DateTime, Float, IntBoolean, Integer, Order, String
from shillelagh.filters import Filter, Impossible, Operator, Range
from shillelagh.lib import get_session
from shillelagh.typing import RequestedOrder, Row

_logger = logging.getLogger(__name__)

INITIAL_COST = 0
FETCHING_COST = 1000


def combine_time_filters(bounds: dict[str, Filter]) -> Range:
    """
    Combine both time filters together.

    The adapter has two time columns that can be used to filter the data, "time" as
    a timestamp and "time_epoch" as a float. We convert the latter to a timestamp and
    combine the two filters into a single ``Range``.

    The filters stored in ``bounds`` are left untouched: the epoch-to-datetime
    conversion is performed on a copy, so the same ``bounds`` can safely be passed
    to this function more than once.

    :param bounds: filters keyed by column name; only the "time" and "time_epoch"
        entries are read, and a missing entry is treated as an unbounded ``Range``.
    :return: a single ``Range`` expressed in datetimes.
    :raises ImpossibleFilterError: if either filter is ``Impossible``, or if the
        combination of the two filters is ``Impossible``.
    :raises Exception: if either filter is neither a ``Range`` nor ``Impossible``.
    """
    time_range = bounds.get("time", Range())
    time_epoch_range = bounds.get("time_epoch", Range())

    if isinstance(time_range, Impossible) or isinstance(time_epoch_range, Impossible):
        raise ImpossibleFilterError()

    if not isinstance(time_range, Range) or not isinstance(time_epoch_range, Range):
        raise Exception("Invalid filter")  # pylint: disable=broad-exception-raised

    # convert time_epoch range to datetime so we can combine it with the time
    # range; work on a shallow copy so the caller's filter is not modified
    epoch_as_datetime = copy.copy(time_epoch_range)
    epoch_as_datetime.start = (
        datetime.fromtimestamp(time_epoch_range.start, tz=timezone.utc)
        if time_epoch_range.start is not None
        else None
    )
    epoch_as_datetime.end = (
        datetime.fromtimestamp(time_epoch_range.end, tz=timezone.utc)
        if time_epoch_range.end is not None
        else None
    )

    # combine time ranges together and check if the result is a valid range
    time_range += epoch_as_datetime
    if isinstance(time_range, Impossible):
        raise ImpossibleFilterError()

    return cast(Range, time_range)
class WeatherAPI(Adapter):
    """
    An adapter for WeatherAPI (https://www.weatherapi.com/).

    The adapter expects an URL like::

        https://api.weatherapi.com/v1/history.json?key=$key&q=$location

    Where ``$key`` is an API key (available for free), and ``$location`` is a
    freeform value that can be a US Zipcode, UK Postcode, Canada Postalcode,
    IP address, Latitude/Longitude (decimal degree) or city name.
    """

    safe = True

    # Since the adapter doesn't return exact data (see the time columns below)
    # implementing limit/offset is not worth the trouble.
    supports_limit = False
    supports_offset = False

    # These two columns can be used to filter the results from the API. We
    # define them as inexact since we will retrieve data for the whole day,
    # even if specific hours are requested. The post-filtering will be done
    # by the backend.
    time = DateTime(filters=[Range], order=Order.ASCENDING, exact=False)
    time_epoch = Float(filters=[Range], order=Order.ASCENDING, exact=False)

    temp_c = Float()
    temp_f = Float()
    is_day = IntBoolean()
    wind_mph = Float()
    wind_kph = Float()
    wind_degree = Integer()
    wind_dir = String()
    pressure_mb = Float()
    pressure_in = Float()
    precip_mm = Float()
    precip_in = Float()
    humidity = Integer()
    cloud = Integer()
    feelslike_c = Float()
    feelslike_f = Float()
    windchill_c = Float()
    windchill_f = Float()
    heatindex_c = Float()
    heatindex_f = Float()
    dewpoint_c = Float()
    dewpoint_f = Float()
    will_it_rain = IntBoolean()
    chance_of_rain = String()
    will_it_snow = IntBoolean()
    chance_of_snow = String()
    vis_km = Float()
    vis_miles = Float()
    gust_mph = Float()
    gust_kph = Float()

    @staticmethod
    def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]:
        """
        Return whether the URI can be handled by this adapter.

        A supported URI looks like::

            https://api.weatherapi.com/v1/history.json?key=XXX&q=94158

        The ``key`` query parameter may be omitted from the URI when an
        ``api_key`` keyword argument is supplied instead.
        """
        parsed = urllib.parse.urlparse(uri)
        query_string = urllib.parse.parse_qs(parsed.query)
        return (
            parsed.netloc == "api.weatherapi.com"
            and parsed.path == "/v1/history.json"
            and "q" in query_string
            and ("key" in query_string or "api_key" in kwargs)
        )

    @staticmethod
    def parse_uri(uri: str) -> Union[tuple[str], tuple[str, str]]:
        """
        Extract the constructor arguments from the URI.

        Returns ``(location, key)`` when the URI carries a ``key`` query
        parameter, and ``(location,)`` otherwise.
        """
        parsed = urllib.parse.urlparse(uri)
        query_string = urllib.parse.parse_qs(parsed.query)
        location = query_string["q"][0]

        # key can be passed in the URL or via connection arguments
        if "key" in query_string:
            return (location, query_string["key"][0])
        return (location,)

    def __init__(self, location: str, api_key: str, window: int = 7):
        """
        Store the connection settings and create the HTTP session.

        :param location: value sent to the API as the ``q`` parameter.
        :param api_key: value sent to the API as the ``key`` parameter.
        :param window: number of days (ending today) fetched when the query
            has no lower bound on time; also used as a weight in ``get_cost``.
        """
        super().__init__()

        self.location = location
        self.api_key = api_key
        self.window = window

        # use a cache, since the adapter does a lot of similar API requests,
        # and the data should rarely (never?) change
        self._session = get_session(
            request_headers={},
            cache_name="weatherapi_cache",
            expire_after=timedelta(minutes=3),
        )

    def get_cost(
        self,
        filtered_columns: list[tuple[str, Operator]],
        order: list[tuple[str, RequestedOrder]],
    ) -> float:
        """
        Estimate the cost of a query from the columns being filtered.

        Each filtered column adds ``FETCHING_COST`` times the number of days
        that may need to be downloaded for it. ``order`` is not used.
        """
        cost = INITIAL_COST

        # if the operator is ``Operator.EQ`` we only need to fetch 1 day of data;
        # otherwise we potentially need to fetch "window" days of data
        for _, operator in filtered_columns:
            weight = 1 if operator == Operator.EQ else self.window
            cost += FETCHING_COST * weight

        return cost

    def get_data(  # pylint: disable=too-many-locals
        self,
        bounds: dict[str, Filter],
        order: list[tuple[str, RequestedOrder]],
        **kwargs: Any,
    ) -> Iterator[Row]:
        """
        Yield one row per hourly record for every day in the requested range.

        The "time" and "time_epoch" filters in ``bounds`` are combined into a
        single range. A missing lower bound defaults to ``window - 1`` days
        before today, and a missing upper bound defaults to today. Whole days
        are downloaded, one request per day; days whose request does not
        succeed are skipped. Nothing is yielded when the filters are
        impossible to satisfy.
        """
        # combine filters from the two time columns
        try:
            time_range = combine_time_filters(bounds)
        except ImpossibleFilterError:
            return

        today = date.today()
        first = today - timedelta(days=self.window - 1)
        start = time_range.start.date() if time_range.start else first
        end = time_range.end.date() if time_range.end else today
        _logger.debug("Range is %s to %s", start, end)

        url = "https://api.weatherapi.com/v1/history.json"

        # download data for every day in [start, end]
        while start <= end:
            params = {"key": self.api_key, "q": self.location, "dt": start}

            # the API key is a secret: log the request with the key masked
            logged_query = urllib.parse.urlencode({**params, "key": "REDACTED"})
            _logger.info("GET %s?%s", url, logged_query)

            response = self._session.get(url, params=params)
            if response.ok:
                payload = response.json()
                local_timezone = dateutil.tz.gettz(payload["location"]["tz_id"])
                for record in payload["forecast"]["forecastday"][0]["hour"]:
                    row = {column: record[column] for column in self.get_columns()}
                    # the API reports local wall-clock time without an offset,
                    # so attach the timezone of the location
                    row["time"] = dateutil.parser.parse(record["time"]).replace(
                        tzinfo=local_timezone,
                    )
                    row["rowid"] = int(row["time_epoch"])
                    _logger.debug(row)
                    yield row
            else:
                # keep the best-effort behavior (skip the day), but leave a trace
                _logger.warning("Request for %s was not successful, skipping", start)

            start += timedelta(days=1)