commit 96de250ef8a27f812ec7edfab0abd2a0ee9ed567 Author: Loren McIntyre Date: Thu Nov 21 20:29:24 2024 -0800 viz: commit jetstream-firehose logger -- with time-based color, filtering on event/post.type and event/post.commit.type & on message keywords diff --git a/bluesky-firehose-viz.orig.py b/bluesky-firehose-viz.orig.py new file mode 100644 index 0000000..254990e --- /dev/null +++ b/bluesky-firehose-viz.orig.py @@ -0,0 +1,154 @@ +import asyncio +import json +import websockets +import curses +import random +import colorsys +import time +from collections import deque + +class BlueskyFirehoseVisualizer: + def __init__(self, stdscr, websocket_url): + self.stdscr = stdscr + self.websocket_url = websocket_url + + # Initialize color pairs + curses.start_color() + curses.use_default_colors() + + # Set up screen + curses.curs_set(0) + self.stdscr.clear() + + # Track posts with their display details + self.posts = {} + self.max_posts = 1000 # Limit to prevent memory growth + + # Generate color palette + self.color_palette = self._generate_color_palette() + + def _generate_color_palette(self, num_colors=256): + """Generate a diverse color palette.""" + colors = [] + for i in range(num_colors): + # Use HSV color space to generate visually distinct colors + hue = (i / num_colors) % 1.0 + saturation = 0.7 + (random.random() * 0.3) # 70-100% saturation + value = 0.7 + (random.random() * 0.3) # 70-100% brightness + + # Convert HSV to RGB + rgb = colorsys.hsv_to_rgb(hue, saturation, value) + + # Scale RGB to curses color range (0-1000) + r, g, b = [int(x * 1000) for x in rgb] + + # Initialize color pair + try: + color_index = len(colors) + 1 # Start from 1 + curses.init_color(color_index, r, g, b) + curses.init_pair(color_index, color_index, -1) + colors.append(color_index) + except Exception: + # If we run out of color pairs, wrap around + color_index = (len(colors) % 256) + 1 + colors.append(color_index) + + return colors + + def _display_post(self, post_id, text): + """Display a post with a unique color, fading out over time.""" + # Assign a unique color + color_index = self.color_palette[hash(post_id) % len(self.color_palette)] + + # Get screen dimensions + max_y, max_x = self.stdscr.getmaxyx() + + # Trim text to fit screen width + text = text[:max_x-1] + + # Track post details + if post_id not in self.posts: + # Remove oldest post if we've reached max + if len(self.posts) >= self.max_posts: + oldest_id = min(self.posts, key=lambda k: self.posts[k]['timestamp']) + del self.posts[oldest_id] + + # Find a free vertical position + used_y = set(post['y'] for post in self.posts.values()) + y = next(i for i in range(max_y) if i not in used_y) + + self.posts[post_id] = { + 'text': text, + 'color': color_index, + 'y': y, + 'timestamp': time.time(), + 'fade_count': 0 + } + + # Render posts + for pid, post_info in list(self.posts.items()): + # Calculate fade effect + age = time.time() - post_info['timestamp'] + fade_speed = 0.5 # Adjust for desired fade speed + + if age > fade_speed * post_info['fade_count']: + try: + # Gradually reduce text intensity + intensity = max(0, 1 - (post_info['fade_count'] / 10)) + color = curses.color_pair(post_info['color']) + + # Render text at constant horizontal position + self.stdscr.addstr( + post_info['y'], + post_info['fade_count'], + post_info['text'][:max_x-1], + color + ) + + post_info['fade_count'] += 1 + except curses.error: + # If we can't write (e.g., screen boundaries), remove post + del self.posts[pid] + + # Refresh display + self.stdscr.refresh() + + async def connect_and_visualize(self): + """Connect to Bluesky firehose and visualize posts.""" + try: + async with websockets.connect(self.websocket_url) as websocket: + while True: + message = await websocket.recv() + post = json.loads(message) + + # Extract meaningful text (adjust based on actual Bluesky JSON structure) + post_id = post.get('id', str(random.random())) + text = post.get('text', 'Unknown post') + + # Display the post + self._display_post(post_id, text) + + except Exception as e: + self.stdscr.addstr(0, 0, f"Error: {str(e)}") + self.stdscr.refresh() + + def run(self): + """Run the visualizer.""" + asyncio.run(self.connect_and_visualize()) + +def main(stdscr): + # Replace with actual Bluesky firehose websocket URL + BLUESKY_FIREHOSE_WS = "ws://example.com/bluesky-firehose" + + visualizer = BlueskyFirehoseVisualizer(stdscr, BLUESKY_FIREHOSE_WS) + visualizer.run() + +if __name__ == "__main__": + # Wrap main in curses wrapper to handle terminal setup/teardown + curses.wrapper(main) + +# Dependencies (install with pip): +# websockets +# +# Note: You'll need to replace the websocket URL with the actual +# Bluesky firehose websocket endpoint when available. diff --git a/bluesky-firehose-viz.py b/bluesky-firehose-viz.py new file mode 100644 index 0000000..2d81d59 --- /dev/null +++ b/bluesky-firehose-viz.py @@ -0,0 +1,168 @@ +import asyncio +import json +import websockets +import curses +import random +import colorsys +import time +from collections import deque +import logging +FORMAT = '%(asctime)s:%(loglevel)s:%(name)s %(message)s' +#'%(asctime)s %(clientip)-15s %(user)-8s %(message)s' +logging.basicConfig(level=logging.DEBUG, filename="bviz.log", format=FORMAT) +logger = logging.getLogger("bviz") + +class BlueskyFirehoseVisualizer: + def __init__(self, stdscr, websocket_url): + logger.info("init-start") + self.stdscr = stdscr + self.websocket_url = websocket_url + + # Initialize color pairs + curses.start_color() + curses.use_default_colors() + + # Set up screen + curses.curs_set(0) + #self.stdscr.clear() + + # Track posts with their display details + self.posts = {} + self.max_posts = 1000 # Limit to prevent memory growth + + # Generate color palette + self.color_palette = self._generate_color_palette() + logger.info("init-end") + + def _generate_color_palette(self, num_colors=256): + """Generate a diverse color palette.""" + colors = [] + for i in range(num_colors): + # Use HSV color space to generate visually distinct colors + hue = (i / num_colors) % 1.0 + saturation = 0.7 + (random.random() * 0.3) # 70-100% saturation + value = 0.7 + (random.random() * 0.3) # 70-100% brightness + + # Convert HSV to RGB + rgb = colorsys.hsv_to_rgb(hue, saturation, value) + + # Scale RGB to curses color range (0-1000) + r, g, b = [int(x * 1000) for x in rgb] + + # Initialize color pair + try: + color_index = len(colors) + 1 # Start from 1 + curses.init_color(color_index, r, g, b) + curses.init_pair(color_index, color_index, -1) + colors.append(color_index) + except Exception: + # If we run out of color pairs, wrap around + color_index = (len(colors) % 256) + 1 + colors.append(color_index) + + return colors + + def _display_post(self, post_id, text): + """Display a post with a unique color, fading out over time.""" + # Assign a unique color + color_index = self.color_palette[hash(post_id) % len(self.color_palette)] + + # Get screen dimensions + max_y, max_x = self.stdscr.getmaxyx() + + # Trim text to fit screen width + text = text[:max_x-1] + + # Track post details + if post_id not in self.posts: + # Remove oldest post if we've reached max + if len(self.posts) >= self.max_posts: + oldest_id = min(self.posts, key=lambda k: self.posts[k]['timestamp']) + del self.posts[oldest_id] + + # Find a free vertical position + used_y = set(post['y'] for post in self.posts.values()) + y = next(i for i in range(max_y) if i not in used_y) + + self.posts[post_id] = { + 'text': text, + 'color': color_index, + 'y': y, + 'timestamp': time.time(), + 'fade_count': 0 + } + + # Render posts + for pid, post_info in list(self.posts.items()): + # Calculate fade effect + age = time.time() - post_info['timestamp'] + fade_speed = 0.5 # Adjust for desired fade speed + + if age > fade_speed * post_info['fade_count']: + try: + # Gradually reduce text intensity + intensity = max(0, 1 - (post_info['fade_count'] / 10)) + color = curses.color_pair(post_info['color']) + + # Render text at constant horizontal position + self.stdscr.addstr( + post_info['y'], + post_info['fade_count'], + post_info['text'][:max_x-1], + color + ) + + post_info['fade_count'] += 1 + except curses.error: + # If we can't write (e.g., screen boundaries), remove post + del self.posts[pid] + + # Refresh display + self.stdscr.refresh() + + async def connect_and_visualize(self): + """Connect to Bluesky firehose and visualize posts.""" + logger.info("connect_and_visualize") + try: + async with websockets.connect(self.websocket_url) as websocket: + while True: + message = await websocket.recv() + post = json.loads(message) + + # Extract meaningful text (adjust based on actual Bluesky JSON structure) + post_id = post.get('did', "r:"+str(random.random())) + record = post.get('record', '---(no record in post)---') + text = record.get('text', '---(no text in post)---') + + # Display the post + logger.info(f"{post_id=} {text=}") + self._display_post(post_id, text) + + except Exception as e: + self.stdscr.addstr(0, 0, f"Error: {str(e)}") + self.stdscr.refresh() + + def run(self): + """Run the visualizer.""" + asyncio.run(self.connect_and_visualize()) + +def main(stdscr): + logger.info("main()") + # Replace with actual Bluesky firehose websocket URL + BLUESKY_FIREHOSE_WS = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post" + + visualizer = BlueskyFirehoseVisualizer(stdscr, BLUESKY_FIREHOSE_WS) + visualizer.run() + +if __name__ == "__main__": + # Wrap main in curses wrapper to handle terminal setup/teardown + try: + curses.wrapper(main) + except Exception as e: + logger.error(e) + +# Dependencies (install with pip): +# websockets +# +# Note: You'll need to replace the websocket URL with the actual +# Bluesky firehose websocket endpoint when available. diff --git a/bluesky-simple-print.orig.py b/bluesky-simple-print.orig.py new file mode 100644 index 0000000..2ff857b --- /dev/null +++ b/bluesky-simple-print.orig.py @@ -0,0 +1,94 @@ +import asyncio +import json +import websockets +import random +from colorama import init, Fore, Style + +# Initialize colorama for cross-platform colored output +init(autoreset=True) + +class BlueskyFirehosePrinter: + def __init__(self): + # Preset list of vibrant colors + self.colors = [ + Fore.RED, Fore.GREEN, Fore.YELLOW, Fore.BLUE, + Fore.MAGENTA, Fore.CYAN, Fore.WHITE, + Fore.LIGHTRED_EX, Fore.LIGHTGREEN_EX, + Fore.LIGHTYELLOW_EX, Fore.LIGHTBLUE_EX, + Fore.LIGHTMAGENTA_EX, Fore.LIGHTCYAN_EX + ] + + def _get_post_color(self, post_id): + """Deterministically select a color based on post ID""" + # Use hash of post ID to consistently select a color + return self.colors[hash(post_id) % len(self.colors)] + + def _extract_post_text(self, post): + """ + Extract meaningful text from a post. + Modify this method based on the actual Bluesky post JSON structure. + """ + # Example of potential extraction, will need to be adapted + if isinstance(post, dict): + # Try different possible text fields + text = post.get('text') or \ + post.get('content', {}).get('text') or \ + str(post) + return text[:200] # Limit text length + return str(post) + + async def connect_and_print(self, websocket_url): + """Connect to websocket and print posts""" + try: + async with websockets.connect(websocket_url) as websocket: + print(f"Connected to {websocket_url}") + while True: + try: + message = await websocket.recv() + + # Parse message + try: + post = json.loads(message) + except json.JSONDecodeError: + post = message + + # Generate a unique post ID + post_id = str(hash(json.dumps(post))) + + # Extract and print text + text = self._extract_post_text(post) + + # Select color based on post ID + color = self._get_post_color(post_id) + + # Print colored post + print(f"{color}{text}") + + except Exception as e: + print(f"Error processing message: {e}") + + except Exception as e: + print(f"Websocket connection error: {e}") + +async def main(): + # Replace with actual Bluesky firehose websocket URL + BLUESKY_FIREHOSE_WS = "ws://example.com/bluesky-firehose" + + printer = BlueskyFirehosePrinter() + await printer.connect_and_print(BLUESKY_FIREHOSE_WS) + +def cli_main(): + # Run the async main function + asyncio.run(main()) + +if __name__ == "__main__": + cli_main() + +# Dependencies: +# pip install websockets colorama +# +# Notes: +# 1. Replace BLUESKY_FIREHOSE_WS with actual websocket URL +# 2. The post extraction method (_extract_post_text) +# will likely need customization based on the +# actual Bluesky firehose JSON structure diff --git a/bluesky-simple-print.py b/bluesky-simple-print.py new file mode 100644 index 0000000..59741c4 --- /dev/null +++ b/bluesky-simple-print.py @@ -0,0 +1,220 @@ +import asyncio +import json +import websockets +import random +from colorama import init, Fore, Style + +import munch +import colorsys +import math +import fire + +import logging +import os +level = os.environ.get("LOGLEVEL", logging.INFO) +try: level = int(level) +except: level = level.upper() +logging.basicConfig(level=level) +logger = logging.getLogger("bfire") + +# Initialize colorama for cross-platform colored output +init(autoreset=True) + +class BlueskyFirehosePrinter: + def __init__(self): + # Preset list of vibrant colors + self.colors = [ + Fore.RED, Fore.GREEN, Fore.YELLOW, Fore.BLUE, + Fore.MAGENTA, Fore.CYAN, Fore.WHITE, + Fore.LIGHTRED_EX, Fore.LIGHTGREEN_EX, + Fore.LIGHTYELLOW_EX, Fore.LIGHTBLUE_EX, + Fore.LIGHTMAGENTA_EX, Fore.LIGHTCYAN_EX + ] + + def _get_post_color(self, ts): + """Deterministically select a color based on post ID^W^W timestamp""" + # Use hash of post ID to consistently select a color + #return self.colors[hash(post_id) % len(self.colors)] + return self.colors[int(ts) % len(self.colors)] + + def _extract_post_text(self, post): + """ + Extract meaningful text from a post. + Modify this method based on the actual Bluesky post JSON structure. + """ + # Example of potential extraction, will need to be adapted + if isinstance(post, dict): + # Try different possible text fields + text = post.get('text') or \ + post.get('content', {}).get('text') or \ + str(post) + return text[:200] # Limit text length + return str(post) + + def _hsv_termcolor(h, s, v): + """[0,1] h, s, v -> 256 color terminal codes""" + assert (h <= 1 and h >= 0), "h" + assert (s <= 1 and s >= 0), "s" + assert (v <= 1 and v >= 0), "v" + rgb1 = colorsys.hsv_to_rgb(h, s, v) + rgb256 = list(int(i*255) for i in rgb1) + colorstr = "\033[38;2;{};{};{}m".format(*rgb256) + return colorstr + + + async def connect_and_print(self, websocket_url, skips=[], onlys=[], count=None, cfilters={}, fkeeps=[], fdrops=[]): + """Connect to websocket and print posts""" + n=0 + try: + async with websockets.connect(websocket_url) as websocket: + print(f"Connected to {websocket_url}") + while True: + try: + + eventws = await websocket.recv() + + # Parse event + try: + post = json.loads(eventws) + post = munch.munchify(post) + except json.JSONDecodeError: + post = "err:" + eventws + + if post.type not in onlys: continue + if post.type in skips: continue + # type in ["com", "id", "acc"] # [com]mit, [id]entity, [acc]ount, ..? [del]ete? or commit type + ts = post.time_us//10e3 + + + # Select color based on post ID + #color = self._get_post_color(post.time_us//10e4) + hsv1 = [ (ts % 255)/255, .8, .8] + + # Generate a unique post ID + #post_id = str(hash(json.dumps(post))) + post_id = post.get("did", "r:"+str(random.random())) + + # Extract and print text + #text = self._extract_post_text(post) + try: + if post.type in ["com"]: + if cfilters.get("-") and any(map(lambda w: w in post.commit.type, cfilters.get("-"))): + continue + if cfilters.get("+") and not any(map(lambda w: w in post.commit.type, cfilters.get("+"))): + continue + if fdrops and any(map(lambda w: w in post.commit.record.text, fdrops)): + continue + if fkeeps and not any(map(lambda w: w in post.commit.record.text, fkeeps)): + continue + + if post.commit.record.text: + text = post.commit.record.text + else: + text = f"post.commit.record={post.commit.record.toJSON()}" + hsv1[2] = 1-min(1, math.log(len(text))/math.log(256*16)) + except Exception as e: + text = str(post.toJSON()) + hsv1[1] = .8 + #hsv1[2] = 1-min(1, math.log(len(text))/math.log(256*16)) ## ~ 80 + hsv1[2] = 60/255 + hsv1[2] = 120/255 + #color = "\033[38;2;%s;%s;%sm" % (64,64,64) # /255 ea + # Red\033[0m + #Fore.LIGHTWHITE_EX + if count is not None: + n+=1 + if n > count: + return + + + # 0 - 500 5 vs 30 vs 90 + # [h,s,v] + #int(255 * math.log(len(text))) + #rgb256[2] = min(255, len(text)) + #rgb256[1] = max(8, 255-len(text)) + + #hsv1[1] = max(8, 255-len(text))/256 + #hsv1[2] = (max(0, 255-len(text)))/256 + ### hsv1[2] = 1-min(1, math.log(len(text))/math.log(256*16)) + #(max(0, 255-len(text)))/256 + + + rgb1 = colorsys.hsv_to_rgb(*hsv1) + rgb256 = list(int(i*255) for i in rgb1) + #logger.info(f"{rgb256=}") + + color = "\033[38;2;{};{};{}m".format(*rgb256) + hsv1s = ":".join(f"{x:.1}" for x in hsv1) + + ihsv1 = list(hsv1) + #ihsv1[1], ihsv1[2] *= .1, .4 + + ihsv1[2] *= .4 # v -- non- dark/black -ness + ihsv1[1] *= .1 # s -- color sat + ihsv1[2] = max(.3, ihsv1[2]) + irgb1 = colorsys.hsv_to_rgb(*ihsv1) + irgb256 = list(int(i*255) for i in irgb1) + infocolor = "\033[38;2;{};{};{}m".format(*irgb256) + + # Print colored post + try: + if post.type == "com": + print(f'{infocolor}{int(ts)}|type:{getattr(post,"type",None)}|{color}{text}{infocolor}|hsv:{hsv1s} type:{getattr(post,"type",None)} kind:{getattr(post,"kind",None)} {post.commit.type=} {post.commit.operation=}') + else: + ihsv1[1] *= .3 + ihsv1[2] = 1 + infocolor=_hsv_termcolor(*ihsv1) + print(f'{infocolor}{int(ts)}|type:{getattr(post,"type",None)}|{color}{text}{infocolor}|hsv:{hsv1s} type:{getattr(post,"type",None)} kind:{getattr(post,"kind",None)}') + except Exception as e: + print(f'{infocolor}{int(ts)}|{color}{text}{infocolor}|hsv:{hsv1s} type:{getattr(post,"type",None)} kind:{getattr(post,"kind",None)} -- no post commit') + + except Exception as e: + print(f"Error processing event: {e}") + raise e + + except Exception as e: + print(f"Websocket connection error: {e}") + +async def main(skips=[], onlys=[], count=None, cfilters={}, fkeeps=[], fdrops=[]): + # Replace with actual Bluesky firehose websocket URL + BLUESKY_FIREHOSE_WS = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post" + + printer = BlueskyFirehosePrinter() + await printer.connect_and_print(BLUESKY_FIREHOSE_WS, skips=skips, onlys=onlys, count=count, cfilters=cfilters, fkeeps=fkeeps, fdrops=fdrops) + + + +def cli_main(skips="", only="", cfilters="", filters="", count=None): + """ + run the async func. + --skip=[value][,value]* + --only=[value][,value]* + --filters=+include,-skip,+more,-nope + --cfilters= -- commit types: delete, create, (reply?), (post?) + --count=[n] -- stop after + """ + # Run the async main function + skips = skips.split(",") + onlys = only.split(",") + cfs = cfilters.split(",") + cfilters = {"+": [f[1:] for f in cfs if f[:1] == "+"], + "-": [f[1:] for f in cfs if f[:1] == "-"] } + fs = filters.split(",") + fkeeps = [f[1:] for f in fs if f[:1] == "+"] + fdrops = [f[1:] for f in fs if f[:1] == "-"] + try: + asyncio.run(main(skips=skips, onlys=onlys, count=count, cfilters=cfilters, fkeeps=fkeeps, fdrops=fdrops)) + except KeyboardInterrupt as kb: + print("done") + +if __name__ == "__main__": + fire.Fire(cli_main) + +# Dependencies: +# pip install websockets colorama +# +# Notes: +# 1. Replace BLUESKY_FIREHOSE_WS with actual websocket URL +# 2. The post extraction method (_extract_post_text) +# will likely need customization based on the +# actual Bluesky firehose JSON structure diff --git a/bsky-event-1.json b/bsky-event-1.json new file mode 100644 index 0000000..210acbc --- /dev/null +++ b/bsky-event-1.json @@ -0,0 +1 @@ +{"did":"did:plc:hdps5qzxhbhmgyfht7xbrf4u","time_us":1732139039346541,"type":"com","kind":"commit","commit":{"rev":"3lbfwa42a6326","type":"c","operation":"create","collection":"app.bsky.feed.post","rkey":"3lbfwa3w22c2d","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-20T21:43:58.933Z","langs":["en"],"reply":{"parent":{"cid":"bafyreidaivmfhldd4jzjxjztfxywkgszhovguvgd6hfr6qbumg73imerpe","uri":"at://did:plc:5ybjw77wuat2wpeq434hzyen/app.bsky.feed.post/3kwi7jcnbsf2s"},"root":{"cid":"bafyreigugwpm7vfdwltwyr4j6lb4bixdl3irdymasyy6ponm2u23sfp6aq","uri":"at://did:plc:dfdn5h5ejeloscqpou577jvy/app.bsky.feed.post/3kwhy3mlxte2o"}},"text":"I agree"},"cid":"bafyreifoxsw7x4tstgm5367zx2dauyrt4qt6j65amv2zvsqrq43tkve7mu"}}