Unknown event fetching

Damus will now try really hard to find unknown events. It will use the
event relay hints to connect to new relays and fetch any id that is
missing.
This commit is contained in:
William Casarin 2022-11-20 12:43:32 -08:00
parent 994305ccff
commit 45b36a9945
2 changed files with 222 additions and 74 deletions

View file

@ -56,7 +56,10 @@ function init_home_model() {
all_events: {}, all_events: {},
reactions_to: {}, reactions_to: {},
chatrooms: {}, chatrooms: {},
unknown_ids: {},
unknown_pks: {},
deletions: {}, deletions: {},
but_wait_theres_more: 0,
cw_open: {}, cw_open: {},
views: { views: {
home: init_timeline('home'), home: init_timeline('home'),
@ -156,9 +159,12 @@ async function damus_web_init_ready()
home: "home",//uuidv4(), home: "home",//uuidv4(),
contacts: "contacts",//uuidv4(), contacts: "contacts",//uuidv4(),
notifications: "notifications",//uuidv4(), notifications: "notifications",//uuidv4(),
unknowns: "unknowns",//uuidv4(),
dms: "dms",//uuidv4(), dms: "dms",//uuidv4(),
} }
model.ids = ids
model.pool = pool model.pool = pool
model.view_el = document.querySelector("#view") model.view_el = document.querySelector("#view")
@ -176,13 +182,13 @@ async function damus_web_init_ready()
if (!model.done_init[relay]) { if (!model.done_init[relay]) {
send_initial_filters(ids.account, model.pubkey, relay) send_initial_filters(ids.account, model.pubkey, relay)
} else { } else {
send_home_filters(ids, model, relay) send_home_filters(model, relay)
} }
//relay.subscribe(comments_id, {kinds: [1,42], limit: 100}) //relay.subscribe(comments_id, {kinds: [1,42], limit: 100})
}); });
pool.on('event', (relay, sub_id, ev) => { pool.on('event', (relay, sub_id, ev) => {
handle_home_event(ids, model, relay, sub_id, ev) handle_home_event(model, relay, sub_id, ev)
}) })
pool.on('notice', (relay, notice) => { pool.on('notice', (relay, notice) => {
@ -198,6 +204,8 @@ async function damus_web_init_ready()
//log_debug("got profiles EOSE from %s", relay.url) //log_debug("got profiles EOSE from %s", relay.url)
const view = get_current_view() const view = get_current_view()
handle_profiles_loaded(ids, model, view, relay) handle_profiles_loaded(ids, model, view, relay)
} else if (sub_id === ids.unknowns) {
model.pool.unsubscribe(ids.unknowns, relay)
} }
}) })
@ -299,34 +307,212 @@ function is_deleted(model, evid)
return false return false
} }
function process_event(model, ev) function has_profile(damus, pk) {
return pk in damus.profiles
}
function has_event(damus, evid) {
return evid in damus.all_events
}
const ID_REG = /^[a-f0-9]{64}$/
function is_valid_id(evid)
{
return ID_REG.test(evid)
}
function make_unk(hint, ev)
{
const attempts = 0
const parent_created = ev.created_at
if (hint && hint !== "")
return {attempts, hint: hint.trim().toLowerCase(), parent_created}
return {attempts, parent_created}
}
function notice_unknown_ids(damus, ev)
{
// make sure this event itself is removed from unknowns
if (ev.kind === 0)
delete damus.unknown_pks[ev.pubkey]
delete damus.unknown_ids[ev.id]
let got_some = false
for (const tag of ev.tags) {
if (tag.length >= 2) {
if (tag[0] === "p") {
const pk = tag[1]
if (!has_profile(damus, pk) && is_valid_id(pk)) {
got_some = true
damus.unknown_pks[pk] = make_unk(tag[2], ev)
}
} else if (tag[0] === "e") {
const evid = tag[1]
if (!has_event(damus, evid) && is_valid_id(evid)) {
got_some = true
damus.unknown_ids[evid] = make_unk(tag[2], ev)
}
}
}
}
return got_some
}
function gather_unknown_hints(damus, pks, evids)
{
let relays = new Set()
for (const pk of pks) {
const unk = damus.unknown_pks[pk]
if (unk && unk.hint && unk.hint !== "")
relays.add(unk.hint)
}
for (const evid of evids) {
const unk = damus.unknown_ids[evid]
if (unk && unk.hint && unk.hint !== "")
relays.add(unk.hint)
}
return Array.from(relays)
}
function get_non_expired_unknowns(unks, type)
{
const MAX_ATTEMPTS = 3
function sort_parent_created(a_id, b_id) {
const a = unks[a_id]
const b = unks[b_id]
return b.parent_created - a.parent_created
}
let new_expired = 0
const ids = Object.keys(unks).sort(sort_parent_created).reduce((ids, unk_id) => {
if (ids.length >= 128)
return ids
const unk = unks[unk_id]
if (unk.attempts >= MAX_ATTEMPTS) {
if (!unk.expired) {
unk.expired = true
new_expired++
}
return ids
}
unk.attempts++
ids.push(unk_id)
return ids
}, [])
if (new_expired !== 0)
log_debug("Gave up looking for %d %s", new_expired, type)
return ids
}
function fetch_unknown_events(damus)
{
let filters = []
const pks = get_non_expired_unknowns(damus.unknown_pks, 'profiles')
const evids = get_non_expired_unknowns(damus.unknown_ids, 'events')
const relays = gather_unknown_hints(damus, pks, evids)
for (const relay of relays) {
if (!damus.pool.has(relay)) {
log_debug("adding %s to relays to fetch unknown events", relay)
damus.pool.add(relay)
}
}
if (evids.length !== 0) {
filters.push({ids: evids})
filters.push({"#e": evids, limit: 100})
}
if (pks.length !== 0)
filters.push({authors: pks, kinds:[0]})
if (filters.length === 0)
return
log_debug("fetching unknowns", filters)
damus.pool.subscribe('unknowns', filters)
}
function shuffle(arr)
{
let i = arr.length;
while (--i > 0) {
let randIndex = Math.floor(Math.random() * (i + 1));
[arr[randIndex], arr[i]] = [arr[i], arr[randIndex]];
}
return arr;
}
function schedule_unknown_refetch(damus)
{
const INTERVAL = 10000
if (!damus.unknown_timer) {
log_debug("fetching unknown events now and in %d seconds", INTERVAL / 1000)
damus.unknown_timer = setTimeout(() => {
fetch_unknown_events(damus)
setTimeout(() => {
delete damus.unknown_timer
if (damus.but_wait_theres_more > 0) {
damus.but_wait_theres_more = 0
schedule_unknown_refetch(damus)
}
}, INTERVAL)
}, INTERVAL)
fetch_unknown_events(damus)
} else {
damus.but_wait_theres_more++
}
}
function process_event(damus, ev)
{ {
ev.refs = determine_event_refs(ev.tags) ev.refs = determine_event_refs(ev.tags)
const notified = was_pubkey_notified(model.pubkey, ev) const notified = was_pubkey_notified(damus.pubkey, ev)
ev.notified = notified ev.notified = notified
const got_some_unknowns = notice_unknown_ids(damus, ev)
if (got_some_unknowns)
schedule_unknown_refetch(damus)
ev.pow = calculate_pow(ev) ev.pow = calculate_pow(ev)
if (ev.kind === 7) if (ev.kind === 7)
process_reaction_event(model, ev) process_reaction_event(damus, ev)
else if (ev.kind === 42 && ev.refs && ev.refs.root) else if (ev.kind === 42 && ev.refs && ev.refs.root)
notice_chatroom(model, ev.refs.root) notice_chatroom(damus, ev.refs.root)
else if (ev.kind === 40) else if (ev.kind === 40)
process_chatroom_event(model, ev) process_chatroom_event(damus, ev)
else if (ev.kind === 6)
process_json_content(ev)
else if (ev.kind === 5) else if (ev.kind === 5)
process_deletion_event(model, ev) process_deletion_event(damus, ev)
else if (ev.kind === 0) else if (ev.kind === 0)
process_profile_event(model, ev) process_profile_event(damus, ev)
else if (ev.kind === 3) else if (ev.kind === 3)
process_contact_event(model, ev) process_contact_event(damus, ev)
const last_notified = get_local_state('last_notified_date') const last_notified = get_local_state('last_notified_date')
if (notified && (last_notified == null || ((ev.created_at*1000) > last_notified))) { if (notified && (last_notified == null || ((ev.created_at*1000) > last_notified))) {
set_local_state('last_notified_date', new Date().getTime()) set_local_state('last_notified_date', new Date().getTime())
model.notifications++ damus.notifications++
update_title(model) update_title(damus)
} }
} }
@ -392,9 +578,11 @@ function handle_redraw_logic(model, view_name)
} }
} }
function handle_home_event(ids, model, relay, sub_id, ev) { function handle_home_event(model, relay, sub_id, ev) {
const ids = model.ids
// ignore duplicates // ignore duplicates
if (!model.all_events[ev.id]) { if (!has_event(model, ev.id)) {
model.all_events[ev.id] = ev model.all_events[ev.id] = ev
process_event(model, ev) process_event(model, ev)
} }
@ -403,7 +591,7 @@ function handle_home_event(ids, model, relay, sub_id, ev) {
let is_new = true let is_new = true
switch (sub_id) { switch (sub_id) {
case ids.explore: case model.ids.explore:
const view = model.views.explore const view = model.views.explore
// show more things in explore timeline // show more things in explore timeline
@ -416,7 +604,7 @@ function handle_home_event(ids, model, relay, sub_id, ev) {
handle_redraw_logic(model, 'explore') handle_redraw_logic(model, 'explore')
break; break;
case ids.notifications: case model.ids.notifications:
if (should_add_to_notification_timeline(model.pubkey, model.contacts, ev, model.pow)) if (should_add_to_notification_timeline(model.pubkey, model.contacts, ev, model.pow))
is_new = insert_event_sorted(model.views.notifications.events, ev) is_new = insert_event_sorted(model.views.notifications.events, ev)
@ -424,32 +612,33 @@ function handle_home_event(ids, model, relay, sub_id, ev) {
handle_redraw_logic(model, 'notifications') handle_redraw_logic(model, 'notifications')
break; break;
case ids.home: case model.ids.home:
if (should_add_to_timeline(ev)) if (should_add_to_timeline(ev))
is_new = insert_event_sorted(model.views.home.events, ev) is_new = insert_event_sorted(model.views.home.events, ev)
if (is_new) if (is_new)
handle_redraw_logic(model, 'home') handle_redraw_logic(model, 'home')
break; break;
case ids.account: case model.ids.account:
switch (ev.kind) { switch (ev.kind) {
case 3: case 3:
model.done_init[relay] = true model.done_init[relay] = true
model.pool.unsubscribe(ids.account, relay) model.pool.unsubscribe(model.ids.account, relay)
send_home_filters(ids, model, relay) send_home_filters(model, relay)
break break
} }
case ids.profiles: break
case model.ids.profiles:
break break
} }
} }
function process_profile_event(model, ev) { function process_profile_event(model, ev) {
const prev_ev = model.profile_events[ev.pubkey] const prev_ev = model.all_events[model.profile_events[ev.pubkey]]
if (prev_ev && prev_ev.created_at > ev.created_at) if (prev_ev && prev_ev.created_at > ev.created_at)
return return
model.profile_events[ev.pubkey] = ev model.profile_events[ev.pubkey] = ev.id
try { try {
model.profiles[ev.pubkey] = JSON.parse(ev.content) model.profiles[ev.pubkey] = JSON.parse(ev.content)
} catch(e) { } catch(e) {
@ -463,7 +652,8 @@ function send_initial_filters(account_id, pubkey, relay) {
relay.subscribe(account_id, filter) relay.subscribe(account_id, filter)
} }
function send_home_filters(ids, model, relay) { function send_home_filters(model, relay) {
const ids = model.ids
const friends = contacts_friend_list(model.contacts) const friends = contacts_friend_list(model.contacts)
friends.push(model.pubkey) friends.push(model.pubkey)
@ -635,11 +825,12 @@ function load_our_relays(our_pubkey, pool, ev) {
} }
for (const relay of Object.keys(relays)) { for (const relay of Object.keys(relays)) {
if (!pool.has(relay)) {
log_debug("adding relay", relay) log_debug("adding relay", relay)
if (!pool.has(relay))
pool.add(relay) pool.add(relay)
} }
} }
}
function load_our_contacts(contacts, our_pubkey, ev) { function load_our_contacts(contacts, our_pubkey, ev) {
if (ev.pubkey !== our_pubkey) if (ev.pubkey !== our_pubkey)
@ -654,28 +845,6 @@ function load_our_contacts(contacts, our_pubkey, ev) {
} }
} }
function get_referenced_events(model, events)
{
let evset = new Set()
for (const ev of events) {
for (const tag of ev.tags) {
if (tag.length >= 2 && tag[0] === "e") {
const e = tag[1]
if (!model.all_events[e]) {
evset.add(e)
}
}
}
}
return Array.from(evset)
}
function fetch_referenced_events(refevents_id, model, relay) {
const ref = df
model.pool.subscribe(refevents_id, [filter], relay)
}
function handle_profiles_loaded(ids, model, view, relay) { function handle_profiles_loaded(ids, model, view, relay) {
// stop asking for profiles // stop asking for profiles
model.pool.unsubscribe(ids.profiles, relay) model.pool.unsubscribe(ids.profiles, relay)
@ -716,17 +885,6 @@ function debounce(f, interval) {
}; };
} }
function get_unknown_chatroom_ids(state)
{
let chatroom_ids = []
for (const key of Object.keys(state.chatrooms)) {
const chatroom = state.chatrooms[key]
if (chatroom.name === undefined)
chatroom_ids.push(key)
}
return chatroom_ids
}
// load profiles after comment notes are loaded // load profiles after comment notes are loaded
function handle_comments_loaded(ids, model, events, relay) function handle_comments_loaded(ids, model, events, relay)
{ {
@ -743,25 +901,13 @@ function handle_comments_loaded(ids, model, events, relay)
const authors = Array.from(pubkeys) const authors = Array.from(pubkeys)
// load profiles and noticed chatrooms // load profiles and noticed chatrooms
const chatroom_ids = get_unknown_chatroom_ids(model)
const profile_filter = {kinds: [0,3], authors: authors} const profile_filter = {kinds: [0,3], authors: authors}
const chatroom_filter = {kinds: [40], ids: chatroom_ids}
let filters = [] let filters = []
if (authors.length > 0) if (authors.length > 0)
filters.push(profile_filter) filters.push(profile_filter)
if (chatroom_ids.length > 0)
filters.push(chatroom_filter)
const ref_evids = get_referenced_events(model, events)
if (ref_evids.length > 0) {
log_debug("got %d new referenced events to pull from %s after initial load", ref_evids.length, relay.url)
filters.push({ids: ref_evids})
filters.push({"#e": ref_evids})
}
if (filters.length === 0) { if (filters.length === 0) {
log_debug("No profiles filters to request...") log_debug("No profiles filters to request...")
return return
@ -1070,7 +1216,7 @@ function get_tag_event(tag)
return DAMUS.all_events[tag[1]] return DAMUS.all_events[tag[1]]
if (tag[0] === "p") if (tag[0] === "p")
return DAMUS.profile_events[tag[1]] return DAMUS.all_events[DAMUS.profile_events[tag[1]]]
return null return null
} }

View file

@ -233,6 +233,7 @@ Relay.prototype.close = function relayClose() {
} }
Relay.prototype.subscribe = function relay_subscribe(sub_id, filters) { Relay.prototype.subscribe = function relay_subscribe(sub_id, filters) {
//console.debug("[%s] %s %s", this.url, sub_id, filters)
if (Array.isArray(filters)) if (Array.isArray(filters))
this.send(["REQ", sub_id, ...filters]) this.send(["REQ", sub_id, ...filters])
else else
@ -240,6 +241,7 @@ Relay.prototype.subscribe = function relay_subscribe(sub_id, filters) {
} }
Relay.prototype.unsubscribe = function relay_unsubscribe(sub_id) { Relay.prototype.unsubscribe = function relay_unsubscribe(sub_id) {
//console.debug("[%s] CLOSE %s", this.url, sub_id)
this.send(["CLOSE", sub_id]) this.send(["CLOSE", sub_id])
} }