onchain mixins#

NonceMixin#

class pragma_sdk.onchain.mixins.nonce.NonceMixin[source]#

Bases: object

account_contract_address: int | None#
cleanup_nonce_dict()[source]#

Cleanup the nonce_dict by removing all txs that have been either rejected or accepted.

client: Client#
async get_nonce(include_pending=True, block_number: int | str | Literal['pending', 'latest'] | None = None) int[source]#

Get the nonce of the account contract address. Just a wrapper around the client’s get_contract_nonce method.

Parameters:
  • include_pending – Whether to include pending transactions in the nonce calculation.

  • block_number – Custom block number to get the nonce from.

async get_status(transaction_hash: int, check_interval: int = 2, retries: int = 500) TransactionStatus[source]#

Tries to get the status of a transaction by its hash.

Parameters:
  • transaction_hash – The hash of the transaction.

  • check_interval – The interval in seconds between each check.

  • retries – The number of retries before giving up.

Returns:

The status of the transaction.

nonce_dict: Dict[int, int] = {}#
nonce_status: Dict[int, TransactionStatus] = {}#
pending_nonce: int | None = None#
async track_nonce(nonce: int, transaction_hash: int) None[source]#

Callback function to track the nonce of a transaction. Will update the nonce_dict and pending_nonce attributes. We track the nonce used for a given transaction hash.

Parameters:
  • nonce – The nonce of the transaction.

  • transaction_hash – The hash of the transaction.

async update_nonce_dict()[source]#

Update the statuses of the nonces in the nonce_dict.

OracleMixin#

class pragma_sdk.onchain.mixins.oracle.OracleMixin[source]#

Bases: object

account: Account#
client: Client#
execution_config: ExecutionConfig#
async get_admin_address() int[source]#

Return the admin address of the Oracle contract.

async get_all_sources(asset: Asset, block_id: int | Literal['pending', 'latest'] | None = 'latest') List[str][source]#

Query on-chain all sources used for a given asset.

Parameters:
  • asset – Asset

  • block_id – Block number or Block Tag

Returns:

List of sources

async get_decimals(asset: Asset, block_id: int | Literal['pending', 'latest'] | None = 'latest') int[source]#

Query on-chain the decimals for a given asset

Parameters:
  • asset – Asset

  • block_id – Block number or Block Tag

Returns:

Decimals

async get_entry(pair_id: str | int, data_type: DataTypes, publisher: str | int, source: str | int, expiration_timestamp: int | None = None, block_id: int | Literal['pending', 'latest'] | None = 'latest') Entry[source]#

Query the Oracle contract for the entry of a publisher for a source.

Parameters:
  • pair_id – Pair ID

  • data_type – DataTypes

  • publisher – Publisher to check entry for

  • source – Source to check

  • expiration_timestamp – Optional, expiration timestamp for futures. Defaults to 0.

  • block_id – Block number or Block Tag

Returns:

Entry

async get_future(pair_id: str | int, expiry_timestamp: int, aggregation_mode: AggregationMode = AggregationMode.MEDIAN, sources: List[int | str] | None = None, block_id: int | Literal['pending', 'latest'] | None = 'latest') OracleResponse[source]#

Query the Oracle contract for the data of a future asset.

Parameters:
  • pair_id – Pair ID

  • expiry_timestamp – Expiry timestamp of the future contract

  • aggregation_mode – AggregationMode

  • sources – List of sources, if None will use all sources

  • block_id – Block number or Block Tag

Returns:

OracleResponse

async get_future_entries(pair_id: str | int, expiration_timestamp: int, sources: List[int | str] | None = None, block_id: int | Literal['pending', 'latest'] | None = 'latest') List[FutureEntry][source]#
async get_generic(key: str | int, sources: List[int | str] | None = None, block_id: int | Literal['pending', 'latest'] | None = 'latest') GenericEntry[source]#

Query the Oracle contract to retrieve the

Parameters:
  • key – Key ID of the generic entry

  • block_id – Block number or Block Tag

Returns:

GenericEntry

async get_last_checkpoint_before(pair_id: str | int, data_type: DataTypes, timestamp: int, aggregation_mode: AggregationMode = AggregationMode.MEDIAN, expiration_timestamp: int | None = None) Checkpoint[source]#
async get_latest_checkpoint(pair_id: str | int, data_type: DataTypes, aggregation_mode: AggregationMode = AggregationMode.MEDIAN, expiration_timestamp: int | None = None) Checkpoint[source]#
async get_spot(pair_id: str | int, aggregation_mode: AggregationMode = AggregationMode.MEDIAN, sources: List[int | str] | None = None, block_id: int | Literal['pending', 'latest'] | None = 'latest') OracleResponse[source]#

Query the Oracle contract for the data of a spot asset.

Parameters:
  • pair_id – Pair ID

  • aggregation_mode – AggregationMode

  • sources – List of sources, if None will use all sources

  • block_id – Block number or Block Tag

Returns:

OracleResponse

async get_spot_entries(pair_id, sources=None, block_id: int | Literal['pending', 'latest'] | None = 'latest') List[SpotEntry][source]#
async get_time_since_last_published_spot(pair: Pair, publisher: str, block_id: int | Literal['pending', 'latest'] | None = 'latest') int[source]#

Get the time since the last published spot entry by a publisher for a given pair. Will return a large number if no entry is found.

Parameters:
  • pair – Pair

  • publisher – Publisher name e.g “PRAGMA”

  • block_id – Block number or Block Tag

Returns:

Time since last published entry

is_user_client: bool = False#
oracle: Contract#
async publish_many(entries: List[Entry]) List[InvokeResult][source]#
async publish_spot_entry(pair_id: int, value: int, timestamp: int, source: int, publisher: int, volume: int = 0) InvokeResult[source]#
publisher_registry: Contract#
async set_checkpoints(pair_ids: Sequence[str | int], aggregation_mode: AggregationMode = AggregationMode.MEDIAN) InvokeResult[source]#

Set checkpoints for a list of pair IDs.

Parameters:
  • pair_ids – List of pair IDs

  • aggregation_mode – AggregationMode

Returns:

InvokeResult

async set_future_checkpoints(pair_ids: Sequence[int], expiry_timestamps: Sequence[int], aggregation_mode: AggregationMode = AggregationMode.MEDIAN) InvokeResult[source]#
track_nonce: Callable[[object, int, int], Coroutine[None, None, None]]#
async update_oracle(implementation_hash: int) InvokeResult[source]#

Update the Oracle contract to a new implementation.

Parameters:

implementation_hash – New implementation hash

Returns:

InvokeResult

PublisherRegistryMixin#

class pragma_sdk.onchain.mixins.publisher_registry.PublisherRegistryMixin[source]#

Bases: object

account: Account#
async add_publisher(publisher: str, publisher_address: int) InvokeResult[source]#

Add a publisher to the publisher registry. Can only be called by the owner of the publisher registry.

Parameters:
  • publisher – The publisher to add.

  • publisher_address – The address of the publisher.

  • execution_config – The execution config to use.

Returns:

The invocation result.

async add_source_for_publisher(publisher: str, source: str) InvokeResult[source]#

Add a source for a publisher. Can only be called by the owner of the publisher registry.

Parameters:
  • publisher – The publisher to add the source to.

  • source – The source to add.

  • execution_config – The execution config to use.

Returns:

The invocation result.

async add_sources_for_publisher(publisher: str, sources: List[str]) InvokeResult[source]#

Add multiple sources for a publisher. Can only be called by the owner of the publisher registry.

Parameters:
  • publisher – The publisher to add the sources to.

  • sources – The sources to add.

  • execution_config – The execution config to use.

Returns:

The invocation result.

client: Client#
execution_config: ExecutionConfig#
async get_all_publishers() List[int][source]#

Returns all publishers registered in the publisher registry.

Returns:

List of publishers. (as felt integers)

async get_publisher_address(publisher: str) int[source]#

Returns the address of a publisher.

Parameters:

publisher – The publisher to get the address of.

Returns:

The address of the publisher. (as a felt integer)

async get_publisher_sources(publisher: str) List[int][source]#

Returns the sources of a publisher.

Parameters:

publisher – The publisher to get the sources of.

Returns:

The sources of the publisher. (as felt integers)

publisher_registry: Contract#
async update_publisher_address(publisher: str, publisher_address: int) InvokeResult[source]#

Update the address of a publisher. Can only be called by the owner of the publisher registry.

Parameters:
  • publisher – The publisher to update the address of.

  • publisher_address – The new address of the publisher.

  • execution_config – The execution config to use.

Returns:

The invocation result.

RandomnessMixin#

class pragma_sdk.onchain.mixins.randomness.RandomnessMixin[source]#

Bases: object

account: Account | None = None#
async cancel_random_request(vrf_cancel_params: VRFCancelParams) InvokeResult[source]#

Cancel a random request given the request parameters. see more info https://docs.pragma.build/Resources/Cairo%201/randomness/randomness#function-cancel_random_request

Parameters:
  • vrf_cancel_params – VRFCancelParams object containing the cancel parameters.

  • execution_config – ExecutionConfig object containing the execution parameters.

Returns:

InvokeResult object containing the result of the invocation.

client: Client#
async compute_premium_fee(caller_address: int) int[source]#

Query the premium fee for a request given the caller address. see https://docs.pragma.build/Resources/Cairo%201/randomness/randomness#pricing

Parameters:

caller_address – The caller address.

Returns:

The premium fee.

async estimate_gas_cancel_random_op(vrf_cancel_params: VRFCancelParams) EstimatedFee[source]#

Estimate the gas for the cancel_random_request operation.

Parameters:
  • vrf_cancel_params – VRFCancelParams object containing the cancel parameters.

  • execution_config – ExecutionConfig object containing the execution parameters.

Returns:

The estimated gas for the operation.

async estimate_gas_request_random_op(vrf_request_params: VRFRequestParams) EstimatedFee[source]#

Estimate the gas for the request_random operation.

Parameters:
  • vrf_request_params – VRFRequestParams object containing the request parameters.

  • execution_config – ExecutionConfig object containing the execution parameters.

Returns:

The estimated gas for the operation.

async estimate_gas_submit_random_op(vrf_submit_params: VRFSubmitParams) EstimatedFee[source]#

Estimate the gas for the submit_random operation.

Parameters:
  • vrf_submit_params – VRFSubmitParams object containing the submit parameters.

  • execution_config – ExecutionConfig object containing the execution parameters.

Returns:

The estimated gas for the operation.

execution_config: ExecutionConfig#
full_node_client: FullNodeClient#
async get_pending_requests(requestor_address: int, offset: int = 0, max_len: int = 100) List[int][source]#

Query the pending requests of a requestor address.

Parameters:
  • requestor_address – The requestor address.

  • offset – Request id from which to start the query.

  • max_len – Maximum number of requests to query.

Returns:

The pending requests of the requestor address.

async get_request_status(caller_address: int, request_id: int) RequestStatus[source]#

Query the status of a request given the caller address and request ID.

Parameters:
  • caller_address – The caller address.

  • request_id – The request ID.

Returns:

The status of the request.

async get_total_fees(caller_address: int, request_id: int) int[source]#

Query the total fees of a request given the caller address and request ID. Total fees correspond to the sum of the callback fee and the premium fee.

Parameters:
  • caller_address – The caller address.

  • request_id – The request ID.

Returns:

The total fees of the request.

async handle_random(private_key: int, ignore_request_threshold: int = 3)[source]#

Handle randomness requests. Will submit randomness for requests that are not too old and have not been handled yet.

Parameters:
  • private_key – The private key of the account that will sign the randomness.

  • ignore_request_threshold – The number of blocks we ignore requests that are older than.

init_randomness_contract(contract_address: int)[source]#
is_user_client: bool = False#
randomness: Contract#
async refund_operation(request_id: int, requestor_address: int) InvokeResult[source]#

Refund the remaining gas to the requestor address. Only requests with status OUT_OF_GAS can be refunded.

Parameters:
  • request_id – The request ID.

  • requestor_address – The requestor address.

  • execution_config – ExecutionConfig object containing the execution parameters.

Returns:

InvokeResult object containing the result of the invocation.

async request_random(request_params: VRFRequestParams) InvokeResult[source]#

Request randomness from the VRF contract. Must set account. You may do this by invoking self._setup_account_client(private_key, account_contract_address)

Parameters:
  • request_params – VRFRequestParams object containing the request parameters.

  • execution_config – ExecutionConfig object containing the execution parameters.

Returns:

InvokeResult object containing the result of the invocation.

async requestor_current_request_id(caller_address: int) int[source]#

Query the request id of the latest request made by the caller address.

Parameters:

caller_address – The caller address.

Returns:

The request id of the latest request.

async submit_random(vrf_submit_params: VRFSubmitParams) InvokeResult[source]#

Submit randomness to the VRF contract. If fee estimation fails, the status of the request is updated to OUT_OF_GAS. Then, the remaining gas is refunded to the requestor address.

Fee estimation is used to set the callback fee parameter in the VRFSubmitParams object.

Parameters:
  • vrf_submit_params – VRFSubmitParams object containing the submit parameters.

  • execution_config – ExecutionConfig object containing the execution parameters.

Returns:

InvokeResult object containing the result of the invocation.