commit 1bbbda53851f5de13535e95143e598c29e80fa51 Author: sati.ac Date: Wed Jun 28 13:56:39 2023 +0300 initial commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..38c079c --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# sati-go - golang api wrapper for [sati.ac](sati.ac) \ No newline at end of file diff --git a/events.go b/events.go new file mode 100644 index 0000000..0226867 --- /dev/null +++ b/events.go @@ -0,0 +1,80 @@ +package sati + +import ( + "fmt" + "sync" + + "github.com/mitchellh/mapstructure" +) + +type EventBus struct { + mu *sync.Mutex + idCounter uint32 + handlers map[string]map[uint32]func(data any) + eventTypes map[string]any +} + +type EventHandler struct { + id uint32 + type_ string + bus *EventBus +} + +func (e *EventHandler) Off() { + e.bus.mu.Lock() + defer e.bus.mu.Unlock() + + delete(e.bus.handlers[e.type_], e.id) +} + +func (e *EventBus) On(event string, handler func(data any)) (*EventHandler, error) { + e.mu.Lock() + defer e.mu.Unlock() + + if _, ok := e.eventTypes[event]; !ok { + return nil, fmt.Errorf("bad event type") + } + + e.idCounter++ + e.handlers[event][e.idCounter] = handler + return &EventHandler{ + id: e.idCounter, + type_: event, + bus: e, + }, nil +} + +func (e *EventBus) dispatch(event string, data any) error { + e.mu.Lock() + defer e.mu.Unlock() + + formattedData, ok := e.eventTypes[event] + if !ok { + return fmt.Errorf("bad event type") + } + + if err := mapstructure.Decode(data, &formattedData); err != nil { + return err + } + + for _, handler := range e.handlers[event] { + handler(formattedData) + } + + return nil +} + +// eventTypes values must be structs, not pointers +func newEventBus(eventTypes map[string]any) *EventBus { + e := &EventBus{ + mu: &sync.Mutex{}, + handlers: make(map[string]map[uint32]func(data any)), + eventTypes: eventTypes, + } + + for event := range eventTypes { + e.handlers[event] = make(map[uint32]func(data any)) + } + + return e +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..dff5a10 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module git.sati.ac/sati.ac/sati-go + +go 1.20 + +require ( + github.com/gorilla/websocket v1.5.0 + github.com/mitchellh/mapstructure v1.5.0 + github.com/shopspring/decimal v1.3.1 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..814ee9a --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= diff --git a/sati.go b/sati.go new file mode 100644 index 0000000..54c7120 --- /dev/null +++ b/sati.go @@ -0,0 +1,85 @@ +package sati + +import ( + "fmt" + "time" + + "github.com/mitchellh/mapstructure" + "github.com/shopspring/decimal" +) + +type Config struct { + ReconnectionInterval time.Duration + Endpoint string + Debug bool + Token string +} + +func NewConfig(token string) Config { + return Config{ + ReconnectionInterval: time.Second, + Debug: false, + Endpoint: "wss://api.sati.ac/ws", + Token: token, + } +} + +type Api struct { + socket *socket +} + +func (a *Api) Solve(task AnyTask, result any) (*TaskEntity, error) { + createdTask := CreateTaskResult{} + if err := a.socket.call("createTask", task.serialize(), &createdTask); err != nil { + return nil, err + } + + resultCh := make(chan *TaskUpdateEvent) + handler, err := a.socket.events.On("taskUpdate", func(data any) { + e := data.(TaskUpdateEvent) + if createdTask.Id == e.Id && (e.State == "error" || e.State == "success") { + resultCh <- &e + } + }) + + if err != nil { + return nil, err + } + + solvedTask := <-resultCh + handler.Off() + + if solvedTask.State == "error" { + return nil, fmt.Errorf("sati: failed to solve task") + } + + if err := mapstructure.Decode(solvedTask.Result, result); err != nil { + return nil, err + } + + return (*TaskEntity)(solvedTask), nil +} + +func (a *Api) GetBalance() (*decimal.Decimal, error) { + result := GetBalanceResult{} + if err := a.socket.call("getBalance", &GetBalanceRequest{}, &result); err != nil { + return nil, err + } + + balance, err := decimal.NewFromString(result.Balance) + if err != nil { + return nil, err + } + + return &balance, nil +} + +func (a *Api) Dispose() { + a.socket.close() +} + +func NewApi(config Config) *Api { + return &Api{ + socket: newSocket(config), + } +} diff --git a/socket.go b/socket.go new file mode 100644 index 0000000..e917b0b --- /dev/null +++ b/socket.go @@ -0,0 +1,285 @@ +package sati + +import ( + "fmt" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/mitchellh/mapstructure" +) + +const ( + stReconnecting uint32 = iota + stConnected + stUnrecoverable +) + +type message struct { + Type string + Id uint32 + Data any + To uint32 +} + +type outgoingMessage struct { + msg message + result chan *incomingMessage +} + +type incomingMessage struct { + msg *message + err error +} + +type socket struct { + idCounter uint32 + state uint32 + unrecoverableError error + closeNotifier chan struct{} + outgoing chan *outgoingMessage + ws *websocket.Conn + awaitedReplies map[uint32]chan *incomingMessage + mu *sync.Mutex + events *EventBus + config Config +} + +func (s *socket) reciever() { + for { + var message message + if err := s.ws.ReadJSON(&message); err != nil { + if s.config.Debug { + fmt.Println("sati: got error while reading socket", err.Error()) + } + s.mu.Lock() + for _, ch := range s.awaitedReplies { + ch <- &incomingMessage{ + err: s.unrecoverableError, + } + } + s.awaitedReplies = make(map[uint32]chan *incomingMessage) + s.mu.Unlock() + s.closeNotifier <- struct{}{} + return + } + + if s.config.Debug { + fmt.Println("sati: recieved message", &message) + } + + switch message.Type { + case "auth": + fallthrough + case "call": + if message.To == 0 { + continue + } + s.mu.Lock() + resultCh, ok := s.awaitedReplies[message.To] + if ok { + delete(s.awaitedReplies, message.To) + } + s.mu.Unlock() + if resultCh != nil { + resultCh <- &incomingMessage{msg: &message} + } + case "event": + var event struct { + Type string `json:"type"` + Data any `json:"data"` + } + + mapstructure.Decode(message.Data, &event) + if err := s.events.dispatch(event.Type, event.Data); err != nil && s.config.Debug { + fmt.Println("sati: error while dispatching event:", err.Error()) + } + } + } +} + +func (s *socket) send(msg *outgoingMessage) error { + s.idCounter++ + msg.msg.Id = s.idCounter + if msg.result != nil { + s.mu.Lock() + s.awaitedReplies[msg.msg.Id] = msg.result + s.mu.Unlock() + } + + if s.config.Debug { + fmt.Println("sati: sending message", msg) + } + + err := s.ws.WriteJSON(msg.msg) + if msg.result != nil && err != nil { + s.mu.Lock() + s.awaitedReplies[msg.msg.Id] <- &incomingMessage{ + err: err, + } + delete(s.awaitedReplies, msg.msg.Id) + s.mu.Unlock() + } + return err +} + +func (s *socket) sender() { + for { + select { + case msg := <-s.outgoing: + s.send(msg) + case <-s.closeNotifier: + return + } + } +} + +func (s *socket) connect() error { + if s.config.Debug { + fmt.Println("sati: connecting") + } + ws, _, err := websocket.DefaultDialer.Dial(s.config.Endpoint, http.Header{}) + if err != nil { + return err + } + + s.mu.Lock() + s.state = stReconnecting + s.ws = ws + s.mu.Unlock() + + resultChan := make(chan *incomingMessage) + s.send(&outgoingMessage{ + message{ + Type: "auth", + Data: struct { + Token string `json:"token"` + }{s.config.Token}, + }, resultChan, + }) + + go s.reciever() + + rawResult := <-resultChan + if rawResult.err != nil { + return rawResult.err + } + + var result struct { + Success bool `json:"success"` + } + + if err := mapstructure.Decode(rawResult.msg.Data, &result); err != nil { + return err + } + + if !result.Success { + s.setUnrecoverableState("invalid auth token") + return s.unrecoverableError + } + + s.sender() + + return nil +} + +func (s *socket) connector() { + for { + s.mu.Lock() + state := s.state + s.mu.Unlock() + if state == stUnrecoverable { + break + } + + err := s.connect() // will block until disconnect + if s.config.Debug && err != nil { + fmt.Println("sati: disconnected", err.Error()) + } + time.Sleep(s.config.ReconnectionInterval) + } +} + +func (s *socket) setUnrecoverableState(err string) { + s.mu.Lock() + defer s.mu.Unlock() + if s.state == stUnrecoverable { + return + } + s.unrecoverableError = fmt.Errorf("sati: %s", err) + s.state = stUnrecoverable + s.ws.Close() +} + +func newSocket(config Config) *socket { + s := &socket{ + closeNotifier: make(chan struct{}), + outgoing: make(chan *outgoingMessage), + awaitedReplies: make(map[uint32]chan *incomingMessage), + mu: &sync.Mutex{}, + events: newEventBus(map[string]any{ + "taskUpdate": TaskUpdateEvent{}, + "tokenReissue": TokenReissueEvent{}, + }), + config: config, + } + + s.events.On("tokenReissue", func(any) { + s.setUnrecoverableState("token was reissued") + }) + + go s.connector() + + return s +} + +func (s *socket) close() { + s.setUnrecoverableState("socket closed") +} + +func (s *socket) call(method string, data any, result any) error { + s.mu.Lock() + if s.state == stUnrecoverable { + err := s.unrecoverableError + s.mu.Unlock() + return err + } + s.mu.Unlock() + + resultCh := make(chan *incomingMessage) + s.outgoing <- &outgoingMessage{ + msg: message{ + Type: "call", + Data: CallMessageOutgoing{ + Method: method, + Data: data, + }, + }, + result: resultCh, + } + + resultMsg := <-resultCh + if resultMsg.err != nil { + return resultMsg.err + } + + callResult := CallMessageIncoming{} + if err := mapstructure.Decode(resultMsg.msg.Data, &callResult); err != nil { + return err + } + + if !callResult.Success { + callErr := &CallError{} + if err := mapstructure.Decode(callResult.Data, callErr); err != nil { + return err + } + return callErr + } + + if err := mapstructure.Decode(callResult.Data, result); err != nil { + return err + } + + return nil +} diff --git a/tasks.go b/tasks.go new file mode 100644 index 0000000..d6b8a73 --- /dev/null +++ b/tasks.go @@ -0,0 +1,44 @@ +package sati + +type AnyTask interface { + serialize() task +} + +type task struct { + Type string `json:"type"` + Data any `json:"data"` +} + +type TurnstileTask struct { + SiteKey string `json:"siteKey"` + PageUrl string `json:"pageUrl"` + CData *string `json:"cData,omitempty"` + Action *string `json:"action,omitempty"` +} + +func (t *TurnstileTask) serialize() task { + return task{ + Type: "Turnstile", + Data: t, + } +} + +type TurnstileResult struct { + Token string `json:"token"` +} + +type ReCaptcha2Task struct { + SiteKey string `json:"siteKey"` + PageUrl string `json:"pageUrl"` +} + +func (t *ReCaptcha2Task) serialize() task { + return task{ + Type: "ReCaptcha2", + Data: t, + } +} + +type ReCaptcha2Result struct { + Token string `json:"token"` +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..14eaa27 --- /dev/null +++ b/types.go @@ -0,0 +1,42 @@ +package sati + +import ( + "fmt" +) + +type TokenReissueEvent struct{} + +type TaskUpdateEvent TaskEntity + +type CallMessageOutgoing struct { + Method string `json:"method"` + Data any `json:"data"` +} + +type CallMessageIncoming struct { + Success bool `json:"success"` + Data any `json:"data"` +} + +type CallError struct { + Description string `json:"description"` + Code uint32 `json:"code"` +} + +func (c *CallError) Error() string { + return fmt.Sprintf("sati: api: #%d %s", c.Code, c.Description) +} + +type TaskEntity struct { + Id uint32 `json:"id"` + Type string `json:"type"` + State string `json:"state"` + Cost string `json:"cost"` + Result any `json:"result"` +} + +type CreateTaskResult TaskEntity +type GetBalanceRequest struct{} +type GetBalanceResult struct { + Balance string `json:"balance"` +}