"""""""""""""""""""""""""""""""""""有限狀態機裡的巢狀類別States可以定義所有的狀態。繼承這個有限狀態機的類別需要覆寫StateProcess方法,該方法就是回傳狀態機的所需狀態清單。如此一來,使用上僅需要呼叫nextState方法,就能轉換狀態。
Library
"""""""""""""""""""""""""""""""""""
from abc import ABCMeta, abstractmethod # abstract class
from enum import Enum # constant
"""""""""""""""""""""""""""""""""""
Class
"""""""""""""""""""""""""""""""""""
## Avoids having to specify the value for each enumeration member.
class AutoNumber(Enum):
    """Enum base class whose members are numbered automatically.

    Members are declared with an empty tuple (``Name = ()``). Each member
    receives the next integer value, starting at 1, in definition order.
    """

    ## Override.
    def __new__(cls):
        """Create a member whose value is its 1-based definition position."""
        # The members registered so far determine the next number.
        value = len(cls.__members__) + 1
        obj = object.__new__(cls)
        obj._value_ = value
        return obj
## Finite state machine.
class FSM(object, metaclass=ABCMeta):
    """Abstract finite state machine driven by an ordered list of states.

    Subclasses override the ``StateProcess`` property to return the list of
    states the machine walks through. Callers advance the machine with
    ``nextState`` and inspect it through ``State`` and the ``is...`` helpers.
    """

    ## State definition.
    class States(AutoNumber):
        """Every state a state process may be composed of."""

        Initial = ()
        Ready = ()
        Idle = ()
        Start = ()
        Pause = ()
        Stop = ()
        SendingRequest = ()
        SentRequest = ()
        ReceivingResponse = ()
        ReceivedResponse = ()
        Creating = ()
        Created = ()
        Storing = ()
        Stored = ()
        Reading = ()
        Read = ()
        Writing = ()
        Wrote = ()
        Removing = ()
        Removed = ()
        Parsing = ()
        Parsed = ()
        Finished = ()
        _Error = ()  # This is not public state.
        _Except = ()  # This is not public state.

    ## Constructor.
    def __init__(self):
        """Read the state process once and enter its first state.

        Raises:
            IndexError: if ``StateProcess`` returns an empty sequence.
        """
        self._stateProcess = self.StateProcess
        self.__stateIndex = 0
        self._state = self._stateProcess[0]

    ## Get state process definition.
    # @details The state process could be a cycle. For example:
    #          [FSM.States.Initial, FSM.States.SendingRequest,
    #          FSM.States.ReceivingResponse, FSM.States.SendingRequest,
    #          FSM.States.Finished].
    # @note If there are two cycles with the same states, it is a wrong state
    #       process definition.
    # @return A list of states.
    @property
    @abstractmethod
    def StateProcess(self):
        """Get state process definition."""
        return [FSM.States.Initial, FSM.States.Finished]

    ## Get state.
    # @return Current state.
    @property
    def State(self):
        """Get current state."""
        return self._state

    ## Get state before except.
    # @return The last state.
    @property
    def StateBeforeExcept(self):
        """Get previous state before except/error.

        Entering the error or except state does not move the process index,
        so the entry at that index is the last regular state.
        """
        return self._stateProcess[self.__stateIndex]

    ## Process definition
    # @param skip - number of states to skip over. (optional)
    # @param end - if true, jump to the last state of the process; otherwise,
    #              move to the next state. (optional)
    # @param error - if true, enter the error state. (optional)
    def nextState(self, skip=0, end=False, error=False):
        """Advance the machine by one transition.

        ``error`` and ``end`` are evaluated for truthiness, so any truthy
        value triggers the corresponding transition. A negative ``skip``
        puts the machine into the except state. A call on a stopped machine
        is a no-op.
        """
        # Avoid restarting a machine that has already stopped.
        if self.isStopped():
            return

        # The error flag has the highest priority.
        if error:
            self._state = FSM.States._Error
            return

        # Check parameters: a negative skip is invalid.
        if skip < 0:
            self._state = FSM.States._Except
            return

        lastIndex = len(self._stateProcess) - 1
        if end:
            self.__stateIndex = lastIndex
            self._state = self._stateProcess[lastIndex]
            return

        # Step forward, clamped to the last state of the process.
        target = min(self.__stateIndex + 1 + skip, lastIndex)

        # Cycle check: if the target state already occurs earlier in the
        # process, rewind to its first occurrence so the cycle repeats.
        self.__stateIndex = self._stateProcess.index(self._stateProcess[target])
        self._state = self._stateProcess[self.__stateIndex]

    ## Reset state to the initial process.
    def resetState(self):
        """Return to the first state of the process."""
        self.__stateIndex = 0
        self._state = self._stateProcess[0]

    ## Check if state is end or not.
    # @retval true if the state is the end.
    # @retval false otherwise.
    def isEndState(self):
        """Return True if the current state equals the last process state."""
        return self._state == self._stateProcess[-1]

    ## Check if state is except or not.
    # @retval true if the state is the except state.
    # @retval false otherwise.
    def isExceptState(self):
        """Return True if the machine is in the except state."""
        return self._state == FSM.States._Except

    ## Check if state is error or not.
    # @retval true if the state is the error state.
    # @retval false otherwise.
    def isErrorState(self):
        """Return True if the machine is in the error state."""
        return self._state == FSM.States._Error

    ## Check if the FSM is stopped or not.
    # @retval true if the FSM is stopped.
    # @retval false otherwise.
    def isStopped(self):
        """Return True if the machine is in the end, except or error state."""
        return self.isEndState() or self.isExceptState() or self.isErrorState()
2014年7月23日 星期三
Python - 有限狀態機
在撰寫模組時,因為使用有限狀態機來記錄每個模組的運作情況,所以寫了一個有限狀態機的類別供繼承用。
訂閱:
張貼留言 (Atom)
沒有留言:
張貼留言