Source code for civis.io._tables

import json
import csv
import io
import six
import warnings

from civis import APIClient
from civis.io import civis_to_file
from civis._utils import maybe_get_random_name
from civis.base import EmptyResultError
from civis.futures import CivisFuture
from civis.utils._deprecation import deprecate_param

import requests

try:
    from io import StringIO
except ImportError:
    from cStringIO import StringIO
try:
    import pandas as pd
    NO_PANDAS = False
except ImportError:
    NO_PANDAS = True

__all__ = ['read_civis', 'read_civis_sql', 'civis_to_csv',
           'civis_to_multifile_csv', 'dataframe_to_civis', 'csv_to_civis']

DELIMITERS = {
    ',': 'comma',
    '\t': 'tab',
    '|': 'pipe',
}


@deprecate_param('v2.0.0', 'api_key')
[docs]def read_civis(table, database, columns=None, use_pandas=False, job_name=None, api_key=None, client=None, credential_id=None, polling_interval=None, archive=False, hidden=True, **kwargs): """Read data from a Civis table. Parameters ---------- table : str Name of table, including schema, in the database. I.e. ``'my_schema.my_table'``. database : str or int Read data from this database. Can be the database name or ID. columns : list, optional A list of column names. Column SQL transformations are possible. If omitted, all columns are exported. use_pandas : bool, optional If ``True``, return a :class:`pandas:pandas.DataFrame`. Otherwise, return a list of results from :func:`python:csv.reader`. job_name : str, optional A name to give the job. If omitted, a random job name will be used. api_key : DEPRECATED str, optional Your Civis API key. If not given, the :envvar:`CIVIS_API_KEY` environment variable will be used. client : :class:`civis.APIClient`, optional If not provided, an :class:`civis.APIClient` object will be created from the :envvar:`CIVIS_API_KEY`. credential_id : str or int, optional The database credential ID. If ``None``, the default credential will be used. polling_interval : int or float, optional Number of seconds to wait between checks for query completion. archive : bool, optional (deprecated) If ``True``, archive the import job as soon as it completes. hidden : bool, optional If ``True`` (the default), this job will not appear in the Civis UI. **kwargs : kwargs Extra keyword arguments are passed into :func:`pandas:pandas.read_csv` if `use_pandas` is ``True`` or passed into :func:`python:csv.reader` if `use_pandas` is ``False``. Returns ------- data : :class:`pandas:pandas.DataFrame` or list A list of rows (with header as first row) if `use_pandas` is ``False``, otherwise a `pandas` `DataFrame`. Note that if `use_pandas` is ``False``, no parsing of types is performed and each row will be a list of strings. Raises ------ ImportError If `use_pandas` is ``True`` and `pandas` is not installed. Examples -------- >>> table = "schema.table" >>> database = "my_data" >>> columns = ["column_a", "ROW_NUMBER() OVER(ORDER BY date) AS order"] >>> data = read_civis(table, database, columns=columns) >>> columns = data.pop(0) >>> col_a_index = columns.index("column_a") >>> col_a = [row[col_a_index] for row in data] >>> df = read_civis("schema.table", "my_data", use_pandas=True) >>> col_a = df["column_a"] See Also -------- civis.io.read_civis_sql : Read directly into memory using SQL. civis.io.civis_to_csv : Write directly to csv. """ if use_pandas and NO_PANDAS: raise ImportError("use_pandas is True but pandas is not installed.") if archive: warnings.warn("`archive` is deprecated and will be removed in v2.0.0. " "Use `hidden` instead.", FutureWarning) if client is None: # Instantiate client here in case users provide a (deprecated) api_key client = APIClient(api_key=api_key, resources='all') sql = _get_sql_select(table, columns) data = read_civis_sql(sql=sql, database=database, use_pandas=use_pandas, job_name=job_name, client=client, credential_id=credential_id, polling_interval=polling_interval, archive=archive, hidden=hidden, **kwargs) return data
@deprecate_param('v2.0.0', 'api_key')
[docs]def read_civis_sql(sql, database, use_pandas=False, job_name=None, api_key=None, client=None, credential_id=None, polling_interval=None, archive=False, hidden=True, **kwargs): """Read data from Civis using a custom SQL string. Parameters ---------- sql : str, optional The SQL select string to be executed. database : str or int Execute the query against this database. Can be the database name or ID. use_pandas : bool, optional If ``True``, return a :class:`pandas:pandas.DataFrame`. Otherwise, return a list of results from :func:`python:csv.reader`. job_name : str, optional A name to give the job. If omitted, a random job name will be used. api_key : DEPRECATED str, optional Your Civis API key. If not given, the :envvar:`CIVIS_API_KEY` environment variable will be used. client : :class:`civis.APIClient`, optional If not provided, an :class:`civis.APIClient` object will be created from the :envvar:`CIVIS_API_KEY`. credential_id : str or int, optional The database credential ID. If ``None``, the default credential will be used. polling_interval : int or float, optional Number of seconds to wait between checks for query completion. archive : bool, optional (deprecated) If ``True``, archive the import job as soon as it completes. hidden : bool, optional If ``True`` (the default), this job will not appear in the Civis UI. **kwargs : kwargs Extra keyword arguments are passed into :func:`pandas:pandas.read_csv` if `use_pandas` is ``True`` or passed into :func:`python:csv.reader` if `use_pandas` is ``False``. Returns ------- data : :class:`pandas:pandas.DataFrame` or list A list of rows (with header as first row) if `use_pandas` is ``False``, otherwise a `pandas` `DataFrame`. Note that if `use_pandas` is ``False``, no parsing of types is performed and each row will be a list of strings. Raises ------ ImportError If `use_pandas` is ``True`` and `pandas` is not installed. Examples -------- >>> sql = "SELECT * FROM schema.table" >>> df = read_civis_sql(sql, "my_database", use_pandas=True) >>> col_a = df["column_a"] >>> data = read_civis_sql(sql, "my_database") >>> columns = data.pop(0) >>> col_a_index = columns.index("column_a") >>> col_a = [row[col_a_index] for row in data] Notes ----- This reads the data into memory. See Also -------- civis.io.read_civis : Read directly into memory without SQL. civis.io.civis_to_csv : Write directly to a CSV file. """ if client is None: client = APIClient(api_key=api_key, resources='all') if use_pandas and NO_PANDAS: raise ImportError("use_pandas is True but pandas is not installed.") if archive: warnings.warn("`archive` is deprecated and will be removed in v2.0.0. " "Use `hidden` instead.", FutureWarning) script_id, run_id = _sql_script(client, sql, database, job_name, credential_id, hidden=hidden) fut = CivisFuture(client.scripts.get_sql_runs, (script_id, run_id), polling_interval=polling_interval, client=client, poll_on_creation=False) if archive: def f(x): return client.scripts.put_sql_archive(script_id, True) fut.add_done_callback(f) fut.result() outputs = client.scripts.get_sql_runs(script_id, run_id)["output"] if not outputs: raise EmptyResultError("Query {} returned no output." .format(script_id)) url = outputs[0]["path"] if use_pandas: data = pd.read_csv(url, **kwargs) else: r = requests.get(url) r.raise_for_status() data = list(csv.reader(StringIO(r.text), **kwargs)) return data
@deprecate_param('v2.0.0', 'api_key')
[docs]def civis_to_csv(filename, sql, database, job_name=None, api_key=None, client=None, credential_id=None, include_header=True, compression='none', delimiter=',', unquoted=False, archive=False, hidden=True, polling_interval=None): """Export data from Civis to a local CSV file. Parameters ---------- filename : str Download exported data into this file. sql : str, optional The SQL select string to be executed. database : str or int Export data from this database. Can be the database name or ID. job_name : str, optional A name to give the job. If omitted, a random job name will be used. api_key : DEPRECATED str, optional Your Civis API key. If not given, the :envvar:`CIVIS_API_KEY` environment variable will be used. client : :class:`civis.APIClient`, optional If not provided, an :class:`civis.APIClient` object will be created from the :envvar:`CIVIS_API_KEY`. credential_id : str or int, optional The ID of the database credential. If ``None``, the default credential will be used. include_header: bool, optional If ``True``, the first line of the CSV will be headers. Default: ``True``. compression: str, optional Type of compression to use, if any. One of ``'none'``, ``'zip'``, or ``'gzip'``. Default ``'none'``. delimiter, str: optional Which delimiter to use, if any. One of ``','``, ``'\t'``, or ``'|'``. Default: ``','``. unquoted: bool, optional Whether or not to quote fields. Default: ``False``. polling_interval : int or float, optional Number of seconds to wait between checks for query completion. archive : bool, optional (deprecated) If ``True``, archive the import job as soon as it completes. hidden : bool, optional If ``True`` (the default), this job will not appear in the Civis UI. Returns ------- results : :class:`~civis.futures.CivisFuture` A `CivisFuture` object. Examples -------- >>> sql = "SELECT * FROM schema.table" >>> fut = civis_to_csv("file.csv", sql, "my_database") >>> fut.result() # Wait for job to complete See Also -------- civis.io.read_civis : Read table contents into memory. civis.io.read_civis_sql : Read results of a SQL query into memory. """ if archive: warnings.warn("`archive` is deprecated and will be removed in v2.0.0. " "Use `hidden` instead.", FutureWarning) if client is None: client = APIClient(api_key=api_key, resources='all') delimiter = DELIMITERS.get(delimiter) if not delimiter: raise ValueError("delimiter must be one of {}" .format(DELIMITERS.keys())) csv_settings = dict(include_header=include_header, compression=compression, column_delimiter=delimiter, unquoted=unquoted, filename_prefix=None, force_multifile=False) script_id, run_id = _sql_script(client, sql, database, job_name, credential_id, hidden=hidden, csv_settings=csv_settings) fut = CivisFuture(client.scripts.get_sql_runs, (script_id, run_id), polling_interval=polling_interval, client=client, poll_on_creation=False) download = _download_callback(script_id, run_id, client, filename) fut.add_done_callback(download) if archive: def f(x): return client.scripts.put_sql_archive(script_id, True) fut.add_done_callback(f) return fut
@deprecate_param('v2.0.0', 'api_key')
[docs]def civis_to_multifile_csv(sql, database, job_name=None, api_key=None, client=None, credential_id=None, include_header=True, compression='none', delimiter='|', unquoted=False, prefix=None, polling_interval=None, hidden=True): """Unload the result of SQL query and return presigned urls. This function is intended for unloading large queries/tables from redshift as it uses a 'PARALLEL ON' S3 unload. It returns a similar manifest file to conventional S3 UNLOAD statements except the CSV parts are accessible via both files endpoint IDs and presigned S3 urls. Parameters ---------- sql : str, optional The SQL select string to be executed. database : str or int Execute the query against this database. Can be the database name or ID. job_name : str, optional A name to give the job. If omitted, a random job name will be used. api_key : DEPRECATED str, optional Your Civis API key. If not given, the :envvar:`CIVIS_API_KEY` environment variable will be used. client : :class:`civis.APIClient`, optional If not provided, an :class:`civis.APIClient` object will be created from the :envvar:`CIVIS_API_KEY`. credential_id : str or int, optional The database credential ID. If ``None``, the default credential will be used. include_header: bool, optional If ``True`` include a key in the returned dictionary containing a list of column names. Default: ``True``. compression: str, optional Type of compression to use, if any. One of ``'none'``, ``'zip'``, or ``'gzip'``. Default ``'none'``. delimiter, str: optional Which delimiter to use, if any. One of ``','``, ``'\t'``, or ``'|'``. Default: ``'|'``. unquoted: bool, optional Whether or not to quote fields. Default: ``False``. prefix: str, optional A user specified filename prefix for the output file to have. Default: ``None``. polling_interval : int or float, optional Number of seconds to wait between checks for query completion. hidden : bool, optional If ``True`` (the default), this job will not appear in the Civis UI. Returns ------- unload_manifest: dict A dictionary resembling an AWS manifest file. Has the following keys: 'query': str The query. 'header': list of str The columns from the query. 'entries': list of dict Each dict has the following keys: 'id': int File ID 'name': str Filename 'size': int File size in bytes 'url': str Unsigned S3 URL ('s3://...') 'url_signed': str Signed S3 URL ('https://...') 'unquoted': bool Whether the cells are quoted. 'compression': str Type of compression used. 'delimiter': str Delimiter that separates the cells. Examples -------- >>> sql = "SELECT * FROM schema.my_big_table" >>> database = "my_database" >>> delimiter = "|" >>> manifest = civis_to_multifile_csv(sql, database, delimiter=delimiter) >>> ids = [entry['id'] for entry in manifest['entries']] >>> buf = BytesIO() >>> civis_to_file(ids[0], buf) >>> buf.seek(0) >>> df = pd.read_csv(buf, delimiter=delimiter) See Also -------- civis.APIClient.scripts.post_sql """ if client is None: client = APIClient(api_key=api_key, resources='all') delimiter = DELIMITERS.get(delimiter) assert delimiter, "delimiter must be one of {}".format(DELIMITERS.keys()) csv_settings = dict(include_header=include_header, compression=compression, column_delimiter=delimiter, unquoted=unquoted, filename_prefix=prefix, force_multifile=True) script_id, run_id = _sql_script(client, sql, database, job_name, credential_id, hidden, csv_settings=csv_settings) fut = CivisFuture(client.scripts.get_sql_runs, (script_id, run_id), polling_interval=polling_interval, client=client, poll_on_creation=False) outputs = fut.result()["output"] if not outputs: raise EmptyResultError("Unload query {} returned no manifest." .format(script_id)) buf = io.BytesIO() civis_to_file(outputs[0]['file_id'], buf) txt = io.TextIOWrapper(buf, encoding='utf-8') txt.seek(0) unload_manifest = json.load(txt) return unload_manifest
@deprecate_param('v2.0.0', 'api_key')
[docs]def dataframe_to_civis(df, database, table, api_key=None, client=None, max_errors=None, existing_table_rows="fail", distkey=None, sortkey1=None, sortkey2=None, headers=None, credential_id=None, polling_interval=None, archive=False, hidden=True, **kwargs): """Upload a `pandas` `DataFrame` into a Civis table. The `DataFrame`'s index will not be included. To store the index along with the other values, use `df.reset_index()` instead of `df` as the first argument to this function. Parameters ---------- df : :class:`pandas:pandas.DataFrame` The `DataFrame` to upload to Civis. database : str or int Upload data into this database. Can be the database name or ID. table : str The schema and table you want to upload to. E.g., ``'scratch.table'``. api_key : DEPRECATED str, optional Your Civis API key. If not given, the :envvar:`CIVIS_API_KEY` environment variable will be used. client : :class:`civis.APIClient`, optional If not provided, an :class:`civis.APIClient` object will be created from the :envvar:`CIVIS_API_KEY`. max_errors : int, optional The maximum number of rows with errors to remove from the import before failing. existing_table_rows : str, optional The behaviour if a table with the requested name already exists. One of ``'fail'``, ``'truncate'``, ``'append'`` or ``'drop'``. Defaults to ``'fail'``. distkey : str, optional The column to use as the distkey for the table. sortkey1 : str, optional The column to use as the sortkey for the table. sortkey2 : str, optional The second column in a compound sortkey for the table. headers : bool, optional Whether or not the first row of the file should be treated as headers. The default, ``None``, attempts to autodetect whether or not the first row contains headers. credential_id : str or int, optional The ID of the database credential. If ``None``, the default credential will be used. polling_interval : int or float, optional Number of seconds to wait between checks for job completion. archive : bool, optional (deprecated) If ``True``, archive the import job as soon as it completes. hidden : bool, optional If ``True`` (the default), this job will not appear in the Civis UI. **kwargs : kwargs Extra keyword arguments will be passed to :meth:`pandas:pandas.DataFrame.to_csv`. Returns ------- fut : :class:`~civis.futures.CivisFuture` A `CivisFuture` object. Examples -------- >>> import pandas as pd >>> df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}) >>> fut = civis.io.dataframe_to_civis(df, 'my-database', ... 'scratch.df_table') >>> fut.result() """ if client is None: client = APIClient(api_key=api_key, resources='all') if archive: warnings.warn("`archive` is deprecated and will be removed in v2.0.0. " "Use `hidden` instead.", FutureWarning) buf = six.BytesIO() if six.PY3: txt = io.TextIOWrapper(buf, encoding='utf-8') else: txt = buf df.to_csv(txt, encoding='utf-8', index=False, **kwargs) txt.flush() buf.seek(0) delimiter = ',' return _import_bytes(buf, database, table, client, max_errors, existing_table_rows, distkey, sortkey1, sortkey2, delimiter, headers, credential_id, polling_interval, archive, hidden=hidden)
@deprecate_param('v2.0.0', 'api_key')
[docs]def csv_to_civis(filename, database, table, api_key=None, client=None, max_errors=None, existing_table_rows="fail", distkey=None, sortkey1=None, sortkey2=None, delimiter=",", headers=None, credential_id=None, polling_interval=None, archive=False, hidden=True): """Upload the contents of a local CSV file to Civis. Parameters ---------- filename : str Upload the contents of this file. database : str or int Upload data into this database. Can be the database name or ID. table : str The schema and table you want to upload to. E.g., ``'scratch.table'``. api_key : DEPRECATED str, optional Your Civis API key. If not given, the :envvar:`CIVIS_API_KEY` environment variable will be used. client : :class:`civis.APIClient`, optional If not provided, an :class:`civis.APIClient` object will be created from the :envvar:`CIVIS_API_KEY`. max_errors : int, optional The maximum number of rows with errors to remove from the import before failing. existing_table_rows : str, optional The behaviour if a table with the requested name already exists. One of ``'fail'``, ``'truncate'``, ``'append'`` or ``'drop'``. Defaults to ``'fail'``. distkey : str, optional The column to use as the distkey for the table. sortkey1 : str, optional The column to use as the sortkey for the table. sortkey2 : str, optional The second column in a compound sortkey for the table. delimiter : string, optional The column delimiter. One of ``','``, ``'\\t'`` or ``'|'``. headers : bool, optional Whether or not the first row of the file should be treated as headers. The default, ``None``, attempts to autodetect whether or not the first row contains headers. credential_id : str or int, optional The ID of the database credential. If ``None``, the default credential will be used. polling_interval : int or float, optional Number of seconds to wait between checks for job completion. archive : bool, optional (deprecated) If ``True``, archive the import job as soon as it completes. hidden : bool, optional If ``True`` (the default), this job will not appear in the Civis UI. Returns ------- results : :class:`~civis.futures.CivisFuture` A `CivisFuture` object. Notes ----- This reads the contents of `filename` into memory. Examples -------- >>> with open('input_file.csv', 'w') as _input: ... _input.write('a,b,c\\n1,2,3') >>> fut = civis.io.csv_to_civis('input_file.csv', ... 'my-database', ... 'scratch.my_data') >>> fut.result() """ if client is None: client = APIClient(api_key=api_key, resources='all') if archive: warnings.warn("`archive` is deprecated and will be removed in v2.0.0. " "Use `hidden` instead.", FutureWarning) with open(filename, "rb") as data: fut = _import_bytes(data, database, table, client, max_errors, existing_table_rows, distkey, sortkey1, sortkey2, delimiter, headers, credential_id, polling_interval, archive, hidden=hidden) return fut
def _sql_script(client, sql, database, job_name, credential_id, hidden=False, csv_settings=None): job_name = maybe_get_random_name(job_name) db_id = client.get_database_id(database) cred_id = credential_id or client.default_credential csv_settings = csv_settings or {} export_job = client.scripts.post_sql(job_name, remote_host_id=db_id, credential_id=cred_id, sql=sql, hidden=hidden, csv_settings=csv_settings) run_job = client.scripts.post_sql_runs(export_job.id) return export_job.id, run_job.id def _get_sql_select(table, columns=None): if columns and not isinstance(columns, (list, tuple)): raise TypeError("columns must be a list, tuple or None") select = ", ".join(columns) if columns is not None else "*" sql = "select {} from {}".format(select, table) return sql def _download_file(url, local_path): response = requests.get(url, stream=True) response.raise_for_status() chunk_size = 32 * 1024 with open(local_path, 'wb') as fout: chunk = response.iter_content(chunk_size) for lines in chunk: fout.write(lines) def _download_callback(job_id, run_id, client, filename): def callback(future): outputs = client.scripts.get_sql_runs(job_id, run_id)["output"] if not outputs: if future.succeeded(): # Only warn if the job succeeded. Otherwise the user # will see an error surfaced through the Future. warnings.warn("Job %s, run %s does not have any output to " "download. Not creating file %s." % (job_id, run_id, filename), RuntimeWarning) return else: url = outputs[0]["path"] return _download_file(url, filename) return callback def _import_bytes(buf, database, table, client, max_errors, existing_table_rows, distkey, sortkey1, sortkey2, delimiter, headers, credential_id, polling_interval, archive, hidden): schema, table = table.split(".", 1) db_id = client.get_database_id(database) cred_id = credential_id or client.default_credential delimiter = DELIMITERS.get(delimiter) assert delimiter, "delimiter must be one of {}".format(DELIMITERS.keys()) kwargs = dict(schema=schema, name=table, remote_host_id=db_id, credential_id=cred_id, max_errors=max_errors, existing_table_rows=existing_table_rows, distkey=distkey, sortkey1=sortkey1, sortkey2=sortkey2, column_delimiter=delimiter, first_row_is_header=headers, hidden=hidden) import_job = client.imports.post_files(**kwargs) put_response = requests.put(import_job.upload_uri, buf) put_response.raise_for_status() run_job_result = client._session.post(import_job.run_uri) run_job_result.raise_for_status() run_info = run_job_result.json() fut = CivisFuture(client.imports.get_files_runs, (run_info['importId'], run_info['id']), polling_interval=polling_interval, client=client, poll_on_creation=False) if archive: def f(x): return client.imports.put_archive(import_job.id, True) fut.add_done_callback(f) return fut