##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2023, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################

import time
import secrets
import simplejson as json
import os

from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from pgadmin.utils import does_utility_exist


class MaintenanceJobTest(BaseTestGenerator):
    """Maintenance api test cases"""
    scenarios = [
        ('When maintenance the object with the default options',
         dict(
             params=dict(
                 data={
                     'database': 'postgres',
                     'op': 'VACUUM',
                     'vacuum_analyze': False,
                     'vacuum_freeze': False,
                     'vacuum_full': False,
                     'verbose': True
                 },
                 cmd="VACUUM VERBOSE;\n"
             ),
             url='/maintenance/job/{0}/{1}',
             expected_cmd='VACUUM VERBOSE',
             expected_exit_code=[0, None]
         ))
    ]

    def setUp(self):
        if 'default_binary_paths' not in self.server or \
            self.server['default_binary_paths'] is None or \
            self.server['type'] not in self.server['default_binary_paths'] or\
                self.server['default_binary_paths'][self.server['type']] == '':
            self.skipTest(
                "default_binary_paths is not set for the server {0}".format(
                    self.server['name']
                )
            )

        binary_path = os.path.join(
            self.server['default_binary_paths'][self.server['type']], 'psql')

        if os.name == 'nt':
            binary_path = binary_path + '.exe'

        retVal = does_utility_exist(binary_path)
        if retVal is not None:
            self.skipTest(retVal)

    def runTest(self):
        self.server_id = parent_node_dict["database"][-1]["server_id"]
        self.db_id = parent_node_dict["database"][-1]["db_id"]
        url = self.url.format(self.server_id, self.db_id)

        # Create the backup job
        response = self.tester.post(url,
                                    data=json.dumps(self.params['data']),
                                    content_type='html/json')
        self.assertEqual(response.status_code, 200)
        response_data = json.loads(response.data.decode('utf-8'))
        job_id = response_data['data']['job_id']

        cnt = 0
        the_process = None
        while True:
            if cnt >= 10:
                break
            # Check the process list
            response1 = self.tester.get('/misc/bgprocess/?_={0}'.format(
                secrets.choice(range(1, 9999999))))
            self.assertEqual(response1.status_code, 200)
            process_list = json.loads(response1.data.decode('utf-8'))

            try:
                the_process = next(
                    p for p in process_list if p['id'] == job_id)
            except Exception:
                the_process = None

            if the_process and 'execution_time' in the_process:
                break
            time.sleep(0.5)
            cnt += 1

        self.assertTrue('execution_time' in the_process)
        self.assertTrue('stime' in the_process)
        self.assertTrue('exit_code' in the_process)
        self.assertTrue(the_process['exit_code'] in
                        self.expected_exit_code)

        self.assertIn(self.expected_cmd, the_process['details']['query'])

        # Check the process details
        p_details = self.tester.get('/misc/bgprocess/{0}?_={1}'.format(
            job_id, secrets.choice(range(1, 9999999)))
        )
        self.assertEqual(p_details.status_code, 200)

        p_details = self.tester.get(
            '/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
                job_id, 0, 0, secrets.choice(range(1, 9999999))
            )
        )
        self.assertEqual(p_details.status_code, 200)
        p_details_data = json.loads(p_details.data.decode('utf-8'))

        # Retrieve the backup job process logs
        while True:
            out, err, status = MaintenanceJobTest.get_params(p_details_data)
            if status:
                break

            p_details = self.tester.get(
                '/misc/bgprocess/{0}/{1}/{2}/?_={3}'.format(
                    job_id, out, err, secrets.choice(range(1, 9999999)))
            )
            self.assertEqual(p_details.status_code, 200)
            p_details_data = json.loads(p_details.data.decode('utf-8'))

            time.sleep(1)

        # Check the job is complete.
        backup_ack = self.tester.put('/misc/bgprocess/{0}'.format(job_id))
        self.assertEqual(backup_ack.status_code, 200)
        backup_ack_res = json.loads(backup_ack.data.decode('utf-8'))

        self.assertEqual(backup_ack_res['success'], 1)

    @staticmethod
    def get_params(data):
        out = 0
        out_done = False
        err = 0
        err_done = False
        if 'out' in data:
            out = data['out'] and data['out']['pos']

            if 'done' in data['out']:
                out_done = data['out']['done']

        if 'err' in data:
            err = data['err'] and data['err']['pos']

            if 'done' in data['err']:
                err_done = data['err']['done']

        return out, err, (out_done and err_done)
