키워드
-
데이터 엔지니어
-
개발
-
완전관리형 서비스 - Fully Managed Service
-
시간과 생산성
데이터 분석가, 데이터 사이언티스트
Google Kubernetes Engine ->
twitter 스트리밍 데이터 ->
Google Cloud Pub/Sub ->
Google Cloud Functions ->
BigQuery ->
DataStudio
Google Cloud Pub/Sub
-
실시간 메세징 서비스(택배를 분류해서 분배하는 것이라 생각하면 됨!)
-
Kafka와 유사
- Producer => Broker => Consumer
-
Publisher => Pub/Sub Topic => Subscriber
import json
import tweepy
from google.cloud import pubsub_v1
from google.oauth2 import service_account
key_path = "ta-1016-onsite-0796b0bda810.json"
credentials = service_account.Credentials.from_service_account_file(
key_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = pubsub_v1.PublisherClient(credentials=credentials)
topic_path = client.topic_path('ta-1016-onsite', 'tweets')
twitter_api_key = 'F80OhlC4jaPbM2cgjYgjZiXun'
twitter_api_secret_key = '53DL9ODKApxsWjimhZIcDns4gH10zkaDxMAzMJqcafGo5G5JuB'
twitter_access_token = '1316746935928152064-iAwTl6AkGmy7c3ITfs1lEyjs2aQWK1'
twitter_access_token_secret = 'S7JySvKrVonwwTZExAhKsglTAXKv4zway699eHun1mF81'
class SimpleStreamListener(tweepy.StreamListener):
def on_status(self, status):
print(status)
tweet = json.dumps({'id': status.id, 'created_at': status.created_at, 'text': status.text}, default=str)
client.publish(topic_path, data=tweet.encode('utf-8'))
def on_error(self, status_code):
print(status_code)
if status_code == 420:
return False
stream_listener = SimpleStreamListener()
auth = tweepy.OAuthHandler(twitter_api_key, twitter_api_secret_key)
auth.set_access_token(twitter_access_token, twitter_access_token_secret)
twitterStream = tweepy.Stream(auth, stream_listener)
twitterStream.filter(track=['data'])
- Google Kubernetes Engine