Files
phs_v1.0.1.0/build/hb/services/menu.py
2024-09-27 19:16:49 +08:00

424 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
from __future__ import unicode_literals
import sys
import importlib
from prompt_toolkit.shortcuts import run_application
from prompt_toolkit.application import Application
from prompt_toolkit.key_binding.manager import KeyBindingManager
from prompt_toolkit.keys import Keys
from prompt_toolkit.layout.containers import Window
from prompt_toolkit.filters import IsDone
from prompt_toolkit.layout.controls import TokenListControl
from prompt_toolkit.layout.containers import ConditionalContainer
from prompt_toolkit.layout.containers import HSplit
from prompt_toolkit.layout.dimension import LayoutDimension as D
from prompt_toolkit.token import Token
from exceptions.ohos_exception import OHOSException
from services.interface.menu_interface import MenuInterface
from helper.separator import Separator
from containers.arg import Arg, ModuleType
from resources.config import Config
from util.log_util import LogUtil
from util.product_util import ProductUtil
class Menu(MenuInterface):
def select_compile_option(self) -> dict:
config = Config()
results = {}
all_build_args = Arg.parse_all_args(ModuleType.BUILD)
for arg in all_build_args.values():
if isinstance(arg, Arg) and arg.arg_attribute.get("optional"):
if arg.arg_name != 'target_cpu':
choices = [
choice if isinstance(choice, Separator) else {
'name': choice,
'value': arg.arg_name
} for choice in arg.arg_attribute.get("optional")
]
else:
if config.support_cpu is not None and isinstance(config.support_cpu, list):
choices = []
for cpu in config.support_cpu:
choices.append({
'name': cpu,
'value': arg.arg_name,
})
elif config.target_cpu is not None:
choices = [
{
'name': config.target_cpu,
'value': arg.arg_name,
}
]
else:
choices = [
{
'name': all_build_args.get('target_cpu').arg_value,
'value': arg.arg_name,
}
]
result = self._list_promt(arg.arg_name, 'select {} value'.format(
arg.arg_name), choices).get(arg.arg_name)[0]
results[arg.arg_name] = result
return results
def _select_os_level(self) -> str:
choices = [
{
'name': 'mini',
'value': 'os_level'
},
{
'name': 'small',
'value': 'os_level'
},
{
'name': 'standard',
'value': 'os_level'
}
]
return self._list_promt('os_level', 'Which os_level do you need?',
choices).get('os_level')[0]
def select_product(self) -> dict:
product_path_dict = {}
company_separator = None
os_level = self._select_os_level()
for product_info in ProductUtil.get_products():
if product_info['os_level'] is None:
raise OHOSException("")
if product_info['os_level'] == os_level:
company = product_info['company']
product = product_info['name']
if company_separator is None or company_separator != company:
company_separator = company
product_key = Separator(company_separator)
product_path_dict[product_key] = None
product_path_dict['{}@{}'.format(product,
company)] = product_info
if not len(product_path_dict):
raise OHOSException('no valid product found')
choices = [
product if isinstance(product, Separator) else {
'name': product.split('@')[0],
'value': product.split('@')[1]
} for product in product_path_dict.keys()
]
product = self._list_promt('product', 'Which product do you need?',
choices).get('product')
product_key = f'{product[0]}@{product[1]}'
return product_path_dict.get(product_key)
def _list_promt(self, name, message, choices, **kwargs):
questions = self._get_questions('list', name, message, choices)
return self._prompt(questions=questions, **kwargs)
def _get_questions(self, promt_type, name, message, choices):
questions = [{
'type': promt_type,
'qmark': 'OHOS',
'name': name,
'message': message,
'choices': choices
}]
return questions
def _prompt(self, questions, answers=None, **kwargs):
if isinstance(questions, dict):
questions = [questions]
answers = answers or {}
patch_stdout = kwargs.pop('patch_stdout', False)
return_asyncio_coroutine = kwargs.pop(
'return_asyncio_coroutine', False)
true_color = kwargs.pop('true_color', False)
refresh_interval = kwargs.pop('refresh_interval', 0)
eventloop = kwargs.pop('eventloop', None)
kbi_msg = kwargs.pop('keyboard_interrupt_msg', 'Cancelled by user')
for question in questions:
try:
choices = question.get('choices')
if choices is not None and callable(choices):
question['choices'] = choices(answers)
_kwargs = {}
_kwargs.update(kwargs)
_kwargs.update(question)
question_type = _kwargs.pop('type')
name = _kwargs.pop('name')
message = _kwargs.pop('message')
when = _kwargs.pop('when', None)
question_filter = _kwargs.pop('filter', None)
if when:
# at least a little sanity check!
if callable(question['when']):
try:
if not question['when'](answers):
continue
except Exception as error:
raise ValueError(
'Problem in \'when\' check of %s question: %s' %
(name, error))
else:
raise ValueError('\'when\' needs to be function that '
'accepts a dict argument')
if question_filter:
# at least a little sanity check!
if not callable(question['filter']):
raise ValueError('\'filter\' needs to be function that '
'accepts an argument')
if callable(question.get('default')):
_kwargs['default'] = question['default'](answers)
application = self._question(message, **_kwargs)
answer = run_application(
application,
patch_stdout=patch_stdout,
return_asyncio_coroutine=return_asyncio_coroutine,
true_color=true_color,
refresh_interval=refresh_interval,
eventloop=eventloop)
if answer is not None:
if question_filter:
try:
answer = question['filter'](answer)
except Exception as error:
raise ValueError('Problem processing \'filter\' of'
'{} question: {}'.format(name, error))
answers[name] = answer
except AttributeError as attr_error:
LogUtil.hb_error(attr_error)
raise ValueError('No question type \'%s\'' % question_type)
except KeyboardInterrupt:
LogUtil.hb_warning('')
LogUtil.hb_warning(kbi_msg)
LogUtil.hb_warning('')
return {}
return answers
def _question(self, message, **kwargs):
if 'choices' not in kwargs:
raise OHOSException("You must choose one platform.")
choices = kwargs.pop('choices', None)
qmark = kwargs.pop('qmark', '?')
style = kwargs.pop('style', _get_style('terminal'))
inquirer_control = InquirerControl(choices)
def get_prompt_tokens(cli):
tokens = []
tokens.append((Token.QuestionMark, qmark))
tokens.append((Token.Question, ' %s ' % message))
if inquirer_control.answered:
tokens.append((Token.Answer, ' ' +
inquirer_control.get_selection()[0]))
else:
tokens.append((Token.Instruction, ' (Use arrow keys)'))
return tokens
# assemble layout
layout = HSplit([
Window(height=D.exact(1),
content=TokenListControl(get_prompt_tokens)),
ConditionalContainer(
Window(inquirer_control),
filter=~IsDone()
)
])
# key bindings
manager = KeyBindingManager.for_prompt()
@manager.registry.add_binding(Keys.ControlQ, eager=True)
@manager.registry.add_binding(Keys.ControlC, eager=True)
def _(event):
raise KeyboardInterrupt()
@manager.registry.add_binding(Keys.Down, eager=True)
def move_cursor_down(event):
def _next():
inquirer_control.selected_option_index = (
(inquirer_control.selected_option_index + 1) %
inquirer_control.choice_count)
_next()
while isinstance(inquirer_control.choices[
inquirer_control.selected_option_index][0], Separator) \
or inquirer_control.choices[
inquirer_control.selected_option_index][2]:
_next()
@manager.registry.add_binding(Keys.Up, eager=True)
def move_cursor_up(event):
def _prev():
inquirer_control.selected_option_index = (
(inquirer_control.selected_option_index - 1) %
inquirer_control.choice_count)
_prev()
while isinstance(inquirer_control.choices[
inquirer_control.selected_option_index][0], Separator) \
or inquirer_control.choices[
inquirer_control.selected_option_index][2]:
_prev()
@manager.registry.add_binding(Keys.Enter, eager=True)
def set_answer(event):
inquirer_control.answered = True
event.cli.set_return_value(inquirer_control.get_selection())
return Application(
layout=layout,
key_bindings_registry=manager.registry,
mouse_support=False,
style=style
)
class InquirerControl(TokenListControl):
def __init__(self, choices, **kwargs):
self.selected_option_index = 0
self.answered = False
self.choices = choices
self._init_choices(choices)
super(InquirerControl, self).__init__(self._get_choice_tokens,
**kwargs)
def _init_choices(self, choices, default=None):
# helper to convert from question format to internal format
self.choices = [] # list (name, value, disabled)
searching_first_choice = True
for index, choice in enumerate(choices):
if isinstance(choice, Separator):
self.choices.append((choice, None, None))
else:
base_string = str if sys.version_info[0] >= 3 else None
if isinstance(choice, base_string):
self.choices.append((choice, choice, None))
else:
name = choice.get('name')
value = choice.get('value', name)
disabled = choice.get('disabled', None)
self.choices.append((name, value, disabled))
if searching_first_choice:
self.selected_option_index = index
searching_first_choice = False
@property
def choice_count(self):
return len(self.choices)
def _get_choice_tokens(self, cli):
tokens = []
token = Token
def append(index, choice):
selected = (index == self.selected_option_index)
@_if_mousedown
def select_item(cli, mouse_event):
# bind option with this index to mouse event
self.selected_option_index = index
self.answered = True
tokens.append((token.Pointer if selected else token, ' \u276f '
if selected else ' '))
if selected:
tokens.append((Token.SetCursorPosition, ''))
if choice[2]: # disabled
tokens.append((token.Selected if selected else token,
'- %s (%s)' % (choice[0], choice[2])))
else:
if isinstance(choice[0], Separator):
tokens.append((token.Separator,
str(choice[0]),
select_item))
else:
try:
tokens.append((token.Selected if selected else token,
str(choice[0]), select_item))
except Exception:
tokens.append((token.Selected if selected else
token, choice[0], select_item))
tokens.append((token, '\n'))
# prepare the select choices
for i, choice in enumerate(self.choices):
append(i, choice)
tokens.pop() # Remove last newline.
return tokens
def get_selection(self):
return self.choices[self.selected_option_index]
def _get_style(style_type):
style = importlib.import_module('prompt_toolkit.styles')
token = importlib.import_module('prompt_toolkit.token')
if style_type == 'terminal':
return style.style_from_dict({
token.Token.Separator: '#75c951',
token.Token.QuestionMark: '#5F819D',
token.Token.Selected: '', # default
token.Token.Pointer: '#FF9D00 bold', # AWS orange
token.Token.Instruction: '', # default
token.Token.Answer: '#FF9D00 bold', # AWS orange
token.Token.Question: 'bold',
})
if style_type == 'answer':
return style.style_from_dict({
token.Token.Separator: '#75c951',
token.Token.QuestionMark: '#E91E63 bold',
token.Token.Selected: '#cc5454', # default
token.Token.Pointer: '#ed9164 bold',
token.Token.Instruction: '', # default
token.Token.Answer: '#f44336 bold',
token.Token.Question: '',
})
return None
def _if_mousedown(handler):
def handle_if_mouse_down(cli, mouse_event):
mouse_events = importlib.import_module('prompt_toolkit.mouse_events')
if mouse_event.event_type == mouse_events.MouseEventTypes.MOUSE_DOWN:
return handler(cli, mouse_event)
else:
return NotImplemented
return handle_if_mouse_down