Scanyonero/queue.go
David Vogel 853a1bb58d Rework into FTP scanning server
- Rename to Scanyonero
- Add FTP server that ingests TIFF, PNG, JPEG or PDF files
- Add web interface to check and modify ingested files
- Rework how ocrmypdf is invoked

Basics are working, but the program is not in a usable state.
2025-05-14 12:08:38 +02:00

187 lines
4.9 KiB
Go

package main
import (
"slices"
"sync"
)
// Queue contains a list of scanned documents.
// A user can issue operations on these entries.
//
// The list is synced between the server and clients via websockets.
//
// RegisterListener and UnregisterListener lock the queue themselves.
// All other methods expect the caller to hold the lock already.
type Queue struct {
// The embedded mutex guards Documents and listeners.
sync.Mutex
// Documents is the ordered list of entries in the queue.
Documents []QueueEntry
// listeners is the set of channels that receive a packet for every change made through the methods of Queue.
// The zero value (nil) is valid, the map is allocated on demand.
listeners map[chan<- ServerWebsocketPacket]struct{}
}
// RegisterListener adds the channel c to the set of listeners, so that it
// receives all future updates in the form of websocket packets.
//
// Before returning, a snapshot of the current document list is sent to c as a
// ServerWebsocketPacketQueueReplace. The send happens while the queue is
// locked, so c has to be buffered or actively read from. Otherwise this call,
// and everyone else waiting for the lock, blocks.
//
// The caller must keep reading from c until UnregisterListener has returned.
// Only then is it safe to close c or to stop reading from it.
func (d *Queue) RegisterListener(c chan<- ServerWebsocketPacket) {
	d.Lock()
	defer d.Unlock()

	if d.listeners == nil {
		d.listeners = make(map[chan<- ServerWebsocketPacket]struct{})
	}
	d.listeners[c] = struct{}{}

	// Send a copy of the current document list. The receiver handles the packet
	// after the lock has been released, so the packet must not share its backing
	// array with d.Documents, which is modified in place by other queue methods.
	c <- &ServerWebsocketPacketQueueReplace{Documents: slices.Clone(d.Documents)}
}
// UnregisterListener stops the given listener from receiving updates.
// Upon return, the listener will not receive any further packets from the queue.
//
// Unregistering a channel that is not registered is a no-op.
func (d *Queue) UnregisterListener(c chan<- ServerWebsocketPacket) {
	d.Lock()
	defer d.Unlock()

	// delete is a no-op on a nil map, so there is no need to allocate one first.
	delete(d.listeners, c)
}
// Broadcast delivers the given websocket packets to every registered listener.
//
// Each listener gets the packets in the order they were passed in. The sends
// are plain channel sends, so a listener that does not accept a packet blocks
// the whole broadcast.
//
// The Queue must be locked when calling this.
func (d *Queue) Broadcast(p ...ServerWebsocketPacket) {
	if len(p) == 0 {
		return
	}

	for receiver := range d.listeners {
		for idx := range p {
			receiver <- p[idx]
		}
	}
}
// DeleteAt removes all elements in the half-open index range [i:j).
//
// Both indices are clamped to [0; len(d.Documents)]. If the clamped range is
// empty, e.g. because it lies completely behind the end of the list, nothing is
// removed and nothing is broadcast. Otherwise all listeners are informed with a
// ServerWebsocketPacketQueueDeleteAt that carries the clamped indices.
//
// The Queue must be locked when calling this.
func (d *Queue) DeleteAt(i, j int) {
	n := len(d.Documents)

	// Clamp both bounds to [0; n]. Clamping i to n-1 instead would turn a range
	// that starts behind the end into one that covers the last element.
	i = max(0, min(i, n))
	j = max(0, min(j, n))
	if i >= j {
		return
	}

	d.Documents = slices.Delete(d.Documents, i, j)
	d.Broadcast(&ServerWebsocketPacketQueueDeleteAt{IndexA: i, IndexB: j})
}
// Delete removes every entry whose ID is contained in ids.
//
// Each matching entry is removed with its own DeleteAt call.
//
// The Queue must be locked when calling this.
func (d *Queue) Delete(ids ...QueueEntryID) {
	// Walk over the list as it was on entry, from the back to the front, so that
	// removing an entry never moves an entry that still has to be checked.
	docs := d.Documents
	for idx := len(docs) - 1; idx >= 0; idx-- {
		if !slices.Contains(ids, docs[idx].ID) {
			continue
		}
		d.DeleteAt(idx, idx+1)
	}
}
// InsertAt puts the given documents into the queue, starting at index i.
//
// Entries at and behind i move back to make room. Any i outside of
// [0; len(d.Documents)] is clamped into that range, and all listeners are
// informed with a ServerWebsocketPacketQueueInsertAt carrying the clamped index.
//
// The Queue must be locked when calling this.
func (d *Queue) InsertAt(i int, documents ...QueueEntry) {
	// Clamp i to [0; len].
	switch end := len(d.Documents); {
	case i < 0:
		i = 0
	case i > end:
		i = end
	}

	d.Documents = slices.Insert(d.Documents, i, documents...)

	packet := &ServerWebsocketPacketQueueInsertAt{Index: i, Documents: documents}
	d.Broadcast(packet)
}
// Append adds the given documents behind the last entry of the queue.
//
// This is InsertAt with the index set to the current length of the list.
//
// The Queue must be locked when calling this.
func (d *Queue) Append(documents ...QueueEntry) {
	end := len(d.Documents)
	d.InsertAt(end, documents...)
}
// Replace discards the current list of documents and stores a copy of the
// given one instead.
//
// All listeners are informed with a ServerWebsocketPacketQueueReplace that
// carries the documents as passed in by the caller.
//
// The Queue must be locked when calling this.
func (d *Queue) Replace(documents ...QueueEntry) {
	packet := &ServerWebsocketPacketQueueReplace{Documents: documents}

	// Store a clone, so later changes to the caller's slice don't affect the queue.
	d.Documents = slices.Clone(documents)

	d.Broadcast(packet)
}
// ShiftAt moves the element at index i by the given offset.
//
// A negative offset moves the element towards the front, a positive offset
// towards the back. The elements it passes each move by one position in the
// opposite direction.
//
// The index is clamped to [0; len(d.Documents)), the offset is clamped so that
// the element stays inside the list. If nothing is left to do after clamping
// (empty queue, or an offset of 0), the queue is not touched and nothing is
// broadcast. Otherwise all listeners are informed with a
// ServerWebsocketPacketQueueShiftAt that carries the clamped index and offset.
//
// The Queue must be locked when calling this.
func (d *Queue) ShiftAt(i, offset int) {
	n := len(d.Documents)
	if n == 0 {
		return
	}

	i = max(0, min(i, n-1))              // Limit to [0; len).
	offset = max(-i, min(offset, n-i-1)) // Limit to [-i; len-i-1].
	if offset == 0 {
		// The element already is where it should be, don't bother the listeners.
		return
	}

	// Close the gap at i by moving the passed elements by one, then place the
	// element at its target. copy handles the overlapping ranges correctly.
	target := i + offset
	moved := d.Documents[i]
	if offset > 0 {
		copy(d.Documents[i:target], d.Documents[i+1:target+1])
	} else {
		copy(d.Documents[target+1:i+1], d.Documents[target:i])
	}
	d.Documents[target] = moved

	d.Broadcast(&ServerWebsocketPacketQueueShiftAt{Index: i, Offset: offset})
}
// Shift moves every entry whose ID is contained in ids by offset positions.
//
// Entries are handled one by one via ShiftAt, starting with the matching entry
// that is closest to the border the entries move towards. Whenever an entry
// has less room than the offset asks for, the offset is reduced accordingly,
// and the reduced offset is also used for all entries handled after it.
//
// An offset of 0 does nothing.
//
// The Queue must be locked when calling this.
func (d *Queue) Shift(offset int, ids ...QueueEntryID) {
	docs := d.Documents
	last := len(docs) - 1
	step := offset // Remaining offset, only ever shrinks towards 0.

	if step < 0 {
		// Moving to the front: Handle the entries front to back.
		for idx := 0; idx <= last; idx++ {
			if !slices.Contains(ids, docs[idx].ID) {
				continue
			}
			step = max(step, -idx)
			d.ShiftAt(idx, step)
		}
		return
	}

	if step > 0 {
		// Moving to the back: Handle the entries back to front.
		for idx := last; idx >= 0; idx-- {
			if !slices.Contains(ids, docs[idx].ID) {
				continue
			}
			step = min(step, last-idx)
			d.ShiftAt(idx, step)
		}
	}
}
// QueueEntryByID looks up the first entry of the queue that has the given ID.
//
// The result points at the element inside of d.Documents, it is not a copy.
// If there is no entry with that ID, nil is returned.
//
// The Queue must be locked when calling this.
func (d *Queue) QueueEntryByID(id QueueEntryID) *QueueEntry {
	for idx, n := 0, len(d.Documents); idx < n; idx++ {
		if d.Documents[idx].ID == id {
			return &d.Documents[idx]
		}
	}

	return nil
}