slidge.plugins.mattermost.websocket#

Module Contents#

Classes#

EventType

str(object='') -> str

MattermostEvent

Websocket

Functions#

handle_event(d, event_handler)

Attributes#

log

class slidge.plugins.mattermost.websocket.EventType[source]#

Bases: str, enum.Enum

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.

Initialize self. See help(type(self)) for accurate signature.

AddedToTeam = added_to_team[source]#
AuthenticationChallenge = authentication_challenge[source]#
ChannelConverted = channel_converted[source]#
ChannelCreated = channel_created[source]#
ChannelDeleted = channel_deleted[source]#
ChannelMemberUpdated = channel_member_updated[source]#
ChannelUpdated = channel_updated[source]#
ChannelViewed = channel_viewed[source]#
ConfigChanged = config_changed[source]#
DeleteTeam = delete_team[source]#
DirectAdded = direct_added[source]#
EmojiAdded = emoji_added[source]#
EphemeralMessage = ephemeral_message[source]#
GroupAdded = group_added[source]#
Hello = hello[source]#
LeaveTeam = leave_team[source]#
LicenseChanged = license_changed[source]#
MemberroleUpdated = memberrole_updated[source]#
NewUser = new_user[source]#
PluginDisabled = plugin_disabled[source]#
PluginEnabled = plugin_enabled[source]#
PluginStatusesChanged = plugin_statuses_changed[source]#
PostDeleted = post_deleted[source]#
PostEdited = post_edited[source]#
PostUnread = post_unread[source]#
Posted = posted[source]#
PreferenceChanged = preference_changed[source]#
PreferencesChanged = preferences_changed[source]#
PreferencesDeleted = preferences_deleted[source]#
ReactionAdded = reaction_added[source]#
ReactionRemoved = reaction_removed[source]#
Response = response[source]#
RoleUpdated = role_updated[source]#
StatusChange = status_change[source]#
Typing = typing[source]#
UpdateTeam = update_team[source]#
UserAdded = user_added[source]#
UserRemoved = user_removed[source]#
UserRoleUpdated = user_role_updated[source]#
UserUpdated = user_updated[source]#
DialogOpened = dialog_opened[source]#
ThreadUpdated = thread_updated[source]#
ThreadFollowChanged = thread_follow_changed[source]#
ThreadReadChanged = thread_read_changed[source]#
SidebarCategoryUpdated = sidebar_category_updated[source]#
Unknown = __unknown__[source]#
class slidge.plugins.mattermost.websocket.MattermostEvent[source]#
type :EventType[source]#
data :dict[source]#
broadcast :dict[source]#
left :dict[source]#
__str__()[source]#

Return str(self).

class slidge.plugins.mattermost.websocket.Websocket(url, token)[source]#
async connect(event_handler)[source]#

Connect to the websocket and authenticate it. When the authentication has finished, start the loop listening for messages, sending a ping to the server to keep the connection alive. :param event_handler: Every websocket event will be passed there. Takes one argument. :type event_handler: Function(message) :return:

async _start_loop(websocket, event_handler)[source]#

We will listen for websockets events, sending a heartbeats on a timer. If we don’t the webserver would close the idle connection, forcing us to reconnect.

async _do_heartbeats(websocket)[source]#

This is a little complicated, but we only need to pong the websocket if we haven’t received a message inside the timeout window. Since messages can be received, while we are waiting we need to check after sleep.

disconnect()[source]#

Sets self._alive to False so the loop in self._start_loop will finish.

async _authenticate_websocket(websocket)[source]#

Sends an authentication challenge over a websocket. This is not needed when we just send the cookie we got on login when connecting to the websocket.

async user_typing(channel_id)[source]#
async slidge.plugins.mattermost.websocket.handle_event(d, event_handler)[source]#
slidge.plugins.mattermost.websocket.log[source]#