Source code for finvizfinance.screener.base

"""
.. module:: screener.base
   :synopsis: screen base module.

.. moduleauthor:: Tianning Li <ltianningli@gmail.com>

"""

import warnings
import pandas as pd
from time import sleep
from finvizfinance.quote import finvizfinance
from finvizfinance.util import (
    web_scrap,
    number_covert,
    progress_bar,
)
from finvizfinance.constants import NUMBER_COL, signal_dict, filter_dict, order_dict


class Base:
    """Base
    Getting information from the finviz screener page.
    """

    v_page = None
    url = "https://finviz.com/screener.ashx"
    size = 20
    request_params = {}

    def __init__(self):
        """initiate module"""
        self.reset()

    def _set_signal(self, signal):
        """set signal.

        Args:
            signal(str): ticker signal
        """
        if not signal:
            return
        if signal not in signal_dict:
            signal_keys = list(signal_dict.keys())
            raise ValueError(
                "Invalid signal '{}'. Possible signal: {}".format(signal, signal_keys)
            )
        self.request_params["s"] = signal_dict[signal]

    def _set_filters(self, filters_dict):
        """Set filters.

        Args:
            filters_dict(dict): dictionary of filters

        Returns:
            url_filter(str): filter string for url
        """
        filters = []
        for key, value in filters_dict.items():
            if key not in filter_dict:
                filter_keys = list(filter_dict.keys())
                raise ValueError(
                    "Invalid filter '{}'. Possible filter: {}".format(key, filter_keys)
                )
            if value not in filter_dict[key]["option"]:
                filter_options = list(filter_dict[key]["option"].keys())
                raise ValueError(
                    "Invalid filter option '{}'. Possible filter options: {}".format(
                        value, filter_options
                    )
                )
            prefix = filter_dict[key]["prefix"]
            urlcode = filter_dict[key]["option"][value]
            if urlcode != "":
                filters.append("{}_{}".format(prefix, urlcode))
        if len(filters) != 0:
            self.request_params["f"] = ",".join(filters)

    def _set_ticker(self, ticker):
        """Set ticker.

        Args:
            ticker(str): ticker string
        """
        if ticker == "":
            return
        self.request_params["t"] = ticker

    def set_filter(self, signal="", filters_dict={}, ticker=""):
        """Update the settings.

        Args:
            signal(str): ticker signal
            filters_dict(dict): dictionary of filters
            ticker(str): ticker string
        """
        self._set_signal(signal)
        self._set_ticker(ticker)
        self._set_filters(filters_dict)

    def _get_page(self, soup):
        """Check the page number"""
        try:
            options = soup.find(id="pageSelect").findAll("option")
            return len(options)
        except:
            return 0

    def _get_table(self, rows, df, num_col_index, table_header, limit=-1):
        """Get screener table helper function.

        Returns:
            df(pandas.DataFrame): screener information table
        """
        rows = rows[1:]
        if limit != -1:
            rows = rows[0:limit]

        frame = []
        for row in rows:
            cols = row.findAll("td")[1:]
            info_dict = {}
            for i, col in enumerate(cols):
                # check if the col is number
                if i not in num_col_index:
                    info_dict[table_header[i]] = col.text
                else:
                    info_dict[table_header[i]] = number_covert(col.text)
            frame.append(info_dict)
        return pd.concat([df, pd.DataFrame(frame)], ignore_index=True)

    @staticmethod
    def _parse_table_header(soup):
        table = soup.find("table", class_="screener_table")
        rows = table.findAll("tr")
        table_headers = [i.text.strip() for i in rows[0].findAll("th")][1:]
        return table_headers

    def _parse_table(self, df, soup, limit):
        if df is None:
            table_headers = self._parse_table_header(soup)
            df = pd.DataFrame([], columns=table_headers)
        table_headers = list(df.columns)
        num_col_index = [
            table_headers.index(i) for i in table_headers if i in NUMBER_COL
        ]
        table = soup.find("table", class_="screener_table")
        rows = table.find_all("tr")
        df = self._get_table(rows, df, num_col_index, table_headers, limit)
        return df

    def _parse_columns(self, columns):
        if not columns:
            return
        if 0 in columns:
            columns.remove(0)
        columns.insert(0, 0)
        columns = [str(i) for i in columns]
        self.request_params["c"] = ",".join(columns)

    def reset(self):
        self.request_params = {"v": self.v_page}

    def screener_view(
        self,
        order="Ticker",
        limit=100000,
        select_page=None,
        verbose=1,
        ascend=True,
        columns=None,
        sleep_sec=1,
    ):
        """Get screener table.

        Args:
            order(str): sort the table by the choice of order.
            limit(int): set the top k rows of the screener.
            select_page(int): set the page of the screener.
            verbose(int): choice of visual the progress. 1 for visualize progress.
            ascend(bool): if True, the order is ascending.
            sleep_sec(int): sleep seconds for fetching each page.
        Returns:
            df(pandas.DataFrame): screener information table
        """
        if order not in order_dict:
            order_keys = list(order_dict.keys())
            raise ValueError(
                "Invalid order '{}'. Possible order: {}".format(order, order_keys)
            )
        self.request_params["o"] = ("" if ascend else "-") + order_dict[order]

        if select_page:
            self.request_params["r"] = (select_page - 1) * self.size + 1

        self._parse_columns(columns)

        soup = web_scrap(self.url, self.request_params)

        page = self._get_page(soup)
        if page == 0:
            print("No ticker found.")
            return None
        df = self._parse_table(None, soup, limit)
        limit -= self.size
        if select_page:
            if select_page > page:
                return None
            warnings.warn("Limit parameter is ignored when page is selected.")
            return df

        for i in range(1, page):
            if limit <= 0:
                break
            sleep(sleep_sec)
            if verbose == 1:
                progress_bar(i, page)
            self.request_params["r"] = i * self.size + 1
            soup = web_scrap(self.url, self.request_params)
            df = self._parse_table(df, soup, limit)
            limit -= self.size
        self.reset()
        return df

    def compare(self, ticker, compare_list, order="ticker", verbose=1):
        """Get screener table of similar property (Sector, Industry, Country)

        Args:
            ticker(str): the ticker to compare
            compare_list(list): choice of compare property (Sector, Industry, Country) or combination.
            order(str): sort the table by the choice of order
            verbose(int): choice of visual the progress. 1 for visualize progress
        Returns:
            df(pandas.DataFrame): screener information table
        """
        check_list = ["Sector", "Industry", "Country"]
        error_list = [i for i in compare_list if i not in check_list]
        if len(error_list) != 0:
            raise ValueError("Please check: {}".format(error_list))

        stock = finvizfinance(ticker)
        stock_fundament = stock.ticker_fundament()
        filters_dict = {}
        for compare in compare_list:
            filters_dict[compare] = stock_fundament[compare]

        self.set_filter(filters_dict=filters_dict)
        df = self.screener_view(order=order, verbose=verbose)
        return df