2014年7月23日 星期三

Python - 有限狀態機

在撰寫模組時，因為使用有限狀態機來記錄每個模組的運作情況，所以寫了一個有限狀態機的類別供繼承用。
"""""""""""""""""""""""""""""""""""
Library
"""""""""""""""""""""""""""""""""""
from abc import ABCMeta, abstractmethod  # abstract class
from enum import Enum  # constant

"""""""""""""""""""""""""""""""""""
Class
"""""""""""""""""""""""""""""""""""
## Enum base class that numbers its members automatically.
#  @details  Subclasses declare each member as `Name = ()`; members receive
#            the values 1, 2, 3, ... in declaration order, so no explicit
#            value has to be written for any member.
class AutoNumber(Enum):

    ## Create one member and assign it the next sequential value.
    #  @return   The new member, whose value is its 1-based declaration position.
    def __new__(cls):
        member = object.__new__(cls)
        # Members created so far are already in __members__, so the count
        # plus one is the position of the member being created now.
        member._value_ = 1 + len(cls.__members__)
        return member
   
## Finite state machine.
#  @details  Abstract base class. A subclass overrides StateProcess to return
#            the ordered list of states it walks through, then drives the
#            machine by calling nextState().
class FSM(object, metaclass=ABCMeta):

    ## State definition.
    #  @details  Every state a machine may use. Members are numbered in
    #            declaration order by AutoNumber, so keep the order stable.
    class States(AutoNumber):
        Initial = ()
        Ready = ()
        Idle = ()
        Start = ()
        Pause = ()
        Stop = ()
        SendingRequest = ()
        SentRequest = ()
        ReceivingResponse = ()
        ReceivedResponse = ()
        Creating = ()
        Created = ()
        Storing = ()
        Stored = ()
        Reading = ()
        Read = ()
        Writing = ()
        Wrote = ()
        Removing = ()
        Removed = ()
        Parsing = ()
        Parsed = ()
        Finished = ()
        _Error = ()  # Not a public state; entered through nextState(error=True).
        _Except = ()  # Not a public state; entered when nextState() gets a negative skip.

    ## Constructor.
    #  @details  Reads StateProcess once and starts at its first state.
    def __init__(self):
        self._stateProcess = self.StateProcess
        # Position in _stateProcess of the most recent regular state; it is
        # left untouched when the machine enters _Error or _Except.
        self.__stateIndex = 0
        self._state = self._stateProcess[0]

    ## Get state process definition.
    #  @details  The state process could be a cycle. For example:
    #            [FSM.States.Initial, FSM.States.SendingRequest,
    #            FSM.States.ReceivingResponse, FSM.States.SendingRequest,
    #            FSM.States.Finished]. When nextState() lands on a repeated
    #            state it continues from that state's first occurrence.
    #  @note     If there are two cycles with the same states, it is a wrong state process definition.
    #  @return   A list of state.
    @property
    @abstractmethod
    def StateProcess(self):
        """Get state process definition."""
        return [FSM.States.Initial, FSM.States.Finished]

    ## Get state.
    #  @return   Current state.
    @property
    def State(self):
        """Get current state."""
        return self._state

    ## Get state before except.
    #  @details  Entering the error/except state does not move the process
    #            index, so this is the regular state the machine was in when
    #            the error or exception was flagged.
    #  @return   The last state.
    @property
    def StateBeforeExcept(self):
        """Get previous state before except/error."""
        return self._stateProcess[self.__stateIndex]

    ## Process definition
    #  @details  Does nothing once the machine is stopped (see isStopped());
    #            call resetState() to start over.
    #  @param   skip - number of states to pass over; a negative value puts the machine in the except state. (optional)
    #  @param   end - if true, the machine jumps to the ending state; otherwise, it moves to the next state. (optional)
    #  @param   error - if true, the machine enters the error state. (optional)
    def nextState(self, skip=0, end=False, error=False):
        # Avoid rebooting a machine that has already stopped.
        if self.isStopped():
            return

        # The highest priority. Tested by truthiness rather than identity with
        # True, so that truthy flags such as 1 are honoured instead of ignored.
        if error:
            self._state = FSM.States._Error
            return

        # Check parameters.
        if skip < 0:
            self._state = FSM.States._Except
            return

        lastIndex = len(self._stateProcess) - 1

        if end:
            self.__stateIndex = lastIndex
        else:
            # Advance, clamping at the final state of the process.
            target = min(self.__stateIndex + 1 + skip, lastIndex)
            # Check cycle: a state that also appears earlier in the process
            # resolves to its first occurrence, which closes the loop.
            self.__stateIndex = self._stateProcess.index(self._stateProcess[target])

        self._state = self._stateProcess[self.__stateIndex]

    ## Reset state to the initial process.
    def resetState(self):
        self.__stateIndex = 0
        self._state = self._stateProcess[0]

    ## Check if state is end or not.
    #  @retval  true if the current state equals the last state of the process.
    #  @retval  false otherwise.
    def isEndState(self):
        return self._state == self._stateProcess[-1]

    ## Check if state is except or not.
    #  @retval  true if the state is the except state.
    #  @retval  false otherwise.
    def isExceptState(self):
        return self._state == FSM.States._Except

    ## Check if state is error or not.
    #  @retval  true if the state is the error state.
    #  @retval  false otherwise.
    def isErrorState(self):
        return self._state == FSM.States._Error

    ## Check if the FSM is stopped or not.
    #  @retval  true if the FSM is in the end, except, or error state.
    #  @retval  false otherwise.
    def isStopped(self):
        return self.isEndState() or self.isExceptState() or self.isErrorState()
有限狀態機裡的巢狀類別 States 可以定義所有的狀態。繼承這個有限狀態機的類別需要覆寫 StateProcess 屬性，該屬性回傳狀態機所需的狀態清單。如此一來，使用上僅需要呼叫 nextState 方法，就能轉換狀態。

沒有留言: