@add_sync_methods
class FetcherClient:
    """
    This client extends the pragma client with functionality for fetching from
    our third party sources. It can be used to synchronously or asynchronously
    fetch assets.

    The client works by setting up fetchers that are provided the assets to
    fetch and the publisher name.

    Example usage:

    .. code-block:: python

        pairs = [
            Pair.from_tickers("BTC", "USD"),
            Pair.from_tickers("ETH", "USD"),
        ]

        bitstamp_fetcher = BitstampFetcher(pairs, "publisher_test")
        gateio_fetcher = GateIOFetcher(pairs, "publisher_test")

        fetchers = [
            bitstamp_fetcher,
            gateio_fetcher,
        ]

        fc = FetcherClient()
        fc.add_fetchers(fetchers)

        await fc.fetch()
        fc.fetch_sync()

    You can also set a custom timeout duration as follows:

    .. code-block:: python

        await fc.fetch(timeout_duration=30)  # Denominated in seconds (default=20)
    """

    # Annotation only: the list itself is created per instance in ``__init__``.
    # A class-level ``= []`` would be shared (and mutated) by every client.
    __fetchers: List[FetcherInterfaceT]

    def __init__(self) -> None:
        """Create a client with its own, initially empty, list of fetchers."""
        self.__fetchers = []

    @property
    def fetchers(self) -> List[FetcherInterfaceT]:
        """The fetchers currently registered on this client."""
        return self.__fetchers

    @fetchers.setter
    def fetchers(self, value: List[FetcherInterfaceT]) -> None:
        """
        Replace the registered fetchers.

        :param value: Non-empty list of fetchers
        :raises ValueError: If ``value`` is empty
        """
        if not value:
            raise ValueError("Fetcher list cannot be empty")
        self.__fetchers = value

    def add_fetchers(self, fetchers: List[FetcherInterfaceT]) -> None:
        """
        Add fetchers to the supported fetchers list.

        :param fetchers: Fetchers to append to the ones already registered
        """
        self.fetchers.extend(fetchers)

    def add_fetcher(self, fetcher: FetcherInterfaceT) -> None:
        """
        Add a single fetcher to the supported fetchers list.

        :param fetcher: Fetcher to append to the ones already registered
        """
        self.fetchers.append(fetcher)

    async def fetch(
        self,
        filter_exceptions: bool = True,
        return_exceptions: bool = True,
        timeout_duration: int = 20,
    ) -> List[Entry | PublisherFetchError | Exception]:
        """
        Fetch data from all fetchers asynchronously.
        Fetching is done in parallel for all fetchers.

        :param filter_exceptions: If True, filters out exceptions from the result list
        :param return_exceptions: If True, a fetcher that raises contributes its
            exception to the result list instead of aborting the whole fetch
        :param timeout_duration: Total timeout in seconds, passed as
            ``aiohttp.ClientTimeout(total=...)`` to the shared session
        :return: List of fetched data
        """
        timeout = aiohttp.ClientTimeout(total=timeout_duration)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # All fetchers share one session; it must stay open until every
            # coroutine has completed, so the gather happens inside the block.
            tasks = [fetcher.fetch(session) for fetcher in self.fetchers]
            outcomes = await asyncio.gather(
                *tasks, return_exceptions=return_exceptions
            )

        result: List[Entry | PublisherFetchError | Exception] = []
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                # With return_exceptions=True a fetcher that raised yields the
                # exception object itself, which is not iterable: keep it as a
                # single item rather than trying to flatten it.
                result.append(outcome)  # type: ignore[arg-type]
            else:
                result.extend(outcome)

        if filter_exceptions:
            result = [item for item in result if not isinstance(item, BaseException)]
        return result