Add python/matrix stuff.

master
jowj 4 years ago
parent 116f6fab51
commit 30080f1de3

@ -0,0 +1,610 @@
# -*- coding: utf-8 -*-
# Weechat Matrix Protocol Script
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals
import os
import socket
import ssl
import textwrap
# pylint: disable=redefined-builtin
from builtins import str
from itertools import chain
# pylint: disable=unused-import
from typing import Any, AnyStr, Deque, Dict, List, Optional, Set, Text, Tuple
import logbook
import OpenSSL.crypto as crypto
from future.utils import bytes_to_native_str as n
from logbook import Logger, StreamHandler
from nio import RemoteProtocolError, RemoteTransportError, TransportType
from matrix import globals as G
from matrix.bar_items import (
init_bar_items,
matrix_bar_item_buffer_modes,
matrix_bar_item_lag,
matrix_bar_item_name,
matrix_bar_item_plugin,
matrix_bar_nicklist_count,
matrix_bar_typing_notices_cb
)
from matrix.buffer import room_buffer_close_cb, room_buffer_input_cb
# Weechat searches for the registered callbacks in the scope of the main script
# file, import the callbacks here so weechat can find them.
from matrix.commands import (hook_commands, hook_page_up,
matrix_command_buf_clear_cb, matrix_command_cb,
matrix_command_pgup_cb, matrix_invite_command_cb,
matrix_join_command_cb, matrix_kick_command_cb,
matrix_me_command_cb, matrix_part_command_cb,
matrix_redact_command_cb, matrix_topic_command_cb,
matrix_olm_command_cb, matrix_devices_command_cb,
matrix_room_command_cb, matrix_uploads_command_cb,
matrix_upload_command_cb, matrix_send_anyways_cb)
from matrix.completion import (init_completion, matrix_command_completion_cb,
matrix_debug_completion_cb,
matrix_message_completion_cb,
matrix_olm_device_completion_cb,
matrix_olm_user_completion_cb,
matrix_server_command_completion_cb,
matrix_server_completion_cb,
matrix_user_completion_cb,
matrix_own_devices_completion_cb,
matrix_room_completion_cb)
from matrix.config import (MatrixConfig, config_log_category_cb,
config_log_level_cb, config_server_buffer_cb,
matrix_config_reload_cb, config_pgup_cb)
from matrix.globals import SCRIPT_NAME, SERVERS, W, MAX_EVENTS
from matrix.server import (MatrixServer, create_default_server,
matrix_config_server_change_cb,
matrix_config_server_read_cb,
matrix_config_server_write_cb, matrix_timer_cb,
send_cb, matrix_load_users_cb,
matrix_partial_sync_cb)
from matrix.utf import utf8_decode
from matrix.utils import server_buffer_prnt, server_buffer_set_title
from matrix.uploads import UploadsBuffer, upload_cb
# yapf: disable
WEECHAT_SCRIPT_NAME = SCRIPT_NAME
WEECHAT_SCRIPT_DESCRIPTION = "matrix chat plugin" # type: str
WEECHAT_SCRIPT_AUTHOR = "Damir Jelić <poljar@termina.org.uk>" # type: str
WEECHAT_SCRIPT_VERSION = "0.1" # type: str
WEECHAT_SCRIPT_LICENSE = "ISC" # type: str
# yapf: enable
logger = Logger("matrix-cli")
def print_certificate_info(buff, sock, cert):
cert_pem = ssl.DER_cert_to_PEM_cert(sock.getpeercert(True))
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
public_key = x509.get_pubkey()
key_type = ("RSA" if public_key.type() == crypto.TYPE_RSA else "DSA")
key_size = str(public_key.bits())
sha256_fingerprint = x509.digest(n(b"SHA256"))
sha1_fingerprint = x509.digest(n(b"SHA1"))
signature_algorithm = x509.get_signature_algorithm()
key_info = ("key info: {key_type} key {bits} bits, signed using "
"{algo}").format(
key_type=key_type, bits=key_size,
algo=n(signature_algorithm))
validity_info = (" Begins on: {before}\n"
" Expires on: {after}").format(
before=cert["notBefore"], after=cert["notAfter"])
rdns = chain(*cert["subject"])
subject = ", ".join(["{}={}".format(name, value) for name, value in rdns])
rdns = chain(*cert["issuer"])
issuer = ", ".join(["{}={}".format(name, value) for name, value in rdns])
subject = "subject: {sub}, serial number {serial}".format(
sub=subject, serial=cert["serialNumber"])
issuer = "issuer: {issuer}".format(issuer=issuer)
fingerprints = (" SHA1: {}\n"
" SHA256: {}").format(n(sha1_fingerprint),
n(sha256_fingerprint))
wrapper = textwrap.TextWrapper(
initial_indent=" - ", subsequent_indent=" ")
message = ("{prefix}matrix: received certificate\n"
" - certificate info:\n"
"{subject}\n"
"{issuer}\n"
"{key_info}\n"
" - period of validity:\n{validity_info}\n"
" - fingerprints:\n{fingerprints}").format(
prefix=W.prefix("network"),
subject=wrapper.fill(subject),
issuer=wrapper.fill(issuer),
key_info=wrapper.fill(key_info),
validity_info=validity_info,
fingerprints=fingerprints)
W.prnt(buff, message)
def wrap_socket(server, file_descriptor):
# type: (MatrixServer, int) -> None
sock = None # type: socket.socket
temp_socket = socket.fromfd(file_descriptor, socket.AF_INET,
socket.SOCK_STREAM)
# fromfd() duplicates the file descriptor, we can close the one we got from
# weechat now since we use the one from our socket when calling hook_fd()
os.close(file_descriptor)
# For python 2.7 wrap_socket() doesn't work with sockets created from an
# file descriptor because fromfd() doesn't return a wrapped socket, the bug
# was fixed for python 3, more info: https://bugs.python.org/issue13942
# pylint: disable=protected-access,unidiomatic-typecheck
if type(temp_socket) == socket._socket.socket:
# pylint: disable=no-member
sock = socket._socketobject(_sock=temp_socket)
else:
sock = temp_socket
# fromfd() duplicates the file descriptor but doesn't retain it's blocking
# non-blocking attribute, so mark the socket as non-blocking even though
# weechat already did that for us
sock.setblocking(False)
message = "{prefix}matrix: Doing SSL handshake...".format(
prefix=W.prefix("network"))
W.prnt(server.server_buffer, message)
ssl_socket = server.ssl_context.wrap_socket(
sock, do_handshake_on_connect=False,
server_hostname=server.address) # type: ssl.SSLSocket
server.socket = ssl_socket
try_ssl_handshake(server)
@utf8_decode
def ssl_fd_cb(server_name, file_descriptor):
server = SERVERS[server_name]
if server.ssl_hook:
W.unhook(server.ssl_hook)
server.ssl_hook = None
try_ssl_handshake(server)
return W.WEECHAT_RC_OK
def try_ssl_handshake(server):
sock = server.socket
while True:
try:
sock.do_handshake()
cipher = sock.cipher()
cipher_message = ("{prefix}matrix: Connected using {tls}, and "
"{bit} bit {cipher} cipher suite.").format(
prefix=W.prefix("network"),
tls=cipher[1],
bit=cipher[2],
cipher=cipher[0])
W.prnt(server.server_buffer, cipher_message)
cert = sock.getpeercert()
if cert:
print_certificate_info(server.server_buffer, sock, cert)
finalize_connection(server)
return True
except ssl.SSLWantReadError:
hook = W.hook_fd(server.socket.fileno(), 1, 0, 0, "ssl_fd_cb",
server.name)
server.ssl_hook = hook
return False
except ssl.SSLWantWriteError:
hook = W.hook_fd(server.socket.fileno(), 0, 1, 0, "ssl_fd_cb",
server.name)
server.ssl_hook = hook
return False
except (ssl.SSLError, ssl.CertificateError, socket.error) as error:
try:
str_error = error.reason if error.reason else "Unknown error"
except AttributeError:
str_error = str(error)
message = ("{prefix}Error while doing SSL handshake"
": {error}").format(
prefix=W.prefix("network"), error=str_error)
server_buffer_prnt(server, message)
server_buffer_prnt(
server, ("{prefix}matrix: disconnecting from server..."
).format(prefix=W.prefix("network")))
server.disconnect()
return False
@utf8_decode
def receive_cb(server_name, file_descriptor):
server = SERVERS[server_name]
while True:
try:
data = server.socket.recv(4096)
except ssl.SSLWantReadError:
break
except socket.error as error:
errno = "error" + str(error.errno) + " " if error.errno else ""
str_error = error.strerror if error.strerror else "Unknown error"
str_error = errno + str_error
message = ("{prefix}Error while reading from "
"socket: {error}").format(
prefix=W.prefix("network"), error=str_error)
server_buffer_prnt(server, message)
server_buffer_prnt(
server, ("{prefix}matrix: disconnecting from server..."
).format(prefix=W.prefix("network")))
server.disconnect()
return W.WEECHAT_RC_OK
if not data:
server_buffer_prnt(
server,
"{prefix}matrix: Error while reading from socket".format(
prefix=W.prefix("network")))
server_buffer_prnt(
server, ("{prefix}matrix: disconnecting from server..."
).format(prefix=W.prefix("network")))
server.disconnect()
break
try:
server.client.receive(data)
except (RemoteTransportError, RemoteProtocolError) as e:
server.error(str(e))
server.disconnect()
break
response = server.client.next_response(MAX_EVENTS)
# Check if we need to send some data back
data_to_send = server.client.data_to_send()
if data_to_send:
server.send(data_to_send)
if response:
server.handle_response(response)
break
return W.WEECHAT_RC_OK
def finalize_connection(server):
hook = W.hook_fd(
server.socket.fileno(),
1,
0,
0,
"receive_cb",
server.name
)
server.fd_hook = hook
server.connected = True
server.connecting = False
server.reconnect_delay = 0
negotiated_protocol = (server.socket.selected_alpn_protocol() or
server.socket.selected_npn_protocol())
if negotiated_protocol == "h2":
server.transport_type = TransportType.HTTP2
else:
server.transport_type = TransportType.HTTP
data = server.client.connect(server.transport_type)
server.send(data)
server.login()
@utf8_decode
def connect_cb(data, status, gnutls_rc, sock, error, ip_address):
# pylint: disable=too-many-arguments,too-many-branches
status_value = int(status) # type: int
server = SERVERS[data]
if status_value == W.WEECHAT_HOOK_CONNECT_OK:
file_descriptor = int(sock) # type: int
server.numeric_address = ip_address
server_buffer_set_title(server)
wrap_socket(server, file_descriptor)
return W.WEECHAT_RC_OK
elif status_value == W.WEECHAT_HOOK_CONNECT_ADDRESS_NOT_FOUND:
server.error('{address} not found'.format(address=ip_address))
elif status_value == W.WEECHAT_HOOK_CONNECT_IP_ADDRESS_NOT_FOUND:
server.error('IP address not found')
elif status_value == W.WEECHAT_HOOK_CONNECT_CONNECTION_REFUSED:
server.error('Connection refused')
elif status_value == W.WEECHAT_HOOK_CONNECT_PROXY_ERROR:
server.error('Proxy fails to establish connection to server')
elif status_value == W.WEECHAT_HOOK_CONNECT_LOCAL_HOSTNAME_ERROR:
server.error('Unable to set local hostname')
elif status_value == W.WEECHAT_HOOK_CONNECT_GNUTLS_INIT_ERROR:
server.error('TLS init error')
elif status_value == W.WEECHAT_HOOK_CONNECT_GNUTLS_HANDSHAKE_ERROR:
server.error('TLS Handshake failed')
elif status_value == W.WEECHAT_HOOK_CONNECT_MEMORY_ERROR:
server.error('Not enough memory')
elif status_value == W.WEECHAT_HOOK_CONNECT_TIMEOUT:
server.error('Timeout')
elif status_value == W.WEECHAT_HOOK_CONNECT_SOCKET_ERROR:
server.error('Unable to create socket')
else:
server.error('Unexpected error: {status}'.format(status=status_value))
server.disconnect(reconnect=True)
return W.WEECHAT_RC_OK
@utf8_decode
def room_close_cb(data, buffer):
W.prnt("",
"Buffer '%s' will be closed!" % W.buffer_get_string(buffer, "name"))
return W.WEECHAT_RC_OK
@utf8_decode
def matrix_unload_cb():
for server in SERVERS.values():
server.config.free()
G.CONFIG.free()
# for server in SERVERS.values():
# server.store_olm()
return W.WEECHAT_RC_OK
def autoconnect(servers):
for server in servers.values():
if server.config.autoconnect:
server.connect()
def debug_buffer_close_cb(data, buffer):
G.CONFIG.debug_buffer = ""
return W.WEECHAT_RC_OK
def server_buffer_cb(server_name, buffer, input_data):
message = ("{}{}: this buffer is not a room buffer!").format(
W.prefix("error"), SCRIPT_NAME)
W.prnt(buffer, message)
return W.WEECHAT_RC_OK
class WeechatHandler(StreamHandler):
def __init__(self, level=logbook.NOTSET, format_string=None, filter=None,
bubble=False):
StreamHandler.__init__(
self,
object(),
level,
format_string,
None,
filter,
bubble
)
def write(self, item):
buf = ""
if G.CONFIG.network.debug_buffer:
if not G.CONFIG.debug_buffer:
G.CONFIG.debug_buffer = W.buffer_new(
"Matrix Debug", "", "", "debug_buffer_close_cb", "")
buf = G.CONFIG.debug_buffer
W.prnt(buf, item)
def buffer_switch_cb(_, _signal, buffer_ptr):
"""Do some buffer operations when we switch buffers.
This function is called every time we switch a buffer. The pointer of
the new buffer is given to us by weechat.
If it is one of our room buffers we check if the members for the room
aren't fetched and fetch them now if they aren't.
Read receipts are send out from here as well.
"""
for server in SERVERS.values():
if buffer_ptr == server.server_buffer:
return W.WEECHAT_RC_OK
if buffer_ptr not in server.buffers.values():
continue
room_buffer = server.find_room_from_ptr(buffer_ptr)
if not room_buffer:
continue
if room_buffer.should_send_read_marker:
event_id = room_buffer.last_event_id
# A buffer may not have any events, in that case no event id is
# here returned
if event_id:
server.room_send_read_marker(
room_buffer.room.room_id, event_id)
room_buffer.last_read_event = event_id
if room_buffer.members_fetched:
return W.WEECHAT_RC_OK
room_id = room_buffer.room.room_id
server.get_joined_members(room_id)
break
return W.WEECHAT_RC_OK
def typing_notification_cb(data, signal, buffer_ptr):
"""Send out typing notifications if the user is typing.
This function is called every time the input text is changed.
It checks if we are on a buffer we own, and if we are sends out a typing
notification if the room is configured to send them out.
"""
for server in SERVERS.values():
room_buffer = server.find_room_from_ptr(buffer_ptr)
if room_buffer:
server.room_send_typing_notice(room_buffer)
return W.WEECHAT_RC_OK
if buffer_ptr == server.server_buffer:
return W.WEECHAT_RC_OK
return W.WEECHAT_RC_OK
def buffer_command_cb(data, _, command):
"""Override the buffer command to allow switching buffers by short name."""
command = command[7:].strip()
buffer_subcommands = ["list", "add", "clear", "move", "swap", "cycle",
"merge", "unmerge", "hide", "unhide", "renumber",
"close", "notify", "localvar", "set", "get"]
if not command:
return W.WEECHAT_RC_OK
command_words = command.split()
if len(command_words) >= 1:
if command_words[0] in buffer_subcommands:
return W.WEECHAT_RC_OK
elif command_words[0].startswith("*"):
return W.WEECHAT_RC_OK
try:
int(command_words[0])
return W.WEECHAT_RC_OK
except ValueError:
pass
room_buffers = []
for server in SERVERS.values():
room_buffers.extend(server.room_buffers.values())
sorted_buffers = sorted(
room_buffers,
key=lambda b: b.weechat_buffer.number
)
for room_buffer in sorted_buffers:
buffer = room_buffer.weechat_buffer
if command in buffer.short_name:
displayed = W.current_buffer() == buffer._ptr
if displayed:
continue
W.buffer_set(buffer._ptr, 'display', '1')
return W.WEECHAT_RC_OK_EAT
return W.WEECHAT_RC_OK
if __name__ == "__main__":
if W.register(WEECHAT_SCRIPT_NAME, WEECHAT_SCRIPT_AUTHOR,
WEECHAT_SCRIPT_VERSION, WEECHAT_SCRIPT_LICENSE,
WEECHAT_SCRIPT_DESCRIPTION, 'matrix_unload_cb', ''):
if not W.mkdir_home("matrix", 0o700):
message = ("{prefix}matrix: Error creating session "
"directory").format(prefix=W.prefix("error"))
W.prnt("", message)
handler = WeechatHandler()
handler.format_string = "{record.channel}: {record.message}"
handler.push_application()
# TODO if this fails we should abort and unload the script.
G.CONFIG = MatrixConfig()
G.CONFIG.read()
hook_commands()
init_bar_items()
init_completion()
W.hook_command_run("/buffer", "buffer_command_cb", "")
W.hook_signal("buffer_switch", "buffer_switch_cb", "")
W.hook_signal("input_text_changed", "typing_notification_cb", "")
if not SERVERS:
create_default_server(G.CONFIG)
autoconnect(SERVERS)

@ -0,0 +1,248 @@
import datetime
import random
import string
WEECHAT_BASE_COLORS = {
"black": "0",
"red": "1",
"green": "2",
"brown": "3",
"blue": "4",
"magenta": "5",
"cyan": "6",
"default": "7",
"gray": "8",
"lightred": "9",
"lightgreen": "10",
"yellow": "11",
"lightblue": "12",
"lightmagenta": "13",
"lightcyan": "14",
"white": "15"
}
class MockObject(object):
pass
class MockConfig(object):
config_template = {
'debug_buffer': None,
'debug_category': None,
'_ptr': None,
'read': None,
'free': None,
'page_up_hook': None,
'color': {
'error_message_bg': "",
'error_message_fg': "",
'quote_bg': "",
'quote_fg': "",
'unconfirmed_message_bg': "",
'unconfirmed_message_fg': "",
'untagged_code_bg': "",
'untagged_code_fg': "",
},
'upload_buffer': {
'display': None,
'move_line_down': None,
'move_line_up': None,
'render': None,
},
'look': {
'bar_item_typing_notice_prefix': None,
'busy_sign': None,
'code_block_margin': None,
'code_blocks': None,
'disconnect_sign': None,
'encrypted_room_sign': None,
'encryption_warning_sign': None,
'max_typing_notice_item_length': None,
'pygments_style': None,
'redactions': None,
'server_buffer': None,
},
'network': {
'debug_buffer': None,
'debug_category': None,
'debug_level': None,
'fetch_backlog_on_pgup': None,
'lag_min_show': None,
'lag_reconnect': None,
'lazy_load_room_users': None,
'max_initial_sync_events': None,
'max_nicklist_users': None,
'print_unconfirmed_messages': None,
'read_markers_conditions': None,
'typing_notice_conditions': None,
},
}
def __init__(self):
for category, options in MockConfig.config_template.items():
if options:
category_object = MockObject()
for option, value in options.items():
setattr(category_object, option, value)
else:
category_object = options
setattr(self, category, category_object)
def color(color_name):
# type: (str) -> str
# yapf: disable
escape_codes = []
reset_code = "0"
def make_fg_color(color_code):
return "38;5;{}".format(color_code)
def make_bg_color(color_code):
return "48;5;{}".format(color_code)
attributes = {
"bold": "1",
"-bold": "21",
"reverse": "27",
"-reverse": "21",
"italic": "3",
"-italic": "23",
"underline": "4",
"-underline": "24",
"reset": "0",
"resetcolor": "39"
}
short_attributes = {
"*": "1",
"!": "27",
"/": "3",
"_": "4"
}
colors = color_name.split(",", 2)
fg_color = colors.pop(0)
bg_color = colors.pop(0) if colors else ""
if fg_color in attributes:
escape_codes.append(attributes[fg_color])
else:
chars = list(fg_color)
for char in chars:
if char in short_attributes:
escape_codes.append(short_attributes[char])
elif char == "|":
reset_code = ""
else:
break
stripped_color = fg_color.lstrip("*_|/!")
if stripped_color in WEECHAT_BASE_COLORS:
escape_codes.append(
make_fg_color(WEECHAT_BASE_COLORS[stripped_color]))
elif stripped_color.isdigit():
num_color = int(stripped_color)
if 0 <= num_color < 256:
escape_codes.append(make_fg_color(stripped_color))
if bg_color in WEECHAT_BASE_COLORS:
escape_codes.append(make_bg_color(WEECHAT_BASE_COLORS[bg_color]))
else:
if bg_color.isdigit():
num_color = int(bg_color)
if 0 <= num_color < 256:
escape_codes.append(make_bg_color(bg_color))
escape_string = "\033[{}{}m".format(reset_code, ";".join(escape_codes))
return escape_string
def prefix(prefix_string):
prefix_to_symbol = {
"error": "=!=",
"network": "--",
"action": "*",
"join": "-->",
"quit": "<--"
}
if prefix_string in prefix_to_symbol:
return prefix_to_symbol[prefix]
return ""
def prnt(_, message):
print(message)
def prnt_date_tags(_, date, tags_string, data):
message = "{} {} [{}]".format(
datetime.datetime.fromtimestamp(date),
data,
tags_string
)
print(message)
def config_search_section(*_, **__):
pass
def config_new_option(*_, **__):
pass
def mkdir_home(*_, **__):
return True
def info_get(info, *_):
if info == "nick_color_name":
return random.choice(list(WEECHAT_BASE_COLORS.keys()))
return ""
def buffer_new(*_, **__):
return "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(8)
)
def buffer_set(*_, **__):
return
def buffer_get_string(_ptr, property):
if property == "localvar_type":
return "channel"
return ""
def nicklist_add_group(*_, **__):
return
def nicklist_add_nick(*_, **__):
return
def nicklist_remove_nick(*_, **__):
return
def nicklist_search_nick(*args, **kwargs):
return buffer_new(args, kwargs)
def string_remove_color(message, _):
return message

@ -0,0 +1,202 @@
# -*- coding: utf-8 -*-
# Copyright © 2018, 2019 Damir Jelić <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals
from . import globals as G
from .globals import SERVERS, W
from .utf import utf8_decode
@utf8_decode
def matrix_bar_item_plugin(data, item, window, buffer, extra_info):
# pylint: disable=unused-argument
for server in SERVERS.values():
if buffer in server.buffers.values() or buffer == server.server_buffer:
return "matrix{color}/{color_fg}{name}".format(
color=W.color("bar_delim"),
color_fg=W.color("bar_fg"),
name=server.name,
)
ptr_plugin = W.buffer_get_pointer(buffer, "plugin")
name = W.plugin_get_name(ptr_plugin)
return name
@utf8_decode
def matrix_bar_item_name(data, item, window, buffer, extra_info):
# pylint: disable=unused-argument
for server in SERVERS.values():
if buffer in server.buffers.values():
color = (
"status_name_ssl"
if server.ssl_context.check_hostname
else "status_name"
)
room_buffer = server.find_room_from_ptr(buffer)
room = room_buffer.room
return "{color}{name}".format(
color=W.color(color), name=room.display_name
)
if buffer == server.server_buffer:
color = (
"status_name_ssl"
if server.ssl_context.check_hostname
else "status_name"
)
return "{color}server{del_color}[{color}{name}{del_color}]".format(
color=W.color(color),
del_color=W.color("bar_delim"),
name=server.name,
)
name = W.buffer_get_string(buffer, "name")
return "{}{}".format(W.color("status_name"), name)
@utf8_decode
def matrix_bar_item_lag(data, item, window, buffer, extra_info):
# pylint: disable=unused-argument
for server in SERVERS.values():
if buffer in server.buffers.values() or buffer == server.server_buffer:
if server.lag >= G.CONFIG.network.lag_min_show:
color = W.color("irc.color.item_lag_counting")
if server.lag_done:
color = W.color("irc.color.item_lag_finished")
lag = "{0:.3f}" if round(server.lag) < 1000 else "{0:.0f}"
lag_string = "Lag: {color}{lag}{ncolor}".format(
lag=lag.format((server.lag / 1000)),
color=color,
ncolor=W.color("reset"),
)
return lag_string
return ""
return ""
@utf8_decode
def matrix_bar_item_buffer_modes(data, item, window, buffer, extra_info):
# pylint: disable=unused-argument
for server in SERVERS.values():
if buffer in server.buffers.values():
room_buffer = server.find_room_from_ptr(buffer)
room = room_buffer.room
modes = []
if room.encrypted:
modes.append(G.CONFIG.look.encrypted_room_sign)
if (server.client
and server.client.room_contains_unverified(room.room_id)):
modes.append(G.CONFIG.look.encryption_warning_sign)
if not server.connected:
modes.append(G.CONFIG.look.disconnect_sign)
if room_buffer.backlog_pending or server.busy:
modes.append(G.CONFIG.look.busy_sign)
return "".join(modes)
return ""
@utf8_decode
def matrix_bar_nicklist_count(data, item, window, buffer, extra_info):
# pylint: disable=unused-argument
color = W.color("status_nicklist_count")
for server in SERVERS.values():
if buffer in server.buffers.values():
room_buffer = server.find_room_from_ptr(buffer)
room = room_buffer.room
return "{}{}".format(color, room.member_count)
nicklist_enabled = bool(W.buffer_get_integer(buffer, "nicklist"))
if nicklist_enabled:
nick_count = W.buffer_get_integer(buffer, "nicklist_visible_count")
return "{}{}".format(color, nick_count)
return ""
@utf8_decode
def matrix_bar_typing_notices_cb(data, item, window, buffer, extra_info):
"""Update a status bar item showing users currently typing.
This function is called by weechat every time a buffer is switched or
W.bar_item_update(<item>) is explicitly called. The bar item shows
currently typing users for the current buffer."""
# pylint: disable=unused-argument
for server in SERVERS.values():
if buffer in server.buffers.values():
room_buffer = server.find_room_from_ptr(buffer)
room = room_buffer.room
if room.typing_users:
nicks = []
for user_id in room.typing_users:
if user_id == room.own_user_id:
continue
nick = room_buffer.displayed_nicks.get(user_id, user_id)
nicks.append(nick)
if not nicks:
return ""
msg = "{}{}".format(
G.CONFIG.look.bar_item_typing_notice_prefix,
", ".join(sorted(nicks))
)
max_len = G.CONFIG.look.max_typing_notice_item_length
if len(msg) > max_len:
msg[:max_len - 3] + "..."
return msg
return ""
return ""
def init_bar_items():
W.bar_item_new("(extra)buffer_plugin", "matrix_bar_item_plugin", "")
W.bar_item_new("(extra)buffer_name", "matrix_bar_item_name", "")
W.bar_item_new("(extra)lag", "matrix_bar_item_lag", "")
W.bar_item_new(
"(extra)buffer_nicklist_count",
"matrix_bar_nicklist_count",
""
)
W.bar_item_new(
"(extra)matrix_typing_notice",
"matrix_bar_typing_notices_cb",
""
)
W.bar_item_new("(extra)buffer_modes", "matrix_bar_item_buffer_modes", "")
W.bar_item_new("(extra)matrix_modes", "matrix_bar_item_buffer_modes", "")

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -0,0 +1,366 @@
# -*- coding: utf-8 -*-
# Copyright © 2018, 2019 Damir Jelić <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals
from typing import List, Optional
from matrix.globals import SERVERS, W, SCRIPT_NAME
from matrix.utf import utf8_decode
from matrix.utils import tags_from_line_data
from nio import LocalProtocolError
def add_servers_to_completion(completion):
for server_name in SERVERS:
W.hook_completion_list_add(
completion, server_name, 0, W.WEECHAT_LIST_POS_SORT
)
@utf8_decode
def matrix_server_command_completion_cb(
data, completion_item, buffer, completion
):
buffer_input = W.buffer_get_string(buffer, "input").split()
args = buffer_input[1:]
commands = ["add", "delete", "list", "listfull"]
def complete_commands():
for command in commands:
W.hook_completion_list_add(
completion, command, 0, W.WEECHAT_LIST_POS_SORT
)
if len(args) == 1:
complete_commands()
elif len(args) == 2:
if args[1] not in commands:
complete_commands()
else:
if args[1] == "delete" or args[1] == "listfull":
add_servers_to_completion(completion)
elif len(args) == 3:
if args[1] == "delete" or args[1] == "listfull":
if args[2] not in SERVERS:
add_servers_to_completion(completion)
return W.WEECHAT_RC_OK
@utf8_decode
def matrix_server_completion_cb(data, completion_item, buffer, completion):
add_servers_to_completion(completion)
return W.WEECHAT_RC_OK
@utf8_decode
def matrix_command_completion_cb(data, completion_item, buffer, completion):
for command in [
"connect",
"disconnect",
"reconnect",
"server",
"help",
"debug",
]:
W.hook_completion_list_add(
completion, command, 0, W.WEECHAT_LIST_POS_SORT
)
return W.WEECHAT_RC_OK
@utf8_decode
def matrix_debug_completion_cb(data, completion_item, buffer, completion):
for debug_type in ["messaging", "network", "timing"]:
W.hook_completion_list_add(
completion, debug_type, 0, W.WEECHAT_LIST_POS_SORT
)
return W.WEECHAT_RC_OK
# TODO this should be configurable
REDACTION_COMP_LEN = 50
@utf8_decode
def matrix_message_completion_cb(data, completion_item, buffer, completion):
max_events = 500
def redacted_or_not_message(tags):
# type: (List[str]) -> bool
if SCRIPT_NAME + "_redacted" in tags:
return True
if SCRIPT_NAME + "_message" not in tags:
return True
return False
def event_id_from_tags(tags):
# type: (List[str]) -> Optional[str]
for tag in tags:
if tag.startswith("matrix_id"):
event_id = tag[10:]
return event_id
return None
for server in SERVERS.values():
if buffer in server.buffers.values():
room_buffer = server.find_room_from_ptr(buffer)
lines = room_buffer.weechat_buffer.lines
added = 0
for line in lines:
tags = line.tags
if redacted_or_not_message(tags):
continue
event_id = event_id_from_tags(tags)
if not event_id:
continue
message = line.message
if len(message) > REDACTION_COMP_LEN + 2:
message = message[:REDACTION_COMP_LEN] + ".."
item = ('{event_id}:"{message}"').format(
event_id=event_id, message=message
)
W.hook_completion_list_add(
completion, item, 0, W.WEECHAT_LIST_POS_END
)
added += 1
if added >= max_events:
break
return W.WEECHAT_RC_OK
return W.WEECHAT_RC_OK
def server_from_buffer(buffer):
for server in SERVERS.values():
if buffer in server.buffers.values():
return server
if buffer == server.server_buffer:
return server
return None
@utf8_decode
def matrix_olm_user_completion_cb(data, completion_item, buffer, completion):
server = server_from_buffer(buffer)
if not server:
return W.WEECHAT_RC_OK
try:
device_store = server.client.device_store
except LocalProtocolError:
return W.WEECHAT_RC_OK
for user in device_store.users:
W.hook_completion_list_add(
completion, user, 0, W.WEECHAT_LIST_POS_SORT
)
return W.WEECHAT_RC_OK
@utf8_decode
def matrix_olm_device_completion_cb(data, completion_item, buffer, completion):
server = server_from_buffer(buffer)
if not server:
return W.WEECHAT_RC_OK
try:
device_store = server.client.device_store
except LocalProtocolError:
return W.WEECHAT_RC_OK
args = W.hook_completion_get_string(completion, "args")
fields = args.split()
if len(fields) < 2:
return W.WEECHAT_RC_OK
user = fields[-1]
if user not in device_store.users:
return W.WEECHAT_RC_OK
for device in device_store.active_user_devices(user):
W.hook_completion_list_add(
completion, device.id, 0, W.WEECHAT_LIST_POS_SORT
)
return W.WEECHAT_RC_OK
@utf8_decode
def matrix_own_devices_completion_cb(
data,
completion_item,
buffer,
completion
):
server = server_from_buffer(buffer)
if not server:
return W.WEECHAT_RC_OK
olm = server.client.olm
if not olm:
return W.WEECHAT_RC_OK
W.hook_completion_list_add(
completion, olm.device_id, 0, W.WEECHAT_LIST_POS_SORT
)
user = olm.user_id
if user not in olm.device_store.users:
return W.WEECHAT_RC_OK
for device in olm.device_store.active_user_devices(user):
W.hook_completion_list_add(
completion, device.id, 0, W.WEECHAT_LIST_POS_SORT
)
return W.WEECHAT_RC_OK
@utf8_decode
def matrix_user_completion_cb(data, completion_item, buffer, completion):
def add_user(completion, user):
W.hook_completion_list_add(
completion, user, 0, W.WEECHAT_LIST_POS_SORT
)
for server in SERVERS.values():
if buffer == server.server_buffer:
return W.WEECHAT_RC_OK
room_buffer = server.find_room_from_ptr(buffer)
if not room_buffer:
continue
users = room_buffer.room.users
users = [user[1:] for user in users]
for user in users:
add_user(completion, user)
return W.WEECHAT_RC_OK
@utf8_decode
def matrix_room_completion_cb(data, completion_item, buffer, completion):
"""Completion callback for matrix room names."""
for server in SERVERS.values():
for room_buffer in server.room_buffers.values():
name = room_buffer.weechat_buffer.short_name
W.hook_completion_list_add(
completion, name, 0, W.WEECHAT_LIST_POS_SORT
)
return W.WEECHAT_RC_OK
def init_completion():
W.hook_completion(
"matrix_server_commands",
"Matrix server completion",
"matrix_server_command_completion_cb",
"",
)
W.hook_completion(
"matrix_servers",
"Matrix server completion",
"matrix_server_completion_cb",
"",
)
W.hook_completion(
"matrix_commands",
"Matrix command completion",
"matrix_command_completion_cb",
"",
)
W.hook_completion(
"matrix_messages",
"Matrix message completion",
"matrix_message_completion_cb",
"",
)
W.hook_completion(
"matrix_debug_types",
"Matrix debugging type completion",
"matrix_debug_completion_cb",
"",
)
W.hook_completion(
"olm_user_ids",
"Matrix olm user id completion",
"matrix_olm_user_completion_cb",
"",
)
W.hook_completion(
"olm_devices",
"Matrix olm device id completion",
"matrix_olm_device_completion_cb",
"",
)
W.hook_completion(
"matrix_users",
"Matrix user id completion",
"matrix_user_completion_cb",
"",
)
W.hook_completion(
"matrix_own_devices",
"Matrix own devices completion",
"matrix_own_devices_completion_cb",
"",
)
W.hook_completion(
"matrix_rooms",
"Matrix room name completion",
"matrix_room_completion_cb",
"",
)

@ -0,0 +1,807 @@
# -*- coding: utf-8 -*-
# Copyright © 2018, 2019 Damir Jelić <poljar@termina.org.uk>
# Copyright © 2018, 2019 Denis Kasak <dkasak@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""weechat-matrix Configuration module.
This module contains abstractions on top of weechats configuration files and
the main script configuration class.
To add configuration options refer to MatrixConfig.
Server specific configuration options are handled in server.py
"""
from builtins import super
from collections import namedtuple
from enum import Enum, unique
import logbook
import nio
from matrix.globals import SCRIPT_NAME, SERVERS, W
from matrix.utf import utf8_decode
from . import globals as G
@unique
class RedactType(Enum):
STRIKETHROUGH = 0
NOTICE = 1
DELETE = 2
@unique
class ServerBufferType(Enum):
MERGE_CORE = 0
MERGE = 1
INDEPENDENT = 2
nio.logger_group.level = logbook.ERROR
class Option(
namedtuple(
"Option",
[
"name",
"type",
"string_values",
"min",
"max",
"value",
"description",
"cast_func",
"change_callback",
],
)
):
"""A class representing a new configuration option.
An option object is consumed by the ConfigSection class adding
configuration options to weechat.
"""
__slots__ = ()
def __new__(
cls,
name,
type,
string_values,
min,
max,
value,
description,
cast=None,
change_callback=None,
):
"""
Parameters:
name (str): Name of the configuration option
type (str): Type of the configuration option, can be one of the
supported weechat types: string, boolean, integer, color
string_values: (str): A list of string values that the option can
accept seprated by |
min (int): Minimal value of the option, only used if the type of
the option is integer
max (int): Maximal value of the option, only used if the type of
the option is integer
description (str): Description of the configuration option
cast (callable): A callable function taking a single value and
returning a modified value. Useful to turn the configuration
option into an enum while reading it.
change_callback(callable): A function that will be called
by weechat every time the configuration option is changed.
"""
return super().__new__(
cls,
name,
type,
string_values,
min,
max,
value,
description,
cast,
change_callback,
)
@utf8_decode
def matrix_config_reload_cb(data, config_file):
return W.WEECHAT_RC_OK
def change_log_level(category, level):
"""Change the log level of the underlying nio lib
Called every time the user changes the log level or log category
configuration option."""
if category == "all":
nio.logger_group.level = level
elif category == "http":
nio.http.logger.level = level
elif category == "client":
nio.client.logger.level = level
elif category == "events":
nio.events.logger.level = level
elif category == "responses":
nio.responses.logger.level = level
elif category == "encryption":
nio.crypto.logger.level = level
@utf8_decode
def config_server_buffer_cb(data, option):
"""Callback for the look.server_buffer option.
Is called when the option is changed and merges/splits the server
buffer"""
for server in SERVERS.values():
server.buffer_merge()
return 1
@utf8_decode
def config_log_level_cb(data, option):
"""Callback for the network.debug_level option."""
change_log_level(
G.CONFIG.network.debug_category, G.CONFIG.network.debug_level
)
return 1
@utf8_decode
def config_log_category_cb(data, option):
"""Callback for the network.debug_category option."""
change_log_level(G.CONFIG.debug_category, logbook.ERROR)
G.CONFIG.debug_category = G.CONFIG.network.debug_category
change_log_level(
G.CONFIG.network.debug_category, G.CONFIG.network.debug_level
)
return 1
@utf8_decode
def config_pgup_cb(data, option):
"""Callback for the network.fetch_backlog_on_pgup option.
Enables or disables the hook that is run when /window page_up is called"""
if G.CONFIG.network.fetch_backlog_on_pgup:
if not G.CONFIG.page_up_hook:
G.CONFIG.page_up_hook = W.hook_command_run(
"/window page_up", "matrix_command_pgup_cb", ""
)
else:
if G.CONFIG.page_up_hook:
W.unhook(G.CONFIG.page_up_hook)
G.CONFIG.page_up_hook = None
return 1
def level_to_logbook(value):
if value == 0:
return logbook.ERROR
if value == 1:
return logbook.WARNING
if value == 2:
return logbook.INFO
if value == 3:
return logbook.DEBUG
return logbook.ERROR
def logbook_category(value):
if value == 0:
return "all"
if value == 1:
return "http"
if value == 2:
return "client"
if value == 3:
return "events"
if value == 4:
return "responses"
if value == 5:
return "encryption"
return "all"
def eval_cast(string):
"""A function that passes a string to weechat which evaluates it using its
expression evaluation syntax.
Can only be used with strings, useful for passwords or options that contain
a formatted string to e.g. add colors.
More info here:
https://weechat.org/files/doc/stable/weechat_plugin_api.en.html#_string_eval_expression"""
return W.string_eval_expression(string, {}, {}, {})
class WeechatConfig(object):
"""A class representing a weechat configuration file
Wraps weechats configuration creation functionality"""
def __init__(self, sections):
"""Create a new weechat configuration file, expects the global
SCRIPT_NAME to be defined and a reload callback
Parameters:
sections (List[Tuple[str, List[Option]]]): List of config sections
that will be created for the configuration file.
"""
self._ptr = W.config_new(
SCRIPT_NAME, SCRIPT_NAME + "_config_reload_cb", ""
)
for section in sections:
name, options = section
section_class = ConfigSection.build(name, options)
setattr(self, name, section_class(name, self._ptr, options))
def free(self):
"""Free all the config sections and their options as well as the
configuration file. Should be called when the script is unloaded."""
for section in [
getattr(self, a)
for a in dir(self)
if isinstance(getattr(self, a), ConfigSection)
]:
section.free()
W.config_free(self._ptr)
def read(self):
"""Read the config file"""
return_code = W.config_read(self._ptr)
if return_code == W.WEECHAT_CONFIG_READ_OK:
return True
if return_code == W.WEECHAT_CONFIG_READ_MEMORY_ERROR:
return False
if return_code == W.WEECHAT_CONFIG_READ_FILE_NOT_FOUND:
return True
return False
class ConfigSection(object):
"""A class representing a weechat config section.
Should not be used on its own, the WeechatConfig class uses this to build
config sections."""
@classmethod
def build(cls, name, options):
def constructor(self, name, config_ptr, options):
self._ptr = W.config_new_section(
config_ptr, name, 0, 0, "", "", "", "", "", "", "", "", "", ""
)
self._config_ptr = config_ptr
self._option_ptrs = {}
for option in options:
self._add_option(option)
attributes = {
option.name: cls.option_property(
option.name, option.type, cast_func=option.cast_func
)
for option in options
}
attributes["__init__"] = constructor
section_class = type(name.title() + "Section", (cls,), attributes)
return section_class
def free(self):
W.config_section_free_options(self._ptr)
W.config_section_free(self._ptr)
def _add_option(self, option):
cb = option.change_callback.__name__ if option.change_callback else ""
option_ptr = W.config_new_option(
self._config_ptr,
self._ptr,
option.name,
option.type,
option.description,
option.string_values,
option.min,
option.max,
option.value,
option.value,
0,
"",
"",
cb,
"",
"",
"",
)
self._option_ptrs[option.name] = option_ptr
@staticmethod
def option_property(name, option_type, evaluate=False, cast_func=None):
"""Create a property for this class that makes the reading of config
option values pythonic. The option will be available as a property with
the name of the option.
If a cast function was defined for the option the property will pass
the option value to the cast function and return its result."""
def bool_getter(self):
return bool(W.config_boolean(self._option_ptrs[name]))
def str_getter(self):
if cast_func:
return cast_func(W.config_string(self._option_ptrs[name]))
return W.config_string(self._option_ptrs[name])
def str_evaluate_getter(self):
return W.string_eval_expression(
W.config_string(self._option_ptrs[name]), {}, {}, {}
)
def int_getter(self):
if cast_func:
return cast_func(W.config_integer(self._option_ptrs[name]))
return W.config_integer(self._option_ptrs[name])
if option_type in ("string", "color"):
if evaluate:
return property(str_evaluate_getter)
return property(str_getter)
if option_type == "boolean":
return property(bool_getter)
if option_type == "integer":
return property(int_getter)
class MatrixConfig(WeechatConfig):
"""Main matrix configuration file.
This class defines all the global matrix configuration options.
New global options should be added to the constructor of this class under
the appropriate section.
There are three main sections defined:
Look: This section is for options that change the way matrix messages
are shown or the way the buffers are shown.
Color: This section should mainly be for color options, options that
change color schemes or themes should go to the look section.
Network: This section is for options that change the way the script
behaves, e.g. the way it communicates with the server, it handles
responses or any other behavioural change that doesn't fit in the
previous sections.
There is a special section called server defined which contains per server
configuration options. Server options aren't defined here, they need to be
added in server.py
"""
def __init__(self):
self.debug_buffer = ""
self.upload_buffer = ""
self.debug_category = "all"
self.page_up_hook = None
self.human_buffer_names = None
look_options = [
Option(
"redactions",
"integer",
"strikethrough|notice|delete",
0,
0,
"strikethrough",
(
"Only notice redactions, strike through or delete "
"redacted messages"
),
RedactType,
),
Option(
"server_buffer",
"integer",
"merge_with_core|merge_without_core|independent",
0,
0,
"merge_with_core",
"Merge server buffers",
ServerBufferType,
config_server_buffer_cb,
),
Option(
"max_typing_notice_item_length",
"integer",
"",
10,
1000,
"50",
("Limit the length of the typing notice bar item."),
),
Option(
"bar_item_typing_notice_prefix",
"string",
"",
0,
0,
"Typing: ",
("Prefix for the typing notice bar item."),
),
Option(
"encryption_warning_sign",
"string",
"",
0,
0,
"⚠️ ",
("A sign that is used to signal trust issues in encrypted "
"rooms (note: content is evaluated, see /help eval)"),
eval_cast,
),
Option(
"busy_sign",
"string",
"",
0,
0,
"",
("A sign that is used to signal that the client is busy e.g. "
"when the room backlog is fetching"
" (note: content is evaluated, see /help eval)"),
eval_cast,
),
Option(
"encrypted_room_sign",
"string",
"",
0,
0,
"🔐",
("A sign that is used to show that the current room is "
"encrypted "
"(note: content is evaluated, see /help eval)"),
eval_cast,
),
Option(
"disconnect_sign",
"string",
"",
0,
0,
"",
("A sign that is used to show that the server is disconnected "
"(note: content is evaluated, see /help eval)"),
eval_cast,
),
Option(
"pygments_style",
"string",
"",
0,
0,
"native",
"Pygments style to use for highlighting source code blocks",
),
Option(
"code_blocks",
"boolean",
"",
0,
0,
"on",
("Display preformatted code blocks as rectangular areas by "
"padding them with whitespace up to the length of the longest"
" line (with optional margin)"),
),
Option(
"code_block_margin",
"integer",
"",
0,
100,
"2",
("Number of spaces to add as a margin around around a code "
"block"),
),
Option(
"human_buffer_names",
"boolean",
"",
0,
0,
"off",
("If turned on the buffer name will consist of the server "
"name and the room name instead of the Matrix room ID. Note, "
"this requires a change to the logger.file.mask setting "
"since conflicts can happen otherwise "
"(requires a script reload)."),
),
]
network_options = [
Option(
"max_initial_sync_events",
"integer",
"",
1,
10000,
"30",
("How many events to fetch during the initial sync"),
),
Option(
"max_backlog_sync_events",
"integer",
"",
1,
100,
"10",
("How many events to fetch during backlog fetching"),
),
Option(
"fetch_backlog_on_pgup",
"boolean",
"",
0,
0,
"on",
("Fetch messages in the backlog on a window page up event"),
None,
config_pgup_cb,
),
Option(
"debug_level",
"integer",
"error|warn|info|debug",
0,
0,
"error",
"Enable network protocol debugging.",
level_to_logbook,
config_log_level_cb,
),
Option(
"debug_category",
"integer",
"all|http|client|events|responses|encryption",
0,
0,
"all",
"Debugging category",
logbook_category,
config_log_category_cb,
),
Option(
"debug_buffer",
"boolean",
"",
0,
0,
"off",
("Use a separate buffer for debug logs."),
),
Option(
"lazy_load_room_users",
"boolean",
"",
0,
0,
"off",
("If on, room users won't be loaded in the background "
"proactively, they will be loaded when the user switches to "
"the room buffer. This only affects non-encrypted rooms."),
),
Option(
"max_nicklist_users",
"integer",
"",
100,
20000,
"5000",
("Limit the number of users that are added to the nicklist. "
"Active users and users with a higher power level are always."
" Inactive users will be removed from the nicklist after a "
"day of inactivity."),
),
Option(
"lag_reconnect",
"integer",
"",
5,
604800,
"90",
("Reconnect to the server if the lag is greater than this "
"value (in seconds)"),
),
Option(
"print_unconfirmed_messages",
"boolean",
"",
0,
0,
"on",
("If off, messages are only printed after the server confirms "
"their receival. If on, messages are immediately printed but "
"colored differently until receival is confirmed."),
),
Option(
"lag_min_show",
"integer",
"",
1,
604800,
"500",
("minimum lag to show (in milliseconds)"),
),
Option(
"typing_notice_conditions",
"string",
"",
0,
0,
"${typing_enabled}",
("conditions to send typing notifications (note: content is "
"evaluated, see /help eval); besides the buffer and window "
"variables the typing_enabled variable is also expanded; "
"the typing_enabled variable can be manipulated with the "
"/room command, see /help room"),
),
Option(
"read_markers_conditions",
"string",
"",
0,
0,
"${markers_enabled}",
("conditions to send read markers (note: content is "
"evaluated, see /help eval); besides the buffer and window "
"variables the markers_enabled variable is also expanded; "
"the markers_enabled variable can be manipulated with the "
"/room command, see /help room"),
),
Option(
"resending_ignores_devices",
"boolean",
"",
0,
0,
"on",
("If on resending the same message to a room that contains "
"unverified devices will mark the devices as ignored and "
"continue sending the message. If off resending the message "
"will again fail and devices need to be marked as verified "
"one by one or the /send-anyways command needs to be used to "
"ignore them."),
),
]
color_options = [
Option(
"quote_fg",
"color",
"",
0,
0,
"lightgreen",
"Foreground color for matrix style blockquotes",
),
Option(
"quote_bg",
"color",
"",
0,
0,
"default",
"Background counterpart of quote_fg",
),
Option(
"error_message_fg",
"color",
"",
0,
0,
"darkgray",
("Foreground color for error messages that appear inside a "
"room buffer (e.g. when a message errors out when sending or "
"when a message is redacted)"),
),
Option(
"error_message_bg",
"color",
"",
0,
0,
"default",
"Background counterpart of error_message_fg.",
),
Option(
"unconfirmed_message_fg",
"color",
"",
0,
0,
"darkgray",
("Foreground color for messages that are printed out but the "
"server hasn't confirmed the that he received them."),
),
Option(
"unconfirmed_message_bg",
"color",
"",
0,
0,
"default",
"Background counterpart of unconfirmed_message_fg."
),
Option(
"untagged_code_fg",
"color",
"",
0,
0,
"blue",
("Foreground color for code without a language specifier. "
"Also used for `inline code`."),
),
Option(
"untagged_code_bg",
"color",
"",
0,
0,
"default",
"Background counterpart of untagged_code_fg",
),
]
sections = [
("network", network_options),
("look", look_options),
("color", color_options),
]
super().__init__(sections)
# The server section is essentially a section with subsections and no
# options, handle that case independently.
W.config_new_section(
self._ptr,
"server",
0,
0,
"matrix_config_server_read_cb",
"",
"matrix_config_server_write_cb",
"",
"",
"",
"",
"",
"",
"",
)
def read(self):
super().read()
self.human_buffer_names = self.look.human_buffer_names
def free(self):
section_ptr = W.config_search_section(self._ptr, "server")
W.config_section_free(section_ptr)
super().free()

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
# Copyright © 2018, 2019 Damir Jelić <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals
import sys
from typing import Dict, Optional
from logbook import Logger
from collections import OrderedDict
from .utf import WeechatWrapper
if False:
from .server import MatrixServer
from .config import MatrixConfig
from .uploads import Upload
try:
import weechat
W = weechat if sys.hexversion >= 0x3000000 else WeechatWrapper(weechat)
except ImportError:
import matrix._weechat as weechat # type: ignore
W = weechat
SERVERS = dict() # type: Dict[str, MatrixServer]
CONFIG = None # type: Optional[MatrixConfig]
ENCRYPTION = True # type: bool
SCRIPT_NAME = "matrix" # type: str
MAX_EVENTS = 100
TYPING_NOTICE_TIMEOUT = 4000 # 4 seconds typing notice lifetime
LOGGER = Logger("weechat-matrix")
UPLOADS = OrderedDict() # type: Dict[str, Upload]

File diff suppressed because it is too large Load Diff

@ -0,0 +1,399 @@
# -*- coding: utf-8 -*-
# Copyright © 2018, 2019 Damir Jelić <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""Module implementing upload functionality."""
from __future__ import unicode_literals
import attr
import time
import json
from typing import Dict, Any
from uuid import uuid1, UUID
from enum import Enum
try:
from json.decoder import JSONDecodeError
except ImportError:
JSONDecodeError = ValueError # type: ignore
from .globals import SCRIPT_NAME, SERVERS, W, UPLOADS
from .utf import utf8_decode
from matrix import globals as G
from nio import Api
class UploadState(Enum):
created = 0
active = 1
finished = 2
error = 3
aborted = 4
@attr.s
class Proxy(object):
ptr = attr.ib(type=str)
@property
def name(self):
return W.infolist_string(self.ptr, "name")
@property
def address(self):
return W.infolist_string(self.ptr, "address")
@property
def type(self):
return W.infolist_string(self.ptr, "type_string")
@property
def port(self):
return str(W.infolist_integer(self.ptr, "port"))
@property
def user(self):
return W.infolist_string(self.ptr, "username")
@property
def password(self):
return W.infolist_string(self.ptr, "password")
@attr.s
class Upload(object):
"""Class representing an upload to a matrix server."""
server_name = attr.ib(type=str)
server_address = attr.ib(type=str)
access_token = attr.ib(type=str)
room_id = attr.ib(type=str)
filepath = attr.ib(type=str)
encrypt = attr.ib(type=bool, default=False)
done = 0
total = 0
uuid = None
buffer = None
upload_hook = None
content_uri = None
file_name = None
mimetype = "?"
state = UploadState.created
def __attrs_post_init__(self):
self.uuid = uuid1()
self.buffer = ""
server = SERVERS[self.server_name]
proxy_name = server.config.proxy
proxy = None
proxies_list = None
if proxy_name:
proxies_list = W.infolist_get("proxy", "", proxy_name)
if proxies_list:
W.infolist_next(proxies_list)
proxy = Proxy(proxies_list)
process_args = {
"arg1": self.filepath,
"arg2": self.server_address,
"arg3": self.access_token,
"buffer_flush": "1",
}
arg_count = 3
if self.encrypt:
arg_count += 1
process_args["arg{}".format(arg_count)] = "--encrypt"
if not server.config.ssl_verify:
arg_count += 1
process_args["arg{}".format(arg_count)] = "--insecure"
if proxy:
arg_count += 1
process_args["arg{}".format(arg_count)] = "--proxy-type"
arg_count += 1
process_args["arg{}".format(arg_count)] = proxy.type
arg_count += 1
process_args["arg{}".format(arg_count)] = "--proxy-address"
arg_count += 1
process_args["arg{}".format(arg_count)] = proxy.address
arg_count += 1
process_args["arg{}".format(arg_count)] = "--proxy-port"
arg_count += 1
process_args["arg{}".format(arg_count)] = proxy.port
if proxy.user:
arg_count += 1
process_args["arg{}".format(arg_count)] = "--proxy-user"
arg_count += 1
process_args["arg{}".format(arg_count)] = proxy.user
if proxy.password:
arg_count += 1
process_args["arg{}".format(arg_count)] = "--proxy-password"
arg_count += 1
process_args["arg{}".format(arg_count)] = proxy.password
self.upload_hook = W.hook_process_hashtable(
"matrix_upload",
process_args,
0,
"upload_cb",
str(self.uuid)
)
if proxies_list:
W.infolist_free(proxies_list)
def abort(self):
pass
@property
def msgtype(self):
# type: () -> str
assert self.mimetype
return Api.mimetype_to_msgtype(self.mimetype)
@property
def content(self):
# type: () -> Dict[Any, Any]
assert self.content_uri
if self.encrypt:
content = {
"body": self.file_name,
"msgtype": self.msgtype,
"file": self.file_keys,
}
content["file"]["url"] = self.content_uri
content["file"]["mimetype"] = self.mimetype
# TODO thumbnail if it's an image
return content
return {
"msgtype": self.msgtype,
"body": self.file_name,
"url": self.content_uri,
}
@property
def render(self):
# type: () -> str
assert self.content_uri
if self.encrypt:
http_url = Api.encrypted_mxc_to_plumb(
self.content_uri,
self.file_keys["key"]["k"],
self.file_keys["hashes"]["sha256"],
self.file_keys["iv"]
)
url = http_url if http_url else self.content_uri
description = "{}".format(self.file_name)
return ("{del_color}<{ncolor}{desc}{del_color}>{ncolor} "
"{del_color}[{ncolor}{url}{del_color}]{ncolor}").format(
del_color=W.color("chat_delimiters"),
ncolor=W.color("reset"),
desc=description, url=url)
http_url = Api.mxc_to_http(self.content_uri)
description = ("/{}".format(self.file_name) if self.file_name
else "")
return "{url}{desc}".format(url=http_url, desc=description)
@attr.s
class UploadsBuffer(object):
"""Weechat buffer showing the uploads for a server."""
_ptr = "" # type: str
_selected_line = 0 # type: int
uploads = UPLOADS
def __attrs_post_init__(self):
self._ptr = W.buffer_new(
SCRIPT_NAME + ".uploads",
"",
"",
"",
"",
)
W.buffer_set(self._ptr, "type", "free")
W.buffer_set(self._ptr, "title", "Upload list")
W.buffer_set(self._ptr, "key_bind_meta2-A", "/uploads up")
W.buffer_set(self._ptr, "key_bind_meta2-B", "/uploads down")
W.buffer_set(self._ptr, "localvar_set_type", "uploads")
self.render()
def move_line_up(self):
self._selected_line = max(self._selected_line - 1, 0)
self.render()
def move_line_down(self):
self._selected_line = min(
self._selected_line + 1,
len(self.uploads) - 1
)
self.render()
def display(self):
"""Display the buffer."""
W.buffer_set(self._ptr, "display", "1")
def render(self):
"""Render the new state of the upload buffer."""
# This function is under the MIT license.
# Copyright (c) 2016 Vladimir Ignatev
def progress(count, total):
bar_len = 60
if total == 0:
bar = '-' * bar_len
return "[{}] {}%".format(bar, "?")
filled_len = int(round(bar_len * count / float(total)))
percents = round(100.0 * count / float(total), 1)
bar = '=' * filled_len + '-' * (bar_len - filled_len)
return "[{}] {}%".format(bar, percents)
W.buffer_clear(self._ptr)
header = "{}{}{}{}{}{}{}{}".format(
W.color("green"),
"Actions (letter+enter):",
W.color("lightgreen"),
" [A] Accept",
" [C] Cancel",
" [R] Remove",
" [P] Purge finished",
" [Q] Close this buffer"
)
W.prnt_y(self._ptr, 0, header)
for line_number, upload in enumerate(self.uploads.values()):
line_color = "{},{}".format(
"white" if line_number == self._selected_line else "default",
"blue" if line_number == self._selected_line else "default",
)
first_line = ("%s%s %-24s %s%s%s %s (%s.%s)" % (
W.color(line_color),
"*** " if line_number == self._selected_line else " ",
upload.room_id,
"\"",
upload.filepath,
"\"",
upload.mimetype,
SCRIPT_NAME,
upload.server_name,
))
W.prnt_y(self._ptr, (line_number * 2) + 2, first_line)
status_color = "{},{}".format("green", "blue")
status = "{}{}{}".format(
W.color(status_color),
upload.state.name,
W.color(line_color)
)
second_line = ("{color}{prefix} {status} {progressbar} "
"{done} / {total}").format(
color=W.color(line_color),
prefix="*** " if line_number == self._selected_line else " ",
status=status,
progressbar=progress(upload.done, upload.total),
done=W.string_format_size(upload.done),
total=W.string_format_size(upload.total))
W.prnt_y(self._ptr, (line_number * 2) + 3, second_line)
def find_upload(uuid):
return UPLOADS.get(uuid, None)
def handle_child_message(upload, message):
if message["type"] == "progress":
upload.done = message["data"]
elif message["type"] == "status":
if message["status"] == "started":
upload.state = UploadState.active
upload.total = message["total"]
upload.mimetype = message["mimetype"]
upload.file_name = message["file_name"]
elif message["status"] == "done":
upload.state = UploadState.finished
upload.content_uri = message["url"]
upload.file_keys = message.get("file_keys", None)
server = SERVERS.get(upload.server_name, None)
if not server:
return
server.room_send_upload(upload)
elif message["status"] == "error":
upload.state = UploadState.error
if G.CONFIG.upload_buffer:
G.CONFIG.upload_buffer.render()
@utf8_decode
def upload_cb(data, command, return_code, out, err):
upload = find_upload(UUID(data))
if not upload:
return W.WEECHAT_RC_OK
if return_code == W.WEECHAT_HOOK_PROCESS_ERROR:
W.prnt("", "Error with command '%s'" % command)
return W.WEECHAT_RC_OK
if err != "":
W.prnt("", "Error with command '%s'" % err)
upload.state = UploadState.error
if out != "":
upload.buffer += out
messages = upload.buffer.split("\n")
upload.buffer = ""
for m in messages:
try:
message = json.loads(m)
except (JSONDecodeError, TypeError):
upload.buffer += m
continue
handle_child_message(upload, message)
return W.WEECHAT_RC_OK

@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2014-2016 Ryan Huber <rhuber@gmail.com>
# Copyright (c) 2015-2016 Tollef Fog Heen <tfheen@err.no>
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import unicode_literals
import sys
# pylint: disable=redefined-builtin
from builtins import bytes, str
from functools import wraps
if sys.version_info.major == 3 and sys.version_info.minor >= 3:
from collections.abc import Iterable, Mapping
else:
from collections import Iterable, Mapping
# These functions were written by Trygve Aaberge for wee-slack and are under a
# MIT License.
# More info can be found in the wee-slack repository under the commit:
# 5e1c7e593d70972afb9a55f29d13adaf145d0166, the repository can be found at:
# https://github.com/wee-slack/wee-slack
class WeechatWrapper(object):
def __init__(self, wrapped_class):
self.wrapped_class = wrapped_class
# Helper method used to encode/decode method calls.
def wrap_for_utf8(self, method):
def hooked(*args, **kwargs):
result = method(*encode_to_utf8(args), **encode_to_utf8(kwargs))
# Prevent wrapped_class from becoming unwrapped
if result == self.wrapped_class:
return self
return decode_from_utf8(result)
return hooked
# Encode and decode everything sent to/received from weechat. We use the
# unicode type internally in wee-slack, but has to send utf8 to weechat.
def __getattr__(self, attr):
orig_attr = self.wrapped_class.__getattribute__(attr)
if callable(orig_attr):
return self.wrap_for_utf8(orig_attr)
return decode_from_utf8(orig_attr)
# Ensure all lines sent to weechat specify a prefix. For lines after the
# first, we want to disable the prefix, which is done by specifying a
# space.
def prnt_date_tags(self, buffer, date, tags, message):
message = message.replace("\n", "\n \t")
return self.wrap_for_utf8(self.wrapped_class.prnt_date_tags)(
buffer, date, tags, message
)
def utf8_decode(function):
"""
Decode all arguments from byte strings to unicode strings. Use this for
functions called from outside of this script, e.g. callbacks from weechat.
"""
@wraps(function)
def wrapper(*args, **kwargs):
# Don't do anything if we're python 3
if sys.hexversion >= 0x3000000:
return function(*args, **kwargs)
return function(*decode_from_utf8(args), **decode_from_utf8(kwargs))
return wrapper
def decode_from_utf8(data):
if isinstance(data, bytes):
return data.decode("utf-8")
if isinstance(data, str):
return data
elif isinstance(data, Mapping):
return type(data)(map(decode_from_utf8, data.items()))
elif isinstance(data, Iterable):
return type(data)(map(decode_from_utf8, data))
return data
def encode_to_utf8(data):
if isinstance(data, str):
return data.encode("utf-8")
if isinstance(data, bytes):
return data
elif isinstance(data, Mapping):
return type(data)(map(encode_to_utf8, data.items()))
elif isinstance(data, Iterable):
return type(data)(map(encode_to_utf8, data))
return data

@ -0,0 +1,168 @@
# -*- coding: utf-8 -*-
# Copyright © 2018, 2019 Damir Jelić <poljar@termina.org.uk>
# Copyright © 2018, 2019 Denis Kasak <dkasak@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals, division
import time
from typing import Any, Dict, List
from .globals import W
if False:
from .server import MatrixServer
def key_from_value(dictionary, value):
# type: (Dict[str, Any], Any) -> str
return list(dictionary.keys())[list(dictionary.values()).index(value)]
def server_buffer_prnt(server, string):
# type: (MatrixServer, str) -> None
assert server.server_buffer
buffer = server.server_buffer
now = int(time.time())
W.prnt_date_tags(buffer, now, "", string)
def tags_from_line_data(line_data):
# type: (str) -> List[str]
tags_count = W.hdata_get_var_array_size(
W.hdata_get("line_data"), line_data, "tags_array"
)
tags = [
W.hdata_string(
W.hdata_get("line_data"), line_data, "%d|tags_array" % i
)
for i in range(tags_count)
]
return tags
def create_server_buffer(server):
# type: (MatrixServer) -> None
buffer_name = "server.{}".format(server.name)
server.server_buffer = W.buffer_new(
buffer_name, "server_buffer_cb", server.name, "", ""
)
server_buffer_set_title(server)
W.buffer_set(server.server_buffer, "short_name", server.name)
W.buffer_set(server.server_buffer, "localvar_set_type", "server")
W.buffer_set(
server.server_buffer, "localvar_set_nick", server.config.username
)
W.buffer_set(server.server_buffer, "localvar_set_server", server.name)
W.buffer_set(server.server_buffer, "localvar_set_channel", server.name)
server.buffer_merge()
def server_buffer_set_title(server):
# type: (MatrixServer) -> None
if server.numeric_address:
ip_string = " ({address})".format(address=server.numeric_address)
else:
ip_string = ""
title = ("Matrix: {address}:{port}{ip}").format(
address=server.address, port=server.config.port, ip=ip_string
)
W.buffer_set(server.server_buffer, "title", title)
def server_ts_to_weechat(timestamp):
# type: (float) -> int
date = int(timestamp / 1000)
return date
def strip_matrix_server(string):
# type: (str) -> str
return string.rsplit(":", 1)[0]
def shorten_sender(sender):
# type: (str) -> str
return strip_matrix_server(sender)[1:]
def string_strikethrough(string):
return "".join(["{}\u0336".format(c) for c in string])
def string_color_and_reset(string, color):
"""Color string with color, then reset all attributes."""
lines = string.split('\n')
lines = ("{}{}{}".format(W.color(color), line, W.color("reset"))
for line in lines)
return "\n".join(lines)
def string_color(string, color):
"""Color string with color, then reset the color attribute."""
lines = string.split('\n')
lines = ("{}{}{}".format(W.color(color), line, W.color("resetcolor"))
for line in lines)
return "\n".join(lines)
def color_pair(color_fg, color_bg):
"""Make a color pair from a pair of colors."""
if color_bg:
return "{},{}".format(color_fg, color_bg)
else:
return color_fg
def text_block(text, margin=0):
"""
Pad block of text with whitespace to form a regular block, optionally
adding a margin.
"""
# add vertical margin
vertical_margin = margin // 2
text = "{}{}{}".format(
"\n" * vertical_margin,
text,
"\n" * vertical_margin
)
lines = text.split("\n")
longest_len = max(len(l) for l in lines) + margin
# pad block and add horizontal margin
text = "\n".join(
"{pre}{line}{post}".format(
pre=" " * margin,
line=l,
post=" " * (longest_len - len(l)))
for l in lines)
return text
def colored_text_block(text, margin=0, color_pair=""):
""" Like text_block, but also colors it."""
return string_color_and_reset(text_block(text, margin=margin), color_pair)
Loading…
Cancel
Save