Files
SNote/Server/Endpoints/SyncEndpoints.cs
T
2026-06-01 05:09:20 +10:00

501 lines
22 KiB
C#

using System;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Routing;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using SNote.Models;
using SNote.Server.Data;
using SNote.Server.Security;
namespace SNote.Server.Endpoints;
public static class SyncEndpoints
{
public static void MapSyncEndpoints(this IEndpointRouteBuilder routes)
{
// 0. Heartbeat ping endpoint
routes.MapGet("/api/sync/ping", () => Results.Ok(new { status = "Healthy" }));
// 1. Handshake Endpoint: Receive registration of downstream peers
routes.MapPost("/api/sync/register-peer", async (RegisterPeerRequest req, HttpContext context, PeerCache peerCache, CertificateManager certManager) =>
{
if (string.IsNullOrWhiteSpace(req.PeerUrl))
{
return Results.BadRequest("Peer URL is required.");
}
// Authenticate server-to-server JWT
if (!AuthHelper.IsServerAuthenticated(context, certManager))
{
return Results.Json(new { error = "Unauthorized server connection handshake." }, statusCode: 401);
}
peerCache.RegisterPeer(req.PeerUrl);
return Results.Ok(new { message = "Server peer registered successfully." });
});
// 2. Full Database pulling (bootstrapping new server node)
routes.MapPost("/api/sync/pull-database", async (HttpContext context, ServerDbContext db, IConfiguration config, CertificateManager certManager) =>
{
var enablePull = config.GetValue<bool>("EnableFullDbPull", false);
if (!enablePull)
{
return Results.Json(new { error = "Full database pulling is disabled on this server." }, statusCode: 403);
}
if (!AuthHelper.IsServerAuthenticated(context, certManager))
{
return Results.Json(new { error = "Unauthorized server node." }, statusCode: 401);
}
var users = await db.Users.ToListAsync();
var nodes = await db.Nodes.ToListAsync();
var nodeKeys = await db.NodeKeys.ToListAsync();
var pendingShares = await db.PendingShares.ToListAsync();
var aclEntries = await db.AclEntries.ToListAsync();
var peerServers = await db.PeerServers.ToListAsync();
var dump = new DatabaseDumpPayload(users, nodes, nodeKeys, pendingShares, aclEntries, peerServers);
return Results.Ok(dump);
});
// 3. Synchronize nodes states
routes.MapPost("/api/sync/nodes", async (SyncExchangeRequest req, HttpContext context, ServerDbContext db, CertificateManager certManager) =>
{
if (!AuthHelper.IsServerAuthenticated(context, certManager))
{
return Results.Unauthorized();
}
var toPush = new List<SyncPushItem>();
var toRequest = new List<string>();
var localNodes = await db.Nodes.ToListAsync();
var localNodeUuids = localNodes.Select(n => n.Uuid).ToHashSet();
foreach (var remoteItem in req.Items)
{
var localNode = localNodes.FirstOrDefault(n => n.Uuid == remoteItem.Uuid);
if (localNode == null)
{
toRequest.Add(remoteItem.Uuid);
}
else
{
if (localNode.UpdatedAt > remoteItem.UpdatedAt)
{
var acls = await db.AclEntries.Where(a => a.NodeUuid == localNode.Uuid).ToListAsync();
var keys = await db.NodeKeys.Where(k => k.NodeUuid == localNode.Uuid).ToListAsync();
toPush.Add(new SyncPushItem(localNode, acls, keys));
}
else if (localNode.UpdatedAt < remoteItem.UpdatedAt)
{
toRequest.Add(remoteItem.Uuid);
}
}
}
var remoteUuids = req.Items.Select(i => i.Uuid).ToHashSet();
foreach (var localNode in localNodes)
{
if (!remoteUuids.Contains(localNode.Uuid))
{
var acls = await db.AclEntries.Where(a => a.NodeUuid == localNode.Uuid).ToListAsync();
var keys = await db.NodeKeys.Where(k => k.NodeUuid == localNode.Uuid).ToListAsync();
toPush.Add(new SyncPushItem(localNode, acls, keys));
}
}
return Results.Ok(new SyncExchangeResponse(toPush, toRequest));
});
// 4. Receive full node pushes during sync
routes.MapPost("/api/sync/push", async (List<SyncPushItem> items, HttpContext context, ServerDbContext db, CertificateManager certManager) =>
{
if (!AuthHelper.IsServerAuthenticated(context, certManager))
{
return Results.Unauthorized();
}
foreach (var item in items)
{
var localNode = await db.Nodes.FirstOrDefaultAsync(n => n.Uuid == item.Node.Uuid);
if (localNode == null)
{
db.Nodes.Add(item.Node);
db.AclEntries.AddRange(item.AclEntries);
db.NodeKeys.AddRange(item.NodeKeys);
}
else if (localNode.UpdatedAt < item.Node.UpdatedAt)
{
localNode.Name = item.Node.Name;
localNode.Type = item.Node.Type;
localNode.Content = item.Node.Content;
localNode.OwnerUsername = item.Node.OwnerUsername;
localNode.UpdatedAt = item.Node.UpdatedAt;
localNode.IsDeleted = item.Node.IsDeleted;
var existingAcls = await db.AclEntries.Where(a => a.NodeUuid == item.Node.Uuid).ToListAsync();
db.AclEntries.RemoveRange(existingAcls);
db.AclEntries.AddRange(item.AclEntries);
var existingKeys = await db.NodeKeys.Where(k => k.NodeUuid == item.Node.Uuid).ToListAsync();
db.NodeKeys.RemoveRange(existingKeys);
db.NodeKeys.AddRange(item.NodeKeys);
}
}
await db.SaveChangesAsync();
return Results.Ok(new { message = "Synced elements updated successfully." });
});
// 5. Receive broadcast of changes (broadcasts single node updates)
routes.MapPost("/api/sync/broadcast", async (SyncPushItem item, HttpContext context, ServerDbContext db, PeerCache peerCache, CertificateManager certManager, HttpClient httpClient) =>
{
if (!AuthHelper.IsServerAuthenticated(context, certManager))
{
return Results.Unauthorized();
}
var broadcastId = item.BroadcastId;
if (!string.IsNullOrEmpty(broadcastId))
{
// Loop check: if already processed, ignore!
if (!peerCache.TryProcessBroadcast(broadcastId))
{
return Results.Ok(new { message = "Broadcast already processed (loop detected).", updated = false });
}
}
var localNode = await db.Nodes.FirstOrDefaultAsync(n => n.Uuid == item.Node.Uuid);
bool updated = false;
if (localNode == null)
{
db.Nodes.Add(item.Node);
db.AclEntries.AddRange(item.AclEntries);
db.NodeKeys.AddRange(item.NodeKeys);
updated = true;
}
else if (localNode.UpdatedAt < item.Node.UpdatedAt)
{
localNode.Name = item.Node.Name;
localNode.Type = item.Node.Type;
localNode.Content = item.Node.Content;
localNode.OwnerUsername = item.Node.OwnerUsername;
localNode.UpdatedAt = item.Node.UpdatedAt;
localNode.IsDeleted = item.Node.IsDeleted;
var existingAcls = await db.AclEntries.Where(a => a.NodeUuid == item.Node.Uuid).ToListAsync();
db.AclEntries.RemoveRange(existingAcls);
db.AclEntries.AddRange(item.AclEntries);
var existingKeys = await db.NodeKeys.Where(k => k.NodeUuid == item.Node.Uuid).ToListAsync();
db.NodeKeys.RemoveRange(existingKeys);
db.NodeKeys.AddRange(item.NodeKeys);
updated = true;
}
if (updated)
{
await db.SaveChangesAsync();
// Relay broadcast to all peers (upstream + downstream) with loop prevention!
if (!string.IsNullOrEmpty(broadcastId))
{
_ = Task.Run(async () =>
{
await BroadcastNodeChangeAsync(item.Node.Uuid, db, peerCache, certManager, httpClient, broadcastId);
});
}
}
return Results.Ok(new { message = "Broadcast processed.", updated });
});
}
// Handshake: Tells the destination server who we are and registers us
public static async Task RegisterPeerWithDestinationAsync(string destinationUrl, string localUrl, CertificateManager certManager, HttpClient httpClient)
{
try
{
Console.WriteLine($"[Handshake] Registering our address '{localUrl}' with destination '{destinationUrl}'...");
var cleanDest = destinationUrl.TrimEnd('/') + "/api/sync/register-peer";
var payload = new RegisterPeerRequest(localUrl);
var json = JsonSerializer.Serialize(payload);
var request = new HttpRequestMessage(HttpMethod.Post, cleanDest)
{
Content = new StringContent(json, Encoding.UTF8, "application/json")
};
// Generate server-to-server JWT signature
var token = AuthHelper.GenerateServerToken(certManager);
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
var response = await httpClient.SendAsync(request);
if (response.IsSuccessStatusCode)
{
Console.WriteLine("[Handshake] Successfully registered with destination server.");
}
else
{
Console.WriteLine($"[Handshake] Handshake failed: {response.StatusCode}");
}
}
catch (Exception ex)
{
Console.WriteLine($"[Handshake] Handshake error: {ex.Message}");
}
}
// Broadcast a node change to both database and dynamic in-memory cached peer servers
public static async Task BroadcastNodeChangeAsync(string nodeUuid, ServerDbContext db, PeerCache peerCache, CertificateManager certManager, HttpClient httpClient, string? broadcastId = null)
{
var node = await db.Nodes.FirstOrDefaultAsync(n => n.Uuid == nodeUuid);
if (node == null) return;
var acls = await db.AclEntries.Where(a => a.NodeUuid == nodeUuid).ToListAsync();
var keys = await db.NodeKeys.Where(k => k.NodeUuid == nodeUuid).ToListAsync();
var bId = string.IsNullOrEmpty(broadcastId) ? Guid.NewGuid().ToString() : broadcastId;
// Register it locally in PeerCache so we don't process it ourselves if it loops back
peerCache.TryProcessBroadcast(bId);
var payload = new SyncPushItem(node, acls, keys, bId);
var json = JsonSerializer.Serialize(payload);
// Fetch database configured peers
var dbPeers = await db.PeerServers.Select(p => p.Url).ToListAsync();
// Fetch dynamically handshake-registered in-memory peers
var dynamicPeers = peerCache.GetPeers();
// Combine peers
var allPeers = dbPeers.Union(dynamicPeers, StringComparer.OrdinalIgnoreCase).ToList();
foreach (var peerUrl in allPeers)
{
try
{
var request = new HttpRequestMessage(HttpMethod.Post, $"{peerUrl.TrimEnd('/')}/api/sync/broadcast");
request.Content = new StringContent(json, Encoding.UTF8, "application/json");
var token = AuthHelper.GenerateServerToken(certManager);
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
var response = await httpClient.SendAsync(request);
Console.WriteLine($"Broadcast to {peerUrl}: {response.StatusCode}");
}
catch (Exception ex)
{
Console.WriteLine($"Failed to broadcast change to {peerUrl}: {ex.Message}");
}
}
}
// Periodically pings downstream peer servers in PeerCache to ensure they are online
public static async Task StartHeartbeatPingerAsync(PeerCache peerCache, CertificateManager certManager, HttpClient httpClient, System.Threading.CancellationToken token)
{
Console.WriteLine("[Heartbeat] Starting periodic downstream heartbeat pinger...");
while (!token.IsCancellationRequested)
{
try
{
// Wait for 30 seconds
await Task.Delay(30000, token);
var peers = peerCache.GetPeers();
if (peers.Count == 0) continue;
var jwtToken = AuthHelper.GenerateServerToken(certManager);
foreach (var peerUrl in peers)
{
try
{
var request = new HttpRequestMessage(HttpMethod.Get, $"{peerUrl.TrimEnd('/')}/api/sync/ping");
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", jwtToken);
// Set a short timeout (e.g. 5 seconds) to avoid blocking
using var cts = new System.Threading.CancellationTokenSource(TimeSpan.FromSeconds(5));
using var linkedCts = System.Threading.CancellationTokenSource.CreateLinkedTokenSource(token, cts.Token);
var response = await httpClient.SendAsync(request, linkedCts.Token);
if (!response.IsSuccessStatusCode)
{
Console.WriteLine($"[Heartbeat] Peer {peerUrl} ping failed with status code: {response.StatusCode}. Evicting...");
peerCache.RemovePeer(peerUrl);
}
}
catch (Exception ex)
{
Console.WriteLine($"[Heartbeat] Peer {peerUrl} is unreachable ({ex.Message}). Evicting...");
peerCache.RemovePeer(peerUrl);
}
}
}
catch (TaskCanceledException)
{
break;
}
catch (Exception ex)
{
Console.WriteLine($"[Heartbeat] Error in heartbeat loop: {ex.Message}");
}
}
}
public static async Task SyncWithDestinationServerAsync(string peerUrl, ServerDbContext db, CertificateManager certManager, HttpClient httpClient)
{
try
{
var token = AuthHelper.GenerateServerToken(certManager);
var localNodes = await db.Nodes.ToListAsync();
var reqItems = localNodes.Select(n => new SyncNodeState(n.Uuid, n.UpdatedAt, n.IsDeleted)).ToList();
var request = new HttpRequestMessage(HttpMethod.Post, $"{peerUrl.TrimEnd('/')}/api/sync/nodes");
request.Content = new StringContent(JsonSerializer.Serialize(new SyncExchangeRequest(reqItems)), Encoding.UTF8, "application/json");
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
var response = await httpClient.SendAsync(request);
if (!response.IsSuccessStatusCode) return;
var result = JsonSerializer.Deserialize<SyncExchangeResponse>(await response.Content.ReadAsStringAsync(), new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
if (result == null) return;
if (result.ToPush != null && result.ToPush.Count > 0)
{
foreach (var item in result.ToPush)
{
var local = await db.Nodes.FirstOrDefaultAsync(n => n.Uuid == item.Node.Uuid);
if (local == null)
{
db.Nodes.Add(item.Node);
db.AclEntries.AddRange(item.AclEntries);
db.NodeKeys.AddRange(item.NodeKeys);
}
else if (local.UpdatedAt < item.Node.UpdatedAt)
{
local.Name = item.Node.Name;
local.Type = item.Node.Type;
local.Content = item.Node.Content;
local.OwnerUsername = item.Node.OwnerUsername;
local.UpdatedAt = item.Node.UpdatedAt;
local.IsDeleted = item.Node.IsDeleted;
var existingAcls = await db.AclEntries.Where(a => a.NodeUuid == item.Node.Uuid).ToListAsync();
db.AclEntries.RemoveRange(existingAcls);
db.AclEntries.AddRange(item.AclEntries);
var existingKeys = await db.NodeKeys.Where(k => k.NodeUuid == item.Node.Uuid).ToListAsync();
db.NodeKeys.RemoveRange(existingKeys);
db.NodeKeys.AddRange(item.NodeKeys);
}
}
await db.SaveChangesAsync();
}
if (result.ToRequest != null && result.ToRequest.Count > 0)
{
var pushItems = new List<SyncPushItem>();
foreach (var uuid in result.ToRequest)
{
var node = await db.Nodes.FirstOrDefaultAsync(n => n.Uuid == uuid);
if (node != null)
{
var acls = await db.AclEntries.Where(a => a.NodeUuid == uuid).ToListAsync();
var keys = await db.NodeKeys.Where(k => k.NodeUuid == uuid).ToListAsync();
pushItems.Add(new SyncPushItem(node, acls, keys));
}
}
if (pushItems.Count > 0)
{
var pushRequest = new HttpRequestMessage(HttpMethod.Post, $"{peerUrl.TrimEnd('/')}/api/sync/push");
pushRequest.Content = new StringContent(JsonSerializer.Serialize(pushItems), Encoding.UTF8, "application/json");
pushRequest.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
await httpClient.SendAsync(pushRequest);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Error synchronizing with peer {peerUrl}: {ex.Message}");
}
}
public static async Task BootstrapFromPeerServerAsync(string peerUrl, ServerDbContext db, CertificateManager certManager, HttpClient httpClient)
{
try
{
Console.WriteLine($"Performing full database pulling bootstrap from {peerUrl}...");
var token = AuthHelper.GenerateServerToken(certManager);
var request = new HttpRequestMessage(HttpMethod.Post, $"{peerUrl.TrimEnd('/')}/api/sync/pull-database");
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
var response = await httpClient.SendAsync(request);
if (!response.IsSuccessStatusCode)
{
Console.WriteLine($"Full database pull failed: {response.StatusCode}");
return;
}
var dump = JsonSerializer.Deserialize<DatabaseDumpPayload>(await response.Content.ReadAsStringAsync(), new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
if (dump == null) return;
db.Users.RemoveRange(await db.Users.ToListAsync());
db.Nodes.RemoveRange(await db.Nodes.ToListAsync());
db.NodeKeys.RemoveRange(await db.NodeKeys.ToListAsync());
db.PendingShares.RemoveRange(await db.PendingShares.ToListAsync());
db.AclEntries.RemoveRange(await db.AclEntries.ToListAsync());
db.PeerServers.RemoveRange(await db.PeerServers.ToListAsync());
await db.SaveChangesAsync();
db.Users.AddRange(dump.Users);
db.Nodes.AddRange(dump.Nodes);
db.NodeKeys.AddRange(dump.NodeKeys);
db.PendingShares.AddRange(dump.PendingShares);
db.AclEntries.AddRange(dump.AclEntries);
db.PeerServers.AddRange(dump.PeerServers);
await db.SaveChangesAsync();
Console.WriteLine("Full database pull bootstrap completed successfully!");
}
catch (Exception ex)
{
Console.WriteLine($"Error during full database pulling bootstrap: {ex.Message}");
}
}
}
public record RegisterPeerRequest(string PeerUrl);
public record DatabaseDumpPayload(
List<User> Users,
List<Node> Nodes,
List<NodeKey> NodeKeys,
List<PendingShare> PendingShares,
List<AclEntry> AclEntries,
List<PeerServer> PeerServers
);
public record SyncNodeState(string Uuid, DateTime UpdatedAt, bool IsDeleted);
public record SyncExchangeRequest(List<SyncNodeState> Items);
public record SyncPushItem(Node Node, List<AclEntry> AclEntries, List<NodeKey> NodeKeys, string? BroadcastId = null);
public record SyncExchangeResponse(List<SyncPushItem> ToPush, List<string> ToRequest);