Source code for pragma_sdk.common.fetchers.interface
importabcfromtypingimportList,Optional,Dict,AnyfromaiohttpimportClientSessionfrompragma_sdk.onchain.clientimportPragmaOnChainClientfrompragma_sdk.common.types.entryimportEntryfrompragma_sdk.common.types.pairimportPairfrompragma_sdk.onchain.typesimportNetworkfrompragma_sdk.common.utilsimportadd_sync_methods,str_to_feltfrompragma_sdk.common.fetchers.handlers.hop_handlerimportHopHandlerfrompragma_sdk.common.exceptionsimportPublisherFetchError# TODO(akhercha): FetcherInterfaceT should take as parameter the client instead of creating it# Abstract base class for all fetchers
@abc.abstractmethod
async def fetch(
    self,
    session: ClientSession,
) -> List[Entry | PublisherFetchError | BaseException]:
    """
    Fetch the data exposed by this fetcher.

    Abstract coroutine: concrete fetchers must override it.

    :param session: The aiohttp ``ClientSession`` handed to the fetcher.
    :return: A list whose items are either ``Entry`` objects or the error
        (``PublisherFetchError`` / ``BaseException``) obtained in place of
        an entry.
    """
    ...
@abc.abstractmethod
async def fetch_pair(
    self,
    pair: Pair,
    session: ClientSession,
) -> Entry | PublisherFetchError:
    """
    Fetch the data of a single pair from this fetcher.

    Abstract coroutine: concrete fetchers must override it.

    :param pair: The pair to fetch.
    :param session: The aiohttp ``ClientSession`` handed to the fetcher.
    :return: The ``Entry`` for the pair, or a ``PublisherFetchError``.
    """
    ...
@abc.abstractmethod
def format_url(self, pair: Pair) -> str:
    """
    Build the URL for the given pair; `fetch_pair` uses it to get the data.

    Abstract method: concrete fetchers must override it.

    :param pair: The pair the URL is built for.
    :return: The formatted URL.
    """
    ...
async def get_stable_price(self, stable_asset: str) -> float:
    """
    Return the USD price of a stable asset, read through ``self.client``.

    The identifier ``"<stable_asset>/USD"`` is encoded with ``str_to_felt``
    and passed to ``self.client.get_spot``; the price of the returned entry
    is then divided by ``10 ** decimals``.

    e.g. ``get_stable_price("USDT")`` returns the price of USDT in USD.

    :param stable_asset: Ticker of the stable asset, e.g. ``"USDT"``.
    :return: The price of the asset in USD.
    """
    pair_id = str_to_felt(stable_asset + "/USD")
    spot_entry = await self.client.get_spot(pair_id)

    # Numerator first, then the scaling factor, as in the original expression.
    raw_price = int(spot_entry.price)
    scale = int(10 ** int(spot_entry.decimals))
    return raw_price / scale