pragma_sdk.offchain#

PragmaAPIClient#

class pragma_sdk.offchain.client.EntryResult[source]#

Bases: object

__init__(pair_id: str, data: Any, num_sources_aggregated: int = 0, timestamp: int | None = None, decimals: int | None = None, expiry: str | None = None)[source]#
assert_attributes_equal(expected_dict)[source]#

Asserts that the attributes of the class object are equal to the values in the dictionary.

class pragma_sdk.offchain.client.PragmaAPIClient[source]#

Bases: PragmaClient

Client for interacting with the Pragma API. see https://docs.pragma.build/Resources/PragmApi/overview

An API Key is required to interact with the API.

__init__(account_private_key: int | str | None, account_contract_address: int | str | None, api_base_url: str, api_key: str)[source]#
account_private_key: int | None = None#
api_base_url: str#
api_key: str#
async get_entry(pair: str, timestamp: int | None = None, interval: Interval | None = None, aggregation: AggregationMode | None = None, routing: bool | None = None) EntryResult[source]#

Get data aggregated on the Pragma API.

Parameters:
  • pair – Pair to get data for

  • timestamp – Timestamp to get data for, defaults to now

  • interval – Interval on which data is aggregated, defaults to 2h

  • routing – If we want to route data for unexisting pair, defaults to False

  • aggregation – Aggregation method, defaults to TWAP

Returns:

[EntryResult] result data

get_entry_sync(pair: str, timestamp: int | None = None, interval: Interval | None = None, aggregation: AggregationMode | None = None, routing: bool | None = None) EntryResult#

Synchronous version of the method.

async get_expiries_list(pair: Pair)[source]#

Get volatility data for a pair in a given time range on the Pragma API.

Parameters:
  • pair – Pair to get data for

  • start – Start timestamp

  • end – End timestamp

get_expiries_list_sync(pair: Pair)#

Synchronous version of the method.

async get_future_entry(pair: str, timestamp: int | None = None, interval: Interval | None = None, aggregation: AggregationMode | None = None, routing: bool | None = None, expiry: str | None = None) EntryResult[source]#

Get data aggregated on the Pragma API.

Parameters:
  • pair – Pair to get data for

  • timestamp – Timestamp to get data for, defaults to now

  • interval – Interval on which data is aggregated, defaults to 2h

  • routing – If we want to route data for unexisting pair, defaults to False

  • aggregation – Aggregation method, defaults to TWAP

Returns:

[EntryResult] result data

get_future_entry_sync(pair: str, timestamp: int | None = None, interval: Interval | None = None, aggregation: AggregationMode | None = None, routing: bool | None = None, expiry: str | None = None) EntryResult#

Synchronous version of the method.

async get_ohlc(pair: str, timestamp: int | None = None, interval: Interval | None = None, aggregation: AggregationMode | None = None) EntryResult[source]#

Retrieve OHLC data from the Pragma API.

Parameters:
  • pair – Pair to get data for

  • timestamp – Timestamp to get data for, defaults to now

  • interval – Interval on which data is aggregated, defaults to 1m

  • aggregation – Aggregation method, defaults to Median

Returns:

[EntryResult] result data

get_ohlc_sync(pair: str, timestamp: int | None = None, interval: Interval | None = None, aggregation: AggregationMode | None = None) EntryResult#

Synchronous version of the method.

async get_volatility(pair: str, start: int, end: int)[source]#

Get volatility data for a pair in a given time range on the Pragma API.

Parameters:
  • pair – Pair to get data for

  • start – Start timestamp

  • end – End timestamp

get_volatility_sync(pair: str, start: int, end: int)#

Synchronous version of the method.

offchain_signer: OffchainSigner | None = None#
async publish_entries(entries: List[Entry]) Tuple[Dict | None, Dict | None] | List[InvokeResult][source]#

Publishes spot and future entries to the Pragma API. This function accepts both type of entries - but they need to be sent through different endpoints & signed differently, so we split them in two separate lists.

Parameters:

entries – List of SpotEntry objects

publish_entries_sync(entries: List[Entry]) Tuple[Dict | None, Dict | None] | List[InvokeResult]#

Synchronous version of the method.

pragma_sdk.offchain.client.get_endpoint_publish_offchain(data_type: DataTypes)[source]#

Returns the correct publish endpoint for the given data type.

OffchainSigner#

class pragma_sdk.offchain.signer.OffchainSigner[source]#

Bases: object

Class used to sign messages for the Pragma API

__init__(signer: StarkCurveSigner)[source]#
sign_publish_message(entries: List[Entry], data_type: DataTypes | None = 'Spot') Tuple[List[int], int][source]#

Sign a publish message

Parameters:

entries – List of SpotEntry objects

Returns:

Tuple containing the signature and the message hash

pragma_sdk.offchain.signer.build_publish_message(entries: List[Entry], data_type: DataTypes | None = 'Spot') TypedData[source]#

Builds a typed data message to publish spot entries on the Pragma API. see https://community.starknet.io/t/snip-off-chain-signatures-a-la-eip712 for reference

Parameters:

entries – List of SpotEntry objects

Interval#

class pragma_sdk.offchain.types.Interval[source]#

Bases: StrEnum

FIFTEEN_MINUTES = '15min'#
ONE_HOUR = '1h'#
ONE_MINUTE = '1min'#
TWO_HOURS = '2h'#
serialize()[source]#