package main import ( "bytes" "encoding/json" "errors" "fmt" "log" "net/http" "os" "strings" "time" "github.com/dghubble/go-twitter/twitter" "github.com/dghubble/oauth1" "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" ) var ( ErrExists = errors.New("already subscribed") ) type T struct { t *twitter.Client limit int } func newT(e *env) *T { config := oauth1.NewConfig(e.consumerKey, e.consumerSecret) token := oauth1.NewToken(e.accessToken, e.accessSecret) httpClient := config.Client(oauth1.NoContext, token) c := twitter.NewClient(httpClient) return &T{c, e.limit} } func (t T) search(term string) ([]twitter.User, error) { sp := twitter.UserSearchParams{Count: t.limit} r, _, err := t.t.Users.Search(term, &sp) if err != nil { return nil, err } return r, nil } func (t T) follow(ID int64) error { log.Printf("[DEBUG] following user id: %d", ID) fcp := &twitter.FriendshipCreateParams{UserID: ID} us, resp, err := t.t.Friendships.Create(fcp) if err != nil { return err } log.Println(us.Name) log.Println(resp.Status) return nil } func (t T) unFollow(ID int64) error { log.Printf("[DEBUG] unfollowing user id: %d", ID) fdp := &twitter.FriendshipDestroyParams{UserID: ID} _, _, err := t.t.Friendships.Destroy(fdp) if err != nil { return err } return nil } type store struct { db *sqlx.DB } func newStore(e *env) (*store, error) { conn := fmt.Sprintf("user=%s dbname=%s password=%s host=postgres sslmode=disable", e.db, e.dbUser, e.dbPassword) db, err := sqlx.Connect("postgres", conn) if err != nil { return nil, err } if err := db.Ping(); err != nil { return nil, err } s := &store{db} err = s.CreateTable() if err != nil { return nil, err } return &store{db}, nil } func (s store) CreateTable() error { schemas := []string{ `CREATE TABLE IF NOT EXISTS follows ( id SERIAL PRIMARY KEY, twitter_id int NOT NULL UNIQUE, created timestamp without time zone NOT NULL, active boolean );`, `CREATE TABLE IF NOT EXISTS topics ( id SERIAL PRIMARY KEY, topic text );`, } for _, schema := range schemas { _, err := s.db.Exec(schema) if err != nil { return err } } return nil } func (s store) Teardown() error { query := `DROP TABLE IF EXISTS follows, topics` _, err := s.db.Exec(query) if err != nil { return err } return s.Close() } func (s store) Close() error { return s.db.Close() } func (s store) Save(id int) error { query := `INSERT INTO follows (twitter_id, created, active) VALUES ($1, $2, $3)` _, err := s.db.Exec(query, id, time.Now().UTC(), true) if err != nil { if err, ok := err.(*pq.Error); ok { if err.Code == "23505" { // unique_violation return ErrExists } } return err } return nil } func (s store) OldIDs() ([]int, error) { var ids []int query := `SELECT (twitter_id) FROM follows WHERE created < NOW() - INTERVAL '2 days';` err := s.db.Select(&ids, query) if err != nil { return nil, err } return ids, nil } func (s store) AddTopic(t string) error { query := `INSERT INTO topics (topic) VALUES ($1)` _, err := s.db.Exec(query, t) if err != nil { return err } return nil } func (s store) RemoveTopic(t string) error { query := `DELETE FROM topics WHERE topic = $1` _, err := s.db.Exec(query, t) if err != nil { return err } return nil } func (s store) GetTopics() ([]string, error) { var topics []string query := `SELECT (topic) FROM topics` err := s.db.Select(&topics, query) if err != nil { return nil, err } return topics, nil } func (s store) MarkStale(id int) error { query := `UPDATE follows SET active = false WHERE twitter_id = $1` _, err := s.db.Exec(query, id) if err != nil { return err } return nil } type env struct { consumerKey string consumerSecret string accessToken string accessSecret string dbPassword string slackToken string slackVerificationToken string dbUser string db string limit int } func getEnv() *env { return &env{ consumerKey: os.Getenv("CONSUMER_KEY"), consumerSecret: os.Getenv("CONSUMER_SECRET"), accessToken: os.Getenv("ACCESS_TOKEN"), accessSecret: os.Getenv("ACCESS_SECRET"), dbPassword: os.Getenv("DB_PASS"), slackToken: os.Getenv("SLACK_TOKEN"), slackVerificationToken: os.Getenv("VERIFICATION_TOKEN"), dbUser: "postgres", db: "postgres", limit: 20, } } type S struct { api *slack.Client vt string } func newS(e *env) (*S, error) { return &S{slack.New(e.slackToken), e.slackVerificationToken}, nil } func (s S) SendMsg(channel, msg string) error { opts := slack.MsgOptionText(msg, false) _, _, _, err := s.api.SendMessage(channel, opts) if err != nil { return err } return nil } type Bot struct { store *store tw *T sl *S } func newBot(e *env) (*Bot, error) { store, err := newStore(e) if err != nil { return nil, err } tw := newT(e) sl, err := newS(e) if err != nil { return nil, err } return &Bot{store, tw, sl}, nil } func (b Bot) EventsListener() { http.HandleFunc("/events-endpoint", func(w http.ResponseWriter, r *http.Request) { buf := new(bytes.Buffer) buf.ReadFrom(r.Body) body := buf.String() eventsAPIEvent, err := slackevents.ParseEvent(json.RawMessage(body), slackevents.OptionVerifyToken(&slackevents.TokenComparator{VerificationToken: b.sl.vt})) if err != nil { fmt.Println(err) w.WriteHeader(http.StatusInternalServerError) return } if eventsAPIEvent.Type == slackevents.URLVerification { var r *slackevents.ChallengeResponse err := json.Unmarshal([]byte(body), &r) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text") w.Write([]byte(r.Challenge)) } if eventsAPIEvent.Type == slackevents.CallbackEvent { innerEvent := eventsAPIEvent.InnerEvent switch ev := innerEvent.Data.(type) { case *slackevents.AppMentionEvent: reply := b.processMsg(ev.Text) _, _, err := b.sl.api.PostMessage(ev.Channel, slack.MsgOptionText(reply, false)) if err != nil { log.Println("[ERROR] sending message:", err) } // TODO: fix (hack warning) if strings.Contains(reply, "start tracking") { err = b.searchAndSave() if err != nil { log.Println("[ERROR] searchAndSave():", err) } } } } }) fmt.Println("[INFO] EventsListener starting") http.ListenAndServe(":4040", nil) } // this fn needs to die in hell. func (b Bot) processMsg(text string) string { switch { case strings.Contains(text, "twitter tracking remove"): splt := strings.Split(text, " ") topic := splt[len(splt)-1] err := b.store.RemoveTopic(topic) if err != nil { log.Printf("[ERROR] can't remove topic %s: %v", topic, err) } return fmt.Sprintf("Okay, I'll stop tracking %s", topic) case strings.Contains(text, "twitter tracking add"): splt := strings.Split(text, " ") topic := splt[len(splt)-1] err := b.store.AddTopic(topic) if err != nil { log.Printf("[ERROR] can't add topic %s: %v", topic, err) } return fmt.Sprintf("Okay, I'll start tracking %s", topic) case strings.Contains(text, "twitter tracking"): topics, err := b.store.GetTopics() if err != nil { log.Println("[ERROR] can't get topics:", err) } if len(topics) == 0 { return "I am not tracking anything, use `twitter tracking add `" } return trackingMsg(topics) } return "Hi, currently I can only respond to `twitter tracking `" } func trackingMsg(topics []string) string { msg := "I am currently tracking:\n" for _, topic := range topics { msg = msg + " • " + topic + "\n" } return msg } func (b Bot) searchAndSave() error { toReport := []twitter.User{} topics, err := b.store.GetTopics() if err != nil { return err } for _, term := range topics { usrs, err := b.tw.search(term) if err != nil { log.Println("[ERROR] can't get usrs:", err) continue } toFollow := []twitter.User{} // this is hella hacky, but it'll work. for _, usr := range usrs { if len(toFollow) >= 2 { log.Println("[DEBUG] breaking, user count is", len(toFollow)) break } // should probably do this in a txn err := b.store.Save(int(usr.ID)) if err != nil { if err == ErrExists { log.Printf("[WARN] already followed user %d: %s", usr.ID, usr.Name) continue } log.Printf("[ERROR] cannot save user %d: %v", usr.ID, err) continue } toFollow = append(toFollow, usr) } for _, tf := range toFollow { err = b.tw.follow(tf.ID) if err != nil { log.Printf("[ERROR] cannot follow user %d: %v", tf.ID, err) continue } } toReport = append(toReport, toFollow...) } msg := "I've just subscribed to:\n" for _, u := range toReport { msg = msg + " • " + u.Name + "\n" } b.sl.SendMsg("bot", msg) return nil } func (b Bot) unsubscribe() { ids, err := b.store.OldIDs() if err != nil { log.Println("[WARN] could not get oldIDs:", err) } for _, id := range ids { err := b.tw.unFollow(int64(id)) if err != nil { log.Printf("[WARN] could not unfollow %d: %v", id, err) } err = b.store.MarkStale(id) if err != nil { log.Printf("[WARN] could not mark stale %d: %v", id, err) } } if len(ids) == 0 { return } b.sl.SendMsg("bot", fmt.Sprintf("I just unfollowed %d accounts", len(ids))) } func main() { log.Println("starting") log.Println("waiting for postgres") e := getEnv() // wait for it. for i := 0; i < 5; i++ { _, err := newStore(e) if err != nil { log.Printf("[ERROR]: could not connect to pg: %v", err) time.Sleep(5 * time.Second) log.Println("[INFO]: retrying") continue } log.Println("[INFO]: connected to pg") break } b, err := newBot(e) if err != nil { log.Fatal("[FATAL] could not start", err) } go b.EventsListener() ticker := time.NewTicker(24 * time.Hour) for { select { case <-ticker.C: err := b.searchAndSave() if err != nil { log.Printf("[ERROR] could not search and save: %v", err) } b.unsubscribe() } } }