Source code for civis.io._databases

import logging

from civis import APIClient
from civis._utils import maybe_get_random_name
from civis.futures import CivisFuture

log = logging.getLogger(__name__)


def query_civis(
    sql,
    database,
    client=None,
    credential_id=None,
    preview_rows=10,
    polling_interval=None,
    hidden=True,
):
    """Submit a SQL statement to Civis as a query and return a future.

    Intended for statements that produce no result set, or for which a
    short preview of the output is enough. For queries that return a
    large number of rows, see :func:`~civis.io.read_civis_sql`.

    Parameters
    ----------
    sql : str
        The SQL statement to run.
    database : str or int
        Name or ID of the target database.
    client : :class:`civis.APIClient`, optional
        API client to use. When omitted, a new :class:`civis.APIClient`
        is constructed (from the :envvar:`CIVIS_API_KEY`).
    credential_id : str or int, optional
        ID of the database credential. A falsy value (e.g. ``None``)
        selects the client's default credential.
    preview_rows : int, optional
        Maximum number of rows to return. No more than 100 rows
        can be returned at once.
    polling_interval : int or float, optional
        Seconds to wait between checks for query completion.
    hidden : bool, optional
        If ``True`` (the default), this job will not appear
        in the Civis UI.

    Returns
    -------
    results : :class:`~civis.futures.CivisFuture`
        A `CivisFuture` object tracking the submitted query.

    Examples
    --------
    >>> import civis
    >>> run = civis.io.query_civis(sql="DELETE schema.table",
    ...                            database='database')
    >>> run.result()  # Wait for query to complete
    """
    api = APIClient() if client is None else client

    db_id = api.get_database_id(database)
    # Only fall back to the client's default when no credential was given.
    credential = credential_id or api.default_credential

    query = api.queries.post(
        db_id,
        sql,
        preview_rows,
        credential=credential,
        hidden=hidden,
    )

    # The future polls `queries.get` with the new query's ID.
    poller_args = (query.id,)
    return CivisFuture(
        api.queries.get,
        poller_args,
        polling_interval,
        client=api,
        poll_on_creation=False,
    )
def transfer_table(
    source_db,
    dest_db,
    source_table,
    dest_table,
    job_name=None,
    client=None,
    source_credential_id=None,
    dest_credential_id=None,
    polling_interval=None,
    **advanced_options,
):
    """Copy a table from one database to another via a Dbsync import.

    Creates an import job, attaches a single sync from ``source_table``
    to ``dest_table``, starts a run, and returns a future for that run.

    Parameters
    ----------
    source_db : str or int
        Name (or ID) of the database holding the source table.
    dest_db : str or int
        Name (or ID) of the database the table will be transferred to.
    source_table : str
        Full name of the table to transfer, e.g., ``'schema.table'``.
    dest_table : str
        Full name of the table in the destination database,
        e.g., ``'schema.table'``.
    job_name : str, optional
        A name to give the job. If omitted, a random job name will be
        used.
    client : :class:`civis.APIClient`, optional
        API client to use. When omitted, a new :class:`civis.APIClient`
        is constructed (from the :envvar:`CIVIS_API_KEY`).
    source_credential_id : str or int, optional
        Credential ID for the source database. A falsy value
        (e.g. ``None``) selects the client's default credential.
    dest_credential_id : str or int, optional
        Credential ID for the destination database. A falsy value
        (e.g. ``None``) selects the client's default credential.
    polling_interval : int or float, optional
        Seconds to wait between checks for job completion.
    **advanced_options : kwargs
        Extra keyword arguments, forwarded to the import sync job as its
        ``advanced_options``. See
        :func:`~civis.resources._resources.Imports.post_syncs`.

    Returns
    -------
    results : :class:`~civis.futures.CivisFuture`
        A `CivisFuture` object tracking the started run.

    Examples
    --------
    >>> import civis
    >>> civis.io.transfer_table(source_db='Cluster A', dest_db='Cluster B',
    ...                         source_table='schma.tbl',
    ...                         dest_table='schma.tbl')
    """
    api = APIClient() if client is None else client

    # Each side independently falls back to the client's default credential.
    src_credential = source_credential_id or api.default_credential
    dst_credential = dest_credential_id or api.default_credential
    job_name = maybe_get_random_name(job_name)

    source = {
        "remote_host_id": api.get_database_id(source_db),
        "credential_id": src_credential,
    }
    destination = {
        "remote_host_id": api.get_database_id(dest_db),
        "credential_id": dst_credential,
    }

    # Create the import job itself, then attach the table-to-table sync.
    import_job = api.imports.post(
        job_name,
        "Dbsync",
        True,
        source=source,
        destination=destination,
    )
    job_id = import_job.id
    api.imports.post_syncs(
        id=job_id,
        source={"path": source_table},
        destination={"path": dest_table},
        advanced_options=advanced_options,
    )

    started_run = api.imports.post_runs(id=job_id)
    run_id = started_run.run_id
    log.debug("Started run %d of sync for import %d", run_id, job_id)

    return CivisFuture(
        api.imports.get_files_runs,
        (job_id, run_id),
        polling_interval=polling_interval,
        client=api,
        poll_on_creation=False,
    )